Recently I started looking into Apache Kafka as our distributed messaging solution. I was already using Apache Camel with an ActiveMQ broker for various message transformation and processing tasks. The use case is pretty simple and is depicted in the diagram below:
So we have a producer and a consumer, and they are connected to an ActiveMQ broker. The producer sends messages to the broker, which are consumed by the consumer. This is the basic pattern of interaction between applications through messaging. The details of messaging are beyond the scope of this post; I assume you know the basics (synchronous vs. asynchronous messaging, point-to-point vs. publish-subscribe messaging, etc.).
As Kafka scales much better than ActiveMQ, our team started looking into this option as well. In this example I will build one producer and one consumer application using Apache Camel routes. The producer and consumer will talk to a Kafka broker as shown in the diagram below.
When I started this implementation I found very few resources on the internet covering the end-to-end process depicted above, so I thought of putting everything in one place.
To start with, we will keep it simple: we will create two Camel applications (a producer and a consumer) and send string messages from one to the other.
Here is the tech stack:
- Apache Kafka 0.8.2
- Apache Camel 2.14.1
- Maven 3.2.5
- Tomcat 7.x
Prerequisites:
1. Download the Kafka binary.
2. Follow the documentation and install it.
3. Start the ZooKeeper server:
> bin/zookeeper-server-start.sh config/zookeeper.properties
4. Start the Kafka server:
> bin/kafka-server-start.sh config/server.properties
5. Create a topic named test with replication factor 1 and 1 partition:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
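Optionally, before wiring up Camel, you can verify the setup with the console tools that ship with Kafka: list the topics, then send and read a test message.
> bin/kafka-topics.sh --list --zookeeper localhost:2181
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning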
Producer:
Now we will create the producer application. Following is the Eclipse project structure.
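The layout is that of a standard Maven/Spring web application. Assuming camel-config.xml sits under src/main/webapp/WEB-INF (the exact location is my assumption), the Spring context containing the Camel route can be bootstrapped from web.xml with Spring's ContextLoaderListener, roughly like this:
<web-app xmlns="http://java.sun.com/xml/ns/javaee" version="2.5">
  <!-- Load the Spring application context (which defines the camelContext)
       from camel-config.xml when the WAR is deployed -->
  <context-param>
    <param-name>contextConfigLocation</param-name>
    <param-value>/WEB-INF/camel-config.xml</param-value>
  </context-param>
  <listener>
    <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
  </listener>
</web-app>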
pom.xml
Add the following dependencies to the pom.xml:
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-spring</artifactId>
  <version>2.14.1</version>
</dependency>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-web</artifactId>
  <version>3.2.11.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-kafka</artifactId>
  <version>2.14.1</version>
</dependency>
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.10.4</version>
</dependency>
<dependency>
  <groupId>log4j</groupId>
  <artifactId>log4j</artifactId>
  <version>1.2.17</version>
</dependency>
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-log4j12</artifactId>
  <version>1.6.6</version>
</dependency>
camel-config.xml
Following is the route to be configured inside the camel-config.xml:
<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route id="kafka-producer">
    <from uri="direct:start"/>
    <setHeader headerName="kafka.PARTITION_KEY">
      <constant>Test</constant>
    </setHeader>
    <log message="Started The Producer Route"/>
    <to uri="bean:service?method=getSampleString()"/>
    <camel:log message="${body}"/>
    <to uri="kafka:99.99.99.99:9092?topic=test&amp;zookeeperHost=99.99.99.99&amp;zookeeperPort=2181&amp;groupId=group1&amp;serializerClass=kafka.serializer.StringEncoder"/>
  </route>
</camelContext>

<bean id="service" class="com.tridib.inventriz.service.ServiceImpl"/>
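The route gets its payload from the bean registered with id service. The class itself is not listed in this post, but based on the bean definition and the log output further down, a minimal sketch would be:
package com.tridib.inventriz.service;

// Minimal sketch of the payload-generating bean referenced by
// bean:service?method=getSampleString() in the producer route.
public class ServiceImpl {

    public String getSampleString() {
        return "This is a sample message";
    }
}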
The Camel route starts from a direct endpoint. Then we set a partition key, which Kafka requires (without it the route will fail to send messages to the Kafka broker).
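Because the route starts from direct:start, something has to feed it a message. One way to trigger it, assuming you have a handle on the CamelContext defined in camel-config.xml (this snippet is not part of the original project), is a ProducerTemplate:
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;

public class ProducerTrigger {

    // Sends an (empty) message into direct:start; the route then replaces
    // the body with the value returned by ServiceImpl.getSampleString()
    public static void trigger(CamelContext camelContext) {
        ProducerTemplate template = camelContext.createProducerTemplate();
        template.sendBody("direct:start", "");
    }
}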
Kafka Server: The syntax of a camel-kafka endpoint URI is:
kafka:[server:port]?options
The producer endpoint has been configured to connect to the Kafka server running at 99.99.99.99 on port 9092 (this is a test IP, please use the actual one for your case). You can use localhost if the Kafka server is running on the same machine as the producer.
Options:
topic: the topic name the endpoint connects to
zookeeperHost: the ZooKeeper host (e.g. 99.99.99.99)
zookeeperPort: the ZooKeeper port (e.g. 2181)
groupId: the group id the endpoint connects with
serializerClass: the encoder class matching the message type. For example, since we are dealing with String messages we can use Kafka's out-of-the-box serializer class kafka.serializer.StringEncoder.
If you want to connect to a remote Kafka broker, you need to update the advertised.host.name property in the Kafka server's server.properties file with the public IP of the server:
advertised.host.name=99.99.99.99
Consumer:
Now we will create the consumer application. It has a similar project structure and the same dependencies in its pom.xml. The consumer route in camel-config.xml is as follows:
<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route id="kafka-consumer">
    <from uri="kafka:99.99.99.99:9092?topic=test&amp;zookeeperHost=99.99.99.99&amp;zookeeperPort=2181&amp;groupId=group1"/>
    <log message="Started The Consumer Route"/>
    <camel:log message="${body}"/>
  </route>
</camelContext>
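Here the consumer route only logs the message body. If you want to do something with the consumed message, you can hand it to a bean the same way the producer route does. A hypothetical sketch (this class and the extra route step are my own, not part of the original example):
package com.tridib.inventriz.service;

// Hypothetical consumer-side bean; it could be wired into the route with
// <to uri="bean:consumerService?method=handleMessage"/> and a matching <bean> definition.
public class ConsumerServiceImpl {

    public void handleMessage(String body) {
        // Process the message received from the Kafka topic
        System.out.println("Received from Kafka: " + body);
    }
}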
Now if we deploy the two applications (the producer and consumer WARs) in the Tomcat container and start the server, we will see the following log:
2015-08-20 18:10:11,510 [direct:///start] INFO kafka-producer - Started The Producer Route
2015-08-20 18:10:11,510 [direct:///start] INFO kafka-producer - This is a sample message
2015-08-20 18:10:11,991 [direct:///start] INFO ClientUtils$ - Fetching metadata from broker id:0,host:99.99.99.99,port:9092 with correlation id 0 for 1 topic(s) Set(test)
2015-08-20 18:10:12,242 [direct:///start] INFO SyncProducer - Connected to 99.99.99.99:9092 for producing
2015-08-20 18:10:12,752 [direct:///start] INFO SyncProducer - Disconnecting from 99.99.99.99:9092
2015-08-20 18:10:13,018 [direct:///start] INFO SyncProducer - Connected to 99.99.99.99:9092 for producing
2015-08-20 18:10:13,729 [pool-2-thread-4] INFO kafka-consumer - Started The Consumer Route
2015-08-20 18:10:13,729 [pool-2-thread-4] INFO kafka-consumer - This is a sample message
So we can see the string message "This is a sample message" (returned by the getSampleString() method of the ServiceImpl class) being consumed by the consumer.
In the next post we will work with a custom Java object (POJO) in a similar use case.