Apache Camel par l'exemple

Avec cet article j'ai décidé d'entrer directement dans le vif du sujet...

S'il fallait présenter rapidement Camel, on pourrait dire qu'il s'agit d'une plateforme d'intégration d'application, basée sur un système d'échange de messages et dont le but est de fournir une implémentation des grands patrons d'intégrations en entreprise (facilitant la communication inter-applications). Pour ne pas plagier ou paraphraser, voici deux articles intéressants présentant ces patrons : le premier sur le site de Novedia et le second, plus détaillé chez Soat. Pour continuer sur une présentation plus spécifique de Camel, voici un premier billet écrit sur le blog d'Octo en enfin une présentation complète par un des co-auteurs du livre "Camel In Action", Jonathan Anstey.

Mon but ici est donc de fournir un exemple de mise en place d'un "bus" Camel pour créer un flux applicatif.

Le scénario est le suivant : on doit récupérer une archive zippée sur un serveur FTP distant, la décompresser et traiter son contenu en fonction de son type : les fichiers CSV doivent être segmentés selon une règle métier puis re-zippés unitairement et les autres types de fichiers sont envoyés à un script shell. Entre-temps, les données sont triées et validées. Celles qui sont invalides sont déposées séparément dans un répertoire spécifique.

De manière plus illustrée :

En jaune sont représentés les composants intégrés à Camel : FTP, ZIP, EXEC et CSV
En rouge sont illustrés les patrons d'intégration implémentés : Split, Enricher, Router, Filter, Sort, Recipient list, Validate.
En blanc sont indiqués les beans/services personnalisés ajoutés.

Et maintenant le plus intéressant, le code pour la mise en place des routes (les commentaires décrivent tout son fonctionnement) :

import java.util.Comparator;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.dataformat.csv.CsvDataFormat;
import org.apache.camel.language.bean.BeanLanguage;
import org.apache.camel.processor.validation.PredicateValidationException;

import CamelProperties;
import ReferentielDepartement;

/**
 * Main class to creates Camel routes
 * 
 * @author pe.faidherbe
 *
 */
public class MyRoutesBuilder extends RouteBuilder {
 
 // Stores archive's file name retrieved from remote server
 private static String downloadedFileName;
 
 // Indicates Csv column's name used to sort datas
 private static String csvIdentityColumn;
 
 // Indicates Csv column's number used to sort datas
 private static Integer csvIdentityColumnNumber;
 
 // Indicates length of sorting data used as data identifier
 private static int csvIdentityLength;
 
 // General properties
 private static final CamelProperties camelProps = CamelProperties.getInstance();
 
 // Csv data delimiter
 private static final String csvDelimiter = camelProps.getProperty(CamelProperties.CAMEL_DATA_SEP);
 
 // Prefix of data file (used for routing)
 private static final String NAT_FILE_PREFIX = camelProps.getProperty(CamelProperties.CAMEL_DATA_FILE_NAT_PREFIX);
 
 // Prefix of data file (used for routing)
 private static final String S2_FILE_PREFIX = camelProps.getProperty(CamelProperties.CAMEL_DATA_FILE_S2_PREFIX);
 
 // Prefix of data file (used for routing)
 private static final String ILOT_FILE_PREFIX = camelProps.getProperty(CamelProperties.CAMEL_DATA_FILE_ILOT_PREFIX);
 
 /**
  * Getter used by Camel
  * @return remote File Name
  */
 public String getDownloadedFileName() {
  return downloadedFileName;
 }
 
 /**
  * Getter used by Camel
  * @return csv identity column
  */
 public String getCsvIdentColumn() {
  return csvIdentityColumn;
 }
 
 /**
  * Getter used by Camel
  * @return csv identity column num
  */
 public Integer getCsvIdentityColumnNumber() {
  return csvIdentityColumnNumber;
 }
 
 /**
  * Getter used by Camel
  * @return csv identity data length
  */
 public int getCsvIdentLength() {
  return csvIdentityLength;
 }
 
 /**
  * Getter used by Camel
  * @return csv delimiter to use
  */
 public String getCsvDelimiter() {
  return csvDelimiter;
 }
 
 /**
  * Used by first Camel route to "persist" informations about remote file
  * later passed as parameters for second route
  */
 private void setMyContext(String downloadedArchive) {
  downloadedFileName = downloadedArchive.substring(0, downloadedArchive.indexOf("."));
  if(downloadedFileName.startsWith(S2_FILE_PREFIX)) {
   csvIdentityColumn = camelProps.getProperty(CamelProperties.CAMEL_DATA_IDENT_COL_S2);
   csvIdentityLength = Integer.parseInt(camelProps.getProperty(CamelProperties.CAMEL_DATA_IDENT_S2_LENGTH));
   csvIdentityColumnNumber = Integer.parseInt(camelProps.getProperty(CamelProperties.CAMEL_DATA_IDENT_COL_S2_NUM));
  } else if(downloadedFileName.startsWith(NAT_FILE_PREFIX)) {
   csvIdentityColumn = camelProps.getProperty(CamelProperties.CAMEL_DATA_IDENT_COL_NAT);
   csvIdentityLength = Integer.parseInt(camelProps.getProperty(CamelProperties.CAMEL_DATA_IDENT_NAT_LENGTH));
   csvIdentityColumnNumber = Integer.parseInt(camelProps.getProperty(CamelProperties.CAMEL_DATA_IDENT_COL_NAT_NUM));
  }
 }

 /**
  * @see org.apache.camel.builder.RouteBuilder#configure()
  */
 @Override
 public void configure() throws Exception {
  // Props
  String camelWorkDir = camelProps.getProperty(CamelProperties.CAMEL_WORK_DIR);
  
  // Csv comparator
  CsvSorter sorter = new CsvSorter();
  
  // dead Letter Channel
  errorHandler(deadLetterChannel("log:camel"));
  
  // not validated messages go to particular error folder
  onException(PredicateValidationException.class).handled(true)
   .to("file://C:/test2/csverror?fileName=${header:downloadedFileName}_${header:territoire}_${date:now:yyyyMMddHHmmss}.csv")
   .log("Validation Error : ${exception.message}").end(); 
  
  /*
   * Download
   */
  from("ftp://"+camelProps.getProperty(CamelProperties.FTP_USER_PROP)
    + "@"
    + camelProps.getProperty(CamelProperties.FTP_HOST)
    + "?password="
    + camelProps.getProperty(CamelProperties.FTP_USER_PWD)
    + "&binary=true&noop=true&disconnect=true"
    // Poll every X sec
    + "&consumer.delay=" + camelProps.getProperty(CamelProperties.FTP_POLL_TIME_MS)
    // Specify temp destination for performance issue (not loaded in memory)
    + "&localWorkDirectory="+camelWorkDir)
   .log("Unzipping : ${file:name}")
   // Read as zip file
   .marshal().zip()
   // Unzip in memory 
   .unmarshal().zip()
   // Send to bean to extract entries
   .split().method("ZipService","unzipFile")
    .log("Extracted : ${header:entryName}")
   // Write each file
   .to("file://"+camelWorkDir+"?fileName=${header:entryName}")
   .process(new Processor() {
    @Override
    public void process(Exchange e) throws Exception {
     // Set informations on how to treat latter data
     setMyContext((String) e.getIn().getHeader("CamelFileName"));
    }
   })
  .end();
  
  /*
   *  ROUTER
   */
  from("file://"+camelWorkDir).id("routerRoute").log("Routing start")
   // No autostart to avoid polling "undesired" file (not previously retrieved from ftp)
   //.noAutoStartup()
   // Enrich file polling with context informations
   .enrich("direct:contextEnricher")
   .choice()
    .when(header("downloadedFileName").startsWith(S2_FILE_PREFIX))
     // CSV data, going to split
     .to("direct:surfaces")
    .when(header("downloadedFileName").startsWith(ILOT_FILE_PREFIX))
     // Geo data, go to DB
     .to("direct:ilots")
    .when(header("downloadedFileName").startsWith(NAT_FILE_PREFIX))
     // CSV data, going to split
     .to("direct:national")
    .otherwise()
     .log("Fichier ${file:name} non pris en charge!")
    .end();
  
  /*
   * Content enricher
   */
  from("direct:contextEnricher")
   .setHeader("downloadedFileName", BeanLanguage.bean(getClass(), "getDownloadedFileName"))
   .setHeader("csvIdentityColumn", BeanLanguage.bean(getClass(), "getCsvIdentColumn"))
   .setHeader("csvIdentityLength", BeanLanguage.bean(getClass(), "getCsvIdentLength"))
   .setHeader("csvDelimiter", BeanLanguage.bean(getClass(), "getCsvDelimiter"));
  
  /*
   * Manage geo data
   */
  from("direct:ilots")
   // Manage only SHP files
   .filter(header("entryName").endsWith("shp"))
   .to("file://C:/test2?fileName=${header:entryName}")
   // RecipientList is needed because route is computed at runtime (because of dynamic parameters)
   .recipientList(simple("exec:C:/test2/cmd/ogr2ogr.bat?args=${header:entryName}&workingDir=C:/test2/cmd/&useStderrOnEmptyStdout=true"))
    // Convert to String because cmd return is InputStream
    .convertBodyTo(String.class)
    .log("Command return : ${body} , error : ${header:exec_stderr}")
   .end();
  
  /*
   * Split CSV
   */

  // Used to customized CSV separator
  CsvDataFormat csvFormat = new CsvDataFormat();
  csvFormat.setDelimiter(csvDelimiter);
  
  from("direct:surfaces").convertBodyTo(String.class).unmarshal(csvFormat)
   .sort(body(), sorter)
   // Split CSV content
   // Bean uses StringBuilder and writes CSV content in order to get better performance than
   // creating a lot of List<Map<String, Object>> handled by camel's csv marshaler
   .split().method("CsvService","splitDatas")
   // Business check : is "territoire" a valid data?
   .validate(header("territoire").in(ReferentielDepartement.getDepartements()))
   // Write each data to a proper file
   .to("file://C:/test2/splitted?fileName=Territorial_${header:territoire}_${date:now:yyyyMMdd}.csv")
   .log("Written CSV file for : ${header:territoire}");
  
  from("direct:national").convertBodyTo(String.class).unmarshal(csvFormat)
   .sort(body(), sorter)
   // Split CSV content
   .split().method("CsvService","splitDatas")
   .validate(header("territoire").in(ReferentielDepartement.getDepartements()))
   // Write each data to a proper file
   .to("file://C:/test2/splitted?fileName=${file:onlyname.noext}_${header:territoire}_${date:now:yyyyMMddHHmmss}.csv")
   .log("Written CSV file for : ${header:territoire}");
  
  
  /*
   * Zip final files
   */
  // To handle transformation from camel's DeflaterOutputStream to traditional ZipOutputStream
  CustomizedZipDataFormat zipFormat = new CustomizedZipDataFormat();
  from("file://C:/test2/splitted").marshal(zipFormat)
   .to("file://C:/test2?fileName=${file:onlyname.noext}.zip")
   .log("Zipped : ${file:onlyname.noext}");
 }
 
 class CsvSorter implements Comparator<List<String>> {
  @Override
  public int compare(List<String> o1, List<String> o2) {
   int result = 0;
   // Do not treat first line (headers)
   if(!o1.contains(csvIdentityColumn) && !o2.contains(csvIdentityColumn)) {
    String ccom1 = o1.get(csvIdentityColumnNumber).substring(0, csvIdentityLength);
    String ccom2 = o2.get(csvIdentityColumnNumber).substring(0, csvIdentityLength);
    result = ccom1.compareTo(ccom2);
   }
   return result;
  }
 }
}

Pour ce qui est du service permettant de segmenter les informations CSV, voici son squelette :

public class CsvService {
 
 /**
  * Receives csv informations and split it out to multiple messages
  * Received datas are pre-ordered
  * @param headers in-message headers
  * @param body in-message body (unmarshalled csv content)
  * @return messages containing csv info to be written
  */
 public List<Message> splitDatas(@Headers Map<String, Object> headers,
   @Body List<List<String>> body) {
  List<Message> answer = new ArrayList<Message>();
  
  // headers
  List<String> csvHeaders = body.get(0);
  
  String csvDelimiter = (String) headers.get("csvDelimiter");
  
  // data
  List<List<String>> datas = body.subList(1, body.size());

  (... sort routine ...)

  return answer;
 }
}

Pour ce qui est du service permettant de dézipper l'archive :

public class ZipService {

 /**
  * Splits in message to multiple messages for entries
  * @param headers in headers
  * @param body in body
  * @return one message per zip entry
  */
 public List<Message> unzipFile(@Headers Map<String, Object> headers,
   @Body Object body) {
  List<Message> answer = new ArrayList<Message>();
  try {
   ZipInputStream zis = new ZipInputStream(new ByteArrayInputStream(
     (byte[]) body));
   ZipEntry ze = null;
   String entryName = "";
   String unzippedFiles = "";
   while ((ze = zis.getNextEntry()) != null) {
    entryName = ze.getName();
    unzippedFiles += entryName + ",";
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (int c = zis.read(); c != -1; c = zis.read()) {
     out.write(c);
    }
    zis.closeEntry();
    out.close();
    DefaultMessage message = new DefaultMessage();
    Map<String, Object> newHeaders = new CaseInsensitiveMap(headers);
    newHeaders.put("entryName", entryName);
    newHeaders.put("unzippedFiles", unzippedFiles);
    message.setHeaders(newHeaders);
    message.setBody(out.toByteArray());
    answer.add(message);
   }
   zis.close();
  } catch (Throwable e) {
   e.printStackTrace();
  }
  return answer;
 }
}

Enfin, le code utilisé pour créer des archives zip utilisables (via CustomizedZipDataFormat) provient de cette page.

J'espère que tout ce code ne paraît pas trop indigeste, mais en y regardant de plus près, on s'aperçoit que Camel permet assez facilement de mettre en place ce genre de flux de données, en peu de lignes de code et surtout de manière plutôt lisible. La documentation est d'ailleurs incroyablement bien faite pour ce qui concerne la description des patrons et des composants natifs.

Le seul bémol que j'ajouterai est la gestion des formats ZIP. En effet, par défaut, Camel crée des DeflaterOutputStream : je ne sais pas trop d'où provient ce format, mais en tout cas il ne permet pas directement de créer des archives lisibles. Il faut donc explicitement les convertir en ZipOutputStream classique.

N'hésitez pas à m'indiquer si vous avez déjà utilisé cet outil et surtout si vous voyez des façons d'améliorer ce que j'ai présenté!

Sources :


Fichier(s) joint(s) :

9 commentaires: