Saturday, May 16, 2009

Enterprise Integration Patterns with Apache Camel 2.0

Apache Camel is an open source message routing framework based on the Enterprise Integration Patterns described in the book of the same name by Gregor Hohpe and Bobby Woolf.

Apache Camel works with many kinds of transport or messaging model, such as HTTP, JMS, CXF, POP and many others.

Camel lets you use the Enterprise Integration Patterns to implement routing and mediation rules in a Java-based DSL (Domain Specific Language), in Spring configuration files or in the Scala DSL.

Personally, I recommend the Java DSL, which may be a bit confusing at first sight but is very powerful in the end. The benefit of the Java DSL is that your IDE can auto-complete your code as you type, rather than having to mess around with buckets of XML.

Let's try an example. We have a CSV file containing candidates for a job. Each line holds one candidate's characteristics (name, age and size).

Our goal is to process this file, decide whether each candidate is suitable for the job and then send the result as XML to a JMS queue.

First, let's create our CSV file named testfile.csv in the src/test/resources directory with the content below:

Georges,14,168
Alain,58,175
Jean,67,168
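
To make the data shape concrete before we get to Camel: the route shown later unmarshals the file into a List<List<String>>, one inner list per line. The sketch below reproduces that shape in plain Java. It is only an illustration, not how camel-csv is implemented; the class name CsvRowsSketch is made up for this example, and it assumes simple comma-separated values with no quoting or escaping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CsvRowsSketch {

    // Turn simple comma-separated text into rows of fields.
    // Assumption: no quoted fields, no escaped commas.
    static List<List<String>> parse(String csv) {
        List<List<String>> rows = new ArrayList<>();
        for (String line : csv.split("\\r?\\n")) {
            if (line.trim().isEmpty()) {
                continue; // ignore blank lines
            }
            rows.add(Arrays.asList(line.split(",")));
        }
        return rows;
    }

    public static void main(String[] args) {
        String csv = "Georges,14,168\nAlain,58,175\nJean,67,168\n";
        // Prints one list of three strings per line of the file
        for (List<String> row : parse(csv)) {
            System.out.println(row);
        }
    }
}
```

Each inner list holds the name, the age and the size, in that order.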

To start our Camel example, we have to create a route by extending the RouteBuilder class.

CandidateRouteBuilder.java
package com.jdechmann.proto.camel.route;

import java.util.List;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.dataformat.xstream.XStreamDataFormat;

import com.jdechmann.proto.vo.Candidate;
import com.thoughtworks.xstream.XStream;

public class CandidateRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        XStream xstream = new XStream();
        xstream.processAnnotations(Candidate.class);

        XStreamDataFormat xStreamDataFormat = new XStreamDataFormat();
        xStreamDataFormat.setXStream(xstream);

        //Intercept the exceptions
        onException(Exception.class)
            .handled(true)
            //Convert the object to XML
            .marshal(xStreamDataFormat)
            //Send the result to a JMS queue
            .to("jms:queue.candidate.rejected");

        //THE ROUTE STARTS HERE

        //Consume from CSV files
        from("file:src/test/resources/?fileName=testfile.csv")
            //Unmarshal CSV files. The resulting message contains a List<List<String>>
            .unmarshal().csv()
            //Split the message into a number of pieces
            .split(body(List.class))
            //Convert the message into a Candidate object
            .convertBodyTo(Candidate.class)
            //Process the candidates
            .process(new CandidateProcessor())
            //Convert the object to XML
            .marshal(xStreamDataFormat)
            //Send the result to a JMS queue
            .to("jms:queue.candidate.selected");
    }
}
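
If the DSL is hard to follow at first, it may help to see the same flow written as ordinary Java. The sketch below is my reading of the route, not Camel code: two in-memory lists stand in for the two JMS queues, the try/catch stands in for onException with handled(true), and the XML marshalling step is left out. The class name RouteFlowSketch and its members are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RouteFlowSketch {

    // Stand-ins for jms:queue.candidate.selected and jms:queue.candidate.rejected
    final List<String> selected = new ArrayList<>();
    final List<String> rejected = new ArrayList<>();

    // Walks through already-parsed CSV rows in the same order as the route steps.
    void run(List<List<String>> rows) {
        for (List<String> row : rows) {                  // split
            String name = row.get(0);                    // convertBodyTo (simplified)
            int age = Integer.parseInt(row.get(1));
            int size = Integer.parseInt(row.get(2));
            try {
                check(name, age, size);                  // process
                selected.add(name);                      // to(...selected)
            } catch (Exception e) {
                rejected.add(name);                      // onException(...).to(...rejected)
            }
        }
    }

    // Same rule as the processor in this post: too old or too small is refused.
    static void check(String name, int age, int size) throws Exception {
        if (age > 60 || size < 160) {
            throw new Exception("Candidate refused " + name);
        }
    }

    public static void main(String[] args) {
        RouteFlowSketch flow = new RouteFlowSketch();
        flow.run(Arrays.asList(
                Arrays.asList("Georges", "14", "168"),
                Arrays.asList("Alain", "58", "175"),
                Arrays.asList("Jean", "67", "168")));
        System.out.println("selected: " + flow.selected);
        System.out.println("rejected: " + flow.rejected);
    }
}
```

The point to notice is that a refused candidate does not stop the loop: the exception is handled for that one row and the remaining rows are still processed.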

The Spring configuration file contains the location of the RouteBuilder classes and the configuration of the JMS broker. You will need to have an ActiveMQ server running to make this example work.

applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-2.5.xsd
           http://camel.apache.org/schema/spring
           http://camel.apache.org/schema/spring/camel-spring.xsd">

    <bean id="camelTracer" class="org.apache.camel.processor.interceptor.Tracer">
        <property name="traceOutExchanges" value="true" />
    </bean>

    <bean id="traceFormatter" class="org.apache.camel.processor.interceptor.DefaultTraceFormatter">
        <property name="showOutBody" value="true" />
        <property name="showOutBodyType" value="true" />
    </bean>

    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
        <package>com.jdechmann.proto.camel.route</package>
    </camelContext>

    <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

</beans>

Then we have to create our Java business object.

Candidate.java
package com.jdechmann.proto.vo;

import com.thoughtworks.xstream.annotations.XStreamAlias;

@XStreamAlias("candidate")
public class Candidate {

    private String name;
    private int age;
    private int size;

    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    public int getSize() {
        return size;
    }
    public void setSize(int size) {
        this.size = size;
    }

    @Override
    public String toString() {
        return "Candidate - " + " Name: " + name +
            " Age: " + age + " Size: " + size;
    }
}
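
To give an idea of what ends up on the queue, here is a hand-built approximation of the XML for one candidate. This is not XStream: it is a plain Java sketch that assumes the root element takes the name given in @XStreamAlias and that each field becomes a child element in declaration order. The exact whitespace and escaping produced by XStream may differ, and the class name CandidateXmlSketch is made up for this illustration.

```java
class CandidateXmlSketch {

    // Build an XML document shaped like the one we expect for a Candidate.
    static String toXml(String name, int age, int size) {
        return "<candidate>\n"
             + "  <name>" + escape(name) + "</name>\n"
             + "  <age>" + age + "</age>\n"
             + "  <size>" + size + "</size>\n"
             + "</candidate>";
    }

    // Minimal escaping of the characters that would break element content.
    static String escape(String text) {
        return text.replace("&", "&amp;")
                   .replace("<", "&lt;")
                   .replace(">", "&gt;");
    }

    public static void main(String[] args) {
        System.out.println(toXml("Georges", 14, 168));
    }
}
```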

Here's the processor, which has to implement the Camel Processor interface.
This class throws an exception if the candidate is not fit for the job.

CandidateProcessor.java
package com.jdechmann.proto.camel.route;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

import com.jdechmann.proto.vo.Candidate;

/**
 * Process a Candidate object.
 * Throw an exception if the candidate does not
 * match the criteria.
 */
public class CandidateProcessor implements Processor {

    @Override
    public void process(Exchange exchange) throws Exception {
        Candidate candidate = exchange.getIn().getBody(Candidate.class);

        if (candidate.getAge() > 60 || candidate.getSize() < 160)
            throw new Exception("Candidate refused " + candidate.toString());
        else
            System.out.println("Candidate accepted " + candidate.toString());
    }
}
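
The selection rule itself is easy to check without Camel. The sketch below pulls it out into a plain predicate and applies it to the three lines of testfile.csv. The class name SuitabilityRule and the constant names are made up for this illustration; the thresholds are the ones used in the processor above.

```java
class SuitabilityRule {

    static final int MAX_AGE = 60;   // older than this is refused
    static final int MIN_SIZE = 160; // smaller than this is refused

    // Logical negation of the refusal test (age > 60 || size < 160)
    static boolean isSuitable(int age, int size) {
        return age <= MAX_AGE && size >= MIN_SIZE;
    }

    public static void main(String[] args) {
        System.out.println("Georges: " + isSuitable(14, 168)); // true
        System.out.println("Alain: " + isSuitable(58, 175));   // true
        System.out.println("Jean: " + isSuitable(67, 168));    // false, older than 60
    }
}
```

So with the sample file, Georges and Alain should reach the selected queue and Jean the rejected one.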

In order to convert the message body into a Candidate object, you need to tell Camel where to find your converter class. Camel will look in the classpath for a file named TypeConverter in the META-INF/services/org/apache/camel/ directory. The TypeConverter file must contain the name of the package that holds your converter (com.jdechmann.proto.camel.converter in our case).
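
If the conversion fails with a "No type converter available" error, one of the first things worth checking is whether this file is really on the classpath. The small diagnostic below is a sketch using only the JDK: it lists every copy of the file that the class loader can see and prints the non-blank lines of each. The class name TypeConverterFileCheck is made up for this illustration, and this is not how Camel itself loads the file.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class TypeConverterFileCheck {

    static final String RESOURCE = "META-INF/services/org/apache/camel/TypeConverter";

    // Keep the non-blank lines of the file content, trimmed.
    static List<String> parseEntries(String content) {
        List<String> entries = new ArrayList<>();
        for (String line : content.split("\\r?\\n")) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty()) {
                entries.add(trimmed);
            }
        }
        return entries;
    }

    // Read the whole resource as UTF-8 text.
    static String read(URL url) throws IOException {
        try (InputStream in = url.openStream()) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        ClassLoader loader = TypeConverterFileCheck.class.getClassLoader();
        List<URL> copies = Collections.list(loader.getResources(RESOURCE));
        if (copies.isEmpty()) {
            System.out.println("No " + RESOURCE + " found on the classpath");
        }
        for (URL url : copies) {
            System.out.println(url + " -> " + parseEntries(read(url)));
        }
    }
}
```

Other jars may ship their own copy of this file, so seeing several entries is not necessarily a problem. What matters is that one of them lists your converter package.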

CandidateConverter.java
package com.jdechmann.proto.camel.converter;

import java.util.List;

import org.apache.camel.Converter;
import com.jdechmann.proto.vo.Candidate;

@Converter
public class CandidateConverter {

    //Each CSV row arrives as a list of strings: name, age, size
    @Converter
    public Candidate toCandidate(List<String> personArray) {

        Candidate candidate = new Candidate();
        candidate.setName(personArray.get(0));
        candidate.setAge(Integer.valueOf(personArray.get(1)));
        candidate.setSize(Integer.valueOf(personArray.get(2)));

        return candidate;
    }
}
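
The converter above assumes that every row has exactly three well-formed fields. If your file may contain stray spaces or incomplete lines, a more defensive version could look like the sketch below. It is plain Java with no Camel dependency; SafeRowParser and its nested Row class are made up for this illustration, with Row standing in for Candidate.

```java
import java.util.Arrays;
import java.util.List;

class SafeRowParser {

    // Hypothetical value holder standing in for the Candidate class
    static final class Row {
        final String name;
        final int age;
        final int size;

        Row(String name, int age, int size) {
            this.name = name;
            this.age = age;
            this.size = size;
        }
    }

    // Validate the field count, trim each field, and report bad numbers clearly.
    static Row parse(List<String> fields) {
        if (fields == null || fields.size() != 3) {
            throw new IllegalArgumentException(
                    "Expected 3 fields (name, age, size) but got: " + fields);
        }
        try {
            return new Row(
                    fields.get(0).trim(),
                    Integer.parseInt(fields.get(1).trim()),
                    Integer.parseInt(fields.get(2).trim()));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Age and size must be whole numbers: " + fields, e);
        }
    }

    public static void main(String[] args) {
        Row row = parse(Arrays.asList("Jean", " 67", "168 "));
        System.out.println(row.name + " " + row.age + " " + row.size);
    }
}
```

The same checks could be moved into toCandidate, so that a malformed line fails with a descriptive message instead of an IndexOutOfBoundsException or a bare NumberFormatException.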

Finally, here's our Main class to start our route.

Main.java
package com.jdechmann.proto;

import org.apache.camel.CamelContext;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Main {

    public static void main(String[] args) throws Exception {
        ApplicationContext context =
            new ClassPathXmlApplicationContext("applicationContext.xml");

        //Starting the camel context
        CamelContext camel = (CamelContext) context.getBean("camel");
        camel.start();
    }
}

If you're using Maven, you will need to add the following dependencies to your POM file.
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-spring</artifactId>
    <version>2.0-M1</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-core</artifactId>
    <version>2.0-M1</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-jms</artifactId>
    <version>2.0-M1</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-csv</artifactId>
    <version>2.0-M1</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-xstream</artifactId>
    <version>2.0-M1</version>
</dependency>
If you want to see the route traces, add log4j to your dependencies:
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.14</version>
</dependency>
And add a log4j.properties file to your classpath:

# Set root category priority to DEBUG and its only appender to CONSOLE.
log4j.rootCategory=DEBUG, CONSOLE

# CONSOLE is set to be a ConsoleAppender using a PatternLayout.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=- %m%n

You can download the code of this example from this URL

12 comments:

  1. Julien this is a very well written blog with a great example. Do you mind that we add a link to your blog entry on the Camel articles page?

    http://camel.apache.org/articles.html

    If you prefer a certain title and description on the articles page then let me know.

  2. Thanks Claus. Of course you can add a link to my blog entry.

  3. good. this is very informative. Keep it up.

  4. Highly helpful content for a Camel beginner.
    Thanks dude.

  5. I just found your example and it was very helpful. Now I am wondering how to skip the first line of a CSV file in your example.

    Again, thanks a lot!

  6. This comment has been removed by the author.

  7. When I shut down the JMS server, Camel keeps trying to send the messages to the server indefinitely. I want to configure a scenario where, when the JMS server is down, messages are retried 5 or 10 times and then the file is moved to an error directory. Is this possible to configure?

  8. how could i run this sample code....??

  9. Thanks for your Blog!
    And I have a question.

    How does the class CandidateConverter.java work?
    Where should I use the function toCandidate?

  10. org.apache.camel.InvalidPayloadException: No body available of type: de.hellomaven.dto.Kunde but has value: [Max, Mustermann, 23.04.1987, Musterstrasse, 5, 56748, Köln] of type: java.util.Arrays.ArrayList on: null. Caused by: No type converter available to convert from type: java.util.Arrays.ArrayList to the required type: de.timetoact.hellomaven.dto.Kunde with value [Max, Mustermann, 23.04.1987, Musterstrasse, 5, 56748, Köln]. Exchange[null]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: java.util.Arrays.ArrayList to the required type: de.timetoact.hellomaven.dto.Kunde with value [Max, Mustermann, 23.04.1987, Musterstrasse, 5, 56748, Köln]]
