Thursday, 20 August 2015

Integrate Apache Camel with Apache Kafka - 1

Recently I started looking into Apache Kafka as our distributed messaging solution. I was already using Apache Camel to transform and process messages exchanged through an ActiveMQ broker. The use case is pretty simple and is depicted in the diagram below:



So we have a producer and a consumer, both connected to an ActiveMQ broker. The producer sends messages to the broker, and the consumer consumes them. This is the basic pattern of interaction between applications through messaging. The details of messaging are outside the scope of this post, though. I assume you know the basics (i.e. synchronous vs. asynchronous messaging, point-to-point vs. publish-subscribe messaging, etc.).

As Kafka offers greater capability than ActiveMQ in terms of scaling, our team started looking into this option as well. So in this example I will build one producer application and one consumer application using Apache Camel routes. The producer and consumer will talk to a Kafka broker as shown in the diagram below.




When I started this implementation I found very few resources on the internet covering the end-to-end process depicted above, so I thought of putting everything in one place.


To start with, we will keep it simple. We will create two Camel applications (a producer and a consumer) and send string messages from one to the other.

Here is the tech stack:

- Apache Kafka 0.8.2
- Apache Camel 2.14.1
- Maven 3.2.5
- Tomcat 7.x

Prerequisite:

1. Download the Kafka binary
2. Follow the documentation to install it.
3. Start the Zookeeper server
> bin/zookeeper-server-start.sh config/zookeeper.properties

4. Start the Kafka server
> bin/kafka-server-start.sh config/server.properties

5. Create a topic with name test, replication-factor 1 and 1 partition
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test


Producer:

Now we will create the producer application. The Eclipse project structure is as follows.




pom.xml

Add the following dependencies to the pom.xml:
  <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-spring</artifactId>
    <version>2.14.1</version>
  </dependency>
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-web</artifactId>
    <version>3.2.11.RELEASE</version>
  </dependency>
  <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-kafka</artifactId>
    <version>2.14.1</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.4</version>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.6.6</version>
  </dependency>

camel-config.xml

Following is the route to be configured inside the camel-config.xml:
  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="kafka-producer">
      <from uri="direct:start"/>
      <!-- partition key header required by the Kafka producer endpoint -->
      <setHeader headerName="kafka.PARTITION_KEY">
        <constant>Test</constant>
      </setHeader>
      <log message="Started The Producer Route"/>
      <!-- the bean's return value becomes the message body -->
      <to uri="bean:service?method=getSampleString()"/>
      <log message="${body}"/>
      <to uri="kafka:99.99.99.99:9092?topic=test&amp;zookeeperHost=99.99.99.99&amp;zookeeperPort=2181&amp;groupId=group1&amp;serializerClass=kafka.serializer.StringEncoder"/>
    </route>
  </camelContext>
  <bean id="service" class="com.tridib.inventriz.service.ServiceImpl"/>

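The route above starts from a direct endpoint (direct:start). It then sets the partition key header needed for Kafka (without it the route fails to send the message to the Kafka broker), calls the service bean to obtain a sample string, logs the message body and finally sends it to the Kafka endpoint.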
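The source of the service bean used by the route is not listed in this post. A minimal sketch of what such a class could look like is given below. The class name and the method name come from the bean definition and the route; the method body is an assumption, chosen so that it returns the fixed string that appears in the log further down.

```java
// Sketch only. In the project this class would live in the package named by
// the bean definition, i.e. the file would start with:
//   package com.tridib.inventriz.service;
public class ServiceImpl {

    // Assumed body: returns the fixed text that shows up in the producer log.
    public String getSampleString() {
        return "This is a sample message";
    }
}
```

Camel resolves bean:service?method=getSampleString() against the Spring bean with id service and uses the returned string as the message body for the rest of the route.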
Kafka server: the URI syntax of the camel-kafka component is

kafka:[server:port]?options

The producer endpoint is configured to connect to the Kafka server running at 99.99.99.99 on port 9092 (this is a placeholder IP; please use the actual one in your case). You can use localhost if the Kafka server runs on the same machine as the producer.

Options:

- topic: the name of the topic the producer endpoint connects to
- zookeeperHost: the Zookeeper host (e.g. 99.99.99.99)
- zookeeperPort: the Zookeeper port (e.g. 2181)
- groupId: the group id the producer endpoint is configured with
- serializerClass: the encoder class for the message type. Since we are dealing with String messages, we can use Kafka's out-of-the-box serializer class kafka.serializer.StringEncoder.
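To make the kafka:[server:port]?options syntax concrete, the endpoint URI used in the producer route can be assembled from its parts. The KafkaEndpointUri helper below is hypothetical (it is not part of camel-kafka) and does nothing more than string concatenation; it is a sketch of the URI layout, not of how Camel parses it. Note that in a Java string the options are separated by a plain &, while in the Spring XML file the same separator has to be escaped as &amp;.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, for illustration only: builds a URI of the form
// kafka:[server:port]?key1=value1&key2=value2
final class KafkaEndpointUri {

    private KafkaEndpointUri() {
    }

    // Joins the options as key=value pairs separated by '&', in insertion order.
    static String build(String host, int port, Map<String, String> options) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return "kafka:" + host + ":" + port + "?" + query;
    }

    public static void main(String[] args) {
        // Same option values as in the producer route of this post.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("topic", "test");
        options.put("zookeeperHost", "99.99.99.99");
        options.put("zookeeperPort", "2181");
        options.put("groupId", "group1");
        options.put("serializerClass", "kafka.serializer.StringEncoder");

        System.out.println(build("99.99.99.99", 9092, options));
    }
}
```

Running main prints the same URI as the one in the producer route, with & in place of &amp;.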

If you want to connect to a remote Kafka broker, you need to set the advertised.host.name property in the server.properties file of the Kafka server to the public IP of that server.

advertised.host.name=99.99.99.99


Consumer:

Now we will create the consumer application. It has a similar project structure and the same dependencies in the pom.xml. The consumer route in the camel-config.xml is as follows:


  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="kafka-consumer">
      <from uri="kafka:99.99.99.99:9092?topic=test&amp;zookeeperHost=99.99.99.99&amp;zookeeperPort=2181&amp;groupId=group1"/>
      <log message="Started The Consumer Route"/>
      <log message="${body}"/>
    </route>
  </camelContext>



Now if we deploy the two applications (the producer and consumer WARs) in the Tomcat container and start the server, we will see the following log.


2015-08-20 18:10:11,510 [direct:///start] INFO  kafka-producer                 - Started The Producer Route
2015-08-20 18:10:11,510 [direct:///start] INFO  kafka-producer                 - This is a sample message
2015-08-20 18:10:11,991 [direct:///start] INFO  ClientUtils$                       - Fetching metadata from broker id:0,host:99.99.99.99,port:9092 with correlation id 0 for 1 topic(s) Set(test)
2015-08-20 18:10:12,242 [direct:///start] INFO  SyncProducer                   - Connected to 99.99.99.99:9092 for producing
2015-08-20 18:10:12,752 [direct:///start] INFO  SyncProducer                   - Disconnecting from 99.99.99.99:9092
2015-08-20 18:10:13,018 [direct:///start] INFO  SyncProducer                   - Connected to 99.99.99.99:9092 for producing
2015-08-20 18:10:13,729 [pool-2-thread-4] INFO  kafka-consumer         - Started The Consumer Route
2015-08-20 18:10:13,729 [pool-2-thread-4] INFO  kafka-consumer         - This is a sample message


So we can see that the string message "This is a sample message" (returned by the method getSampleString() of the class ServiceImpl) is consumed by the consumer.

In the next post we will work with a custom Java object (POJO) in a similar use case.