Gérer rapidement un pool de threads avec notification

Aujourd'hui un rapide bout de code pour montrer quels objets Java utiliser dans le cas où vous avez besoin de mettre en place rapidement un pool de threads.

Le but est d'utiliser un ExecutorService fournit par la classe Executors. Voici comment :

import java.util.ArrayList;
import java.util.List;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExportEnginePoolManager implements Observer {

 private static final int poolSize = 10;

 private static int managedThreads = 0;

 private static ExecutorService execSvc;

 private static ExportEnginePoolManager instance;

 private static List managedEngines;

 public static List getManagedEngines() {
  return managedEngines;
 }

 private ExportEnginePoolManager() {
  execSvc = Executors.newFixedThreadPool(poolSize);
  managedEngines = new ArrayList();
 }

 public static ExportEnginePoolManager getInstance() {
  if (instance == null) {
   instance = new ExportEnginePoolManager();
  }
  return instance;
 }

 public void addAndStart(AbstractExportEngine engine) throws Exception {
  if (!execSvc.isShutdown()) {
   engine.addObserver(this);
   execSvc.submit(engine);
   managedEngines.add(engine);
   managedThreads++;
   if (managedThreads == poolSize) {
    execSvc.shutdown();
   }
  } else {
   throw new UnsupportedOperationException(
     "Le nombre maximum de threads simultanés a été atteint!");
  }
 }

 @Override
 public void update(Observable o, Object arg) {
  try {
   AbstractExportEngine aee = (AbstractExportEngine) o;
   if (AbstractExportEngine.TERMINATED_STATUS.equals(aee
     .getStatus())) {
    System.out.println("### " + aee.getName()
      + " state is " + aee.getStatus());
   }
   managedEngines.remove(aee);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

La méthode qui nous intéresse ici est addAndStart. Elle permet d'ajouter à la file d'exécution le thread en question, qui sera automatiquement lancé par l'ExecutorService. Le constructeur privé a ici pour seul but de créer un singleton dans l'application gérant au maximum 10 threads.

Les threads typiques à utiliser ressembleront à :

import java.util.Observable;

public abstract class AbstractExportEngine extends Observable implements
  Runnable {

 private String status;

 private String name;

 public static final String TERMINATED_STATUS = "Terminated";

 public static final String ERROR_STATUS = "Error";

 public AbstractExportEngine(String pName) {
  this.name = pName;
 }

 public void terminate() {
  setChanged();
  notifyObservers();
 }

 public void setStatus(String status) {
  this.status = status;
 }

 public String getStatus() {
  return status;
 }

 public void setName(String name) {
  this.name = name;
 }

 public String getName() {
  return name;
 }

 @Override
 public void run() {
  try {
   // actions...
   terminate();
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

Cette architecture implémente donc le design pattern Observer. Les threads passés au service sont capables de notifier un évènement (en l'occurrence leur fin) à notre manager, qui sera ainsi en mesure d'exécuter une quelconque action à chaque fois qu'un de ses threads est terminé.

Cet exemple aussi simple qu'il y parait a surtout pour but de mettre en lumière le système d'Executors proposé par Java et qui permet de gérer de multiples façons et très simplement plusieurs threads : par paquets, en file... Associé à ce principe, l'utilisation des objets Callable et Future permet d'encore mieux gérer la récupération du résultat des threads, même s'ils ne sont pas applicables dans tous les contextes.

Sources


Fichier(s) joint(s) :

0 commentaires: