Author: Sanket Kangle

Table of Contents

  • Introduction to Apache Kafka
  • Software Versions used in this Demo
  • Download Apache Kafka
  • Disabling Hyper-V
  • Necessary changes in extracted Kafka files
  • Zookeeper server deployment
  • Kafka server deployment
  • Create a topic
  • Create a producer
  • Create a consumer
  • Create MuleSoft publish flow
  • Create MuleSoft consume flow
  • Test the created flows using Postman
  • Some useful Kafka commands

Introduction to Apache Kafka

Apache Kafka is an open-source stream-processing software platform developed by the Apache Software Foundation, written in Scala and Java.

The benefits of Kafka are as follows:

  • High throughput
  • Low latency
  • Horizontally scalable
  • Durable storage with replication, which helps prevent data loss
  • Reduces complexity

Basic Elements of Apache Kafka

  • Topic: a category or feed name to which records are published. A topic can have zero, one, or many consumers that subscribe to the data written to it.
  • Partition: a topic can be split into one or more partitions so that it can handle large volumes of data. Each partition is an ordered, immutable sequence of records that is continually appended to, like a structured commit log.
  • Partition offset: each record in a partition is assigned a sequential ID number, the offset, that uniquely identifies it within that partition.
  • Broker: a Kafka server that runs in a Kafka cluster. It manages the storage of messages in the topics.
  • Kafka cluster: one or more Kafka brokers, usually running on several servers. Each broker has a unique ID.
  • Producers: publish data to topics of their choice. The producer is responsible for choosing which partition within the topic each record is assigned to.
  • Consumers: read and process data from topics.
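To make the relationship between topics, partitions, and offsets concrete, here is a minimal in-memory sketch. It is a toy model, not Kafka's implementation: the class names and the CRC32-based partitioner are assumptions chosen for illustration only.

```python
import zlib


class Partition:
    """Append-only list of records; the list index plays the role of the offset."""

    def __init__(self):
        self.records = []

    def append(self, value):
        self.records.append(value)
        return len(self.records) - 1  # offset of the record just written


class Topic:
    """A named group of partitions (hypothetical toy class)."""

    def __init__(self, name, num_partitions=1):
        self.name = name
        self.partitions = [Partition() for _ in range(num_partitions)]

    def publish(self, key, value):
        # Illustrative partitioner: CRC32 of the key modulo the partition count.
        # (Kafka's Java client hashes keys with murmur2 instead.)
        index = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        return index, self.partitions[index].append(value)


topic = Topic("DemoTopic", num_partitions=3)
first = topic.publish("order-1", "created")
second = topic.publish("order-1", "shipped")
print(first, second)  # each result is (partition index, offset)
```

Records that share a key land in the same partition, and their offsets increase in the order they were written, which is why ordering is only guaranteed within a partition.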

Software Versions used in this Demo

  • Java 1.8
  • Mule Runtime 4.3.0 EE
  • Anypoint Studio 7.8.0
  • Apache Kafka, Scala 2.13 build (2.13 is the Scala version in the binary's name, not the Kafka release number)
  • Apache Kafka Connector — Mule 4 version 4.3.3
  • Windows 10

Download Apache Kafka

To use Kafka, you must have a JDK installed on your system. If you have not done so already, install one before proceeding. Download the latest Apache Kafka binaries from the Apache Kafka downloads page. Extract the binaries to your C: drive and name the extracted folder "kafka" (you can use any name you want or keep the default one).

In earlier versions of Kafka, Zookeeper had to be installed separately, but recent versions bundle all the Zookeeper dependencies in the Kafka binaries.

Disabling Hyper-V

If you run the DataWeave Playground (or any other Docker-based server) on your local system, Kafka will not work until Hyper-V is disabled. Disable it by following the steps below.

Open the Control Panel and go to Uninstall a program.

Click on Turn Windows features on or off.

Ensure that Hyper-V is unchecked, and click OK. If Windows asks you to restart the computer, do so.

Now, you are ready to proceed with extracting Kafka.

Necessary changes in extracted Kafka files

Go to your Kafka > config folder, which contains the configuration files. We need to make changes in zookeeper.properties and server.properties.

Changes in zookeeper.properties

Open zookeeper.properties in Notepad or Notepad++ (or any other editor of your choice) and set the dataDir property to a valid directory path. You can also change the port in this file if needed.

Changes in server.properties

Open server.properties in Notepad or Notepad++ (or any other editor of your choice) and set the log.dirs property to a valid directory path.

Also add the following lines in the internal topic settings section:

offsets.topic.num.partitions=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
min.insync.replicas=1
default.replication.factor=1
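If you want to double-check the file after editing, a small script can report which of these settings are still missing. This is only a sketch: the helper names are hypothetical, and the parser handles just the simple key=value lines and # comments used here, not the full Java properties syntax.

```python
# Settings this article adds to server.properties for a single-broker setup.
REQUIRED = {
    "offsets.topic.num.partitions": "1",
    "offsets.topic.replication.factor": "1",
    "transaction.state.log.replication.factor": "1",
    "transaction.state.log.min.isr": "1",
    "min.insync.replicas": "1",
    "default.replication.factor": "1",
}


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def missing_settings(text, required=REQUIRED):
    """Return the required settings that are absent or have another value."""
    props = parse_properties(text)
    return {k: v for k, v in required.items() if props.get(k) != v}


# Example input: a fragment where the last two settings were forgotten.
sample = """
# Internal Topic Settings
offsets.topic.num.partitions=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
"""
missing = missing_settings(sample)
print(missing)
```

To check your real file, read it with open(path).read() and pass the text to missing_settings; an empty dictionary means every setting is in place.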

Now you are set to deploy the Kafka server locally.

Zookeeper server deployment

Kafka needs Zookeeper to manage the cluster. Zookeeper coordinates the brokers and the cluster topology, acts as a consistent file system for configuration information, and is used for leader election among the broker topic partitions. So, we have to start Zookeeper before starting the Kafka server on our machine.

Open a command prompt in the kafka\bin\windows folder and run the following command.

zookeeper-server-start.bat C:\kafka\config\zookeeper.properties

Your Zookeeper server should start on port 2181.

Kafka server deployment

Open a new command prompt in kafka\bin\windows and run the following command to deploy the Kafka server.

kafka-server-start.bat C:\kafka\config\server.properties

Your Kafka server should start.
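If you are unsure whether both servers came up, you can probe their ports. The sketch below assumes the ports used by the commands in this article (2181 for Zookeeper and 9092 for the Kafka broker); the function name is hypothetical. A successful TCP connection only shows that something is listening, not that the service is healthy.

```python
import socket


def is_listening(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Assumed ports: 2181 (Zookeeper) and 9092 (Kafka broker).
    for name, port in (("Zookeeper", 2181), ("Kafka broker", 9092)):
        state = "listening" if is_listening("localhost", port) else "not reachable"
        print(f"{name} on port {port}: {state}")
```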

Create a topic

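Open a new command prompt in kafka\bin\windows and run the following command to create a topic named DemoTopic.

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic DemoTopic

To verify that the topic was created successfully, list the topics with the following command.

kafka-topics.bat --list --zookeeper localhost:2181

The --zookeeper option applies to the Kafka 2.x line; it was removed in Kafka 3.0, where --bootstrap-server localhost:9092 takes its place.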

Create a producer

Now we need to create a producer to publish messages to the topic we created. Use the following command.

kafka-console-producer.bat --broker-list localhost:9092 --topic DemoTopic

If you get a > prompt, congrats, you are ready to publish to your topic. Type a message and press Enter to publish it.

Create a consumer

Now, let us create a consumer that consumes these messages. Open a new command prompt in kafka\bin\windows and run the following command to create a consumer for DemoTopic.

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic DemoTopic --from-beginning

You should be able to get messages published in topic DemoTopic in your consumer prompt.
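The --from-beginning flag decides where the console consumer starts reading. Without it, a new console consumer starts at the end of the log and only sees messages published afterwards. The toy sketch below models a single partition as a Python list to show the difference; it does not talk to Kafka, and the function name is hypothetical.

```python
def read_from(log, start_offset):
    """Return (offset, record) pairs from start_offset to the end of the log."""
    return [(offset, log[offset]) for offset in range(start_offset, len(log))]


log = ["hello", "world"]      # records already in the partition
latest = len(log)             # start position without --from-beginning
log.append("third message")   # published after the consumer attached

from_beginning = read_from(log, 0)    # models --from-beginning
from_latest = read_from(log, latest)  # models the default start position
print(from_beginning)
print(from_latest)
```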

You can publish more messages in the producer prompt and watch them get consumed in the consumer prompt.

Create MuleSoft publish flow

Note: when you run these flows, the Kafka server must be running on your system. In other words, keep all the command prompts we opened so far running.

Let us now integrate this Kafka server with Mule 4. Create a new project and import the latest version of the Apache Kafka connector from the Anypoint Exchange. 

You should now see the connector's processors available in the module.

Create a simple flow with an HTTP Listener, a Logger, a Publish operation, and a Set Payload component, in that order.

Set all the configurations as follows:

Listener configuration 

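Set the path to /kafkaPublish and, in the Advanced tab of the listener properties, set the allowed methods to POST. In the XML below, the listener's connector configuration uses port 8089.

Leave the rest of the configuration at its defaults.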

Logger

Just log that the message is received for publishing in the Kafka topic.

Publish

In the global element properties, use a plaintext connection with the bootstrap server set to localhost:9092.

Then click Test Connection. The connection should succeed.

Click OK, and then OK again.

Next, configure the Publish operation's properties. We take the topic name and the message from the HTTP request, so enter payload.topicName in the Topic field and payload.message in the Message field (both in expression mode). We also set the key to the current time using #[now()].

Set Payload

Just setting a success message in the payload so that we can understand that publishing is done successfully.

Following is the XML code for the flow.

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:kafka="http://www.mulesoft.org/schema/mule/kafka" xmlns:http="http://www.mulesoft.org/schema/mule/http"
    xmlns="http://www.mulesoft.org/schema/mule/core"
    xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/kafka http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd">
    <!-- Not referenced by the flow below; the listener uses HTTP_Listener_config1 -->
    <http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="f6f22d84-88b0-417c-a113-b19493b8b3d7" >
        <http:listener-connection host="0.0.0.0" port="8081" />
    </http:listener-config>
    <!-- Plaintext producer connection to the local Kafka broker -->
    <kafka:producer-config name="Apache_Kafka_Producer_configuration" doc:name="Apache Kafka Producer configuration" doc:id="0954fe59-5f5d-459b-bd70-4c1391e8d503" >
        <kafka:producer-plaintext-connection >
            <kafka:bootstrap-servers >
                <kafka:bootstrap-server value="localhost:9092" />
            </kafka:bootstrap-servers>
        </kafka:producer-plaintext-connection>
    </kafka:producer-config>
    <!-- HTTP listener configuration used by Publish-flow (port 8089) -->
    <http:listener-config name="HTTP_Listener_config1" doc:name="HTTP Listener config" doc:id="0cb3ae7f-3e96-414d-9ccb-b5d0e997de50" >
        <http:listener-connection host="0.0.0.0" port="8089" />
    </http:listener-config>
    <flow name="Publish-flow" doc:id="5e1b6dfa-1b94-4fe4-bf75-46c70ef76192" >
        <http:listener doc:name="Listener" doc:id="3c59d0cf-25ce-4066-9c3f-98a1e02751dc" config-ref="HTTP_Listener_config1" path="/kafkaPublish" allowedMethods="POST">
            <http:response >
                <http:body ><![CDATA[#['Message delivered successfully to the Topic: DemoTopic']]]></http:body>
            </http:response>
        </http:listener>
        <logger level="INFO" doc:name="Logger" doc:id="bda4eda1-7803-433e-880b-f37620bd7585" message="Message received for publishing"/>
        <!-- Topic and message come from the request body; the key is the current time -->
        <kafka:publish doc:name="Publish" doc:id="68d26c40-d06c-4f8d-9685-3c4bd6e3358b" config-ref="Apache_Kafka_Producer_configuration" topic='#[payload.topicName]' key="#[now()]">
            <kafka:message ><![CDATA[#[payload.message]]]></kafka:message>
        </kafka:publish>
        <set-payload value='#["message is published to the topic"]' doc:name="Set Payload" doc:id="89cf9d45-a726-4bc8-afab-f3e99c46f3b6" />
    </flow>
</mule>

Create MuleSoft consume flow

Now, let's create a flow to consume the messages published in the Kafka topic.

Create a simple flow with a Kafka Message listener followed by a Logger.

Set all the configurations as follows:

Message listener

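Create a consumer configuration that uses a plaintext connection with the bootstrap server localhost:9092, the group ID test-consumer-group, and the topic pattern DemoTopic (the values used in the XML below). Then click Test Connection. The connection should succeed.

Click OK, and then OK again.

Leave all other properties at their defaults.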

Logger

Just log a message so that we know the message was consumed successfully.

The XML code for the flow is as below.

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:kafka="http://www.mulesoft.org/schema/mule/kafka" xmlns="http://www.mulesoft.org/schema/mule/core"
    xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/kafka http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd">
    <kafka:consumer-config name="Apache_Kafka_Consumer_configuration" doc:name="Apache Kafka Consumer configuration" doc:id="df0a2808-0a22-4d25-a181-5b4d36f7231d" >
        <kafka:consumer-plaintext-connection groupId="test-consumer-group">
            <kafka:bootstrap-servers >
                <kafka:bootstrap-server value="localhost:9092" />
            </kafka:bootstrap-servers>
            <kafka:topic-patterns >
                <kafka:topic-pattern value="DemoTopic" />
            </kafka:topic-patterns>
        </kafka:consumer-plaintext-connection>
    </kafka:consumer-config>
    <flow name="kafka-ConsumeFlow" doc:id="88d7c425-47ed-402a-ab30-5b979cd2d324" >
        <kafka:message-listener doc:name="Message listener" doc:id="cdefe0a3-1a4e-46f4-8c26-6ddbeb2b5938" config-ref="Apache_Kafka_Consumer_configuration"/>
        <logger level="INFO" doc:name="Logger" doc:id="3b2bc80d-5810-4e55-9c95-f58c06555330" message='#["Message consumed "]'/>
    </flow>
</mule>

Test the created flows using Postman

Now we are ready to test the application. Deploy the application.

Once the console shows that the application is deployed, send a POST request from Postman or Advanced REST Client to http://localhost:8089/kafkaPublish with a JSON body that contains the topicName and message fields.
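If you prefer a script to a GUI client, the same request can be built with Python's standard library. This is a sketch under the assumptions taken from the publish flow's XML: the listener runs on port 8089, the path is /kafkaPublish, and the body is JSON with topicName and message fields. The function name and the sample message are hypothetical.

```python
import json
import urllib.request


def build_publish_request(topic_name, message, base_url="http://localhost:8089"):
    """Build (but do not send) the POST request expected by Publish-flow."""
    body = json.dumps({"topicName": topic_name, "message": message}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/kafkaPublish",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_publish_request("DemoTopic", "Hello from Python")
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))

# To actually send it (requires the Mule application to be running):
# with urllib.request.urlopen(request, timeout=10) as response:
#     print(response.status, response.read().decode("utf-8"))
```

The request is built separately from sending it so that you can inspect the URL and body before the Mule application is running.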

You should receive the message "Message delivered successfully to the Topic: DemoTopic", as configured in the listener's response body, in the Postman Response tab.

In the Anypoint Studio console, check that the publish flow's logger has written its "received for publishing" message and that the consume flow's logger has written its "Message consumed" message.

Now, also check the consumer command prompt; the message from the Postman request should have been consumed there.

Check your producer prompt: there is no new input, because these messages were published to the topic through the HTTP request, not from the command prompt.

This is how you can integrate, publish, and consume Kafka messages using Mule 4.

Hope this was helpful.

Some useful Kafka commands

########## 1 ########## start zookeeper server
zookeeper-server-start.bat C:\kafka\config\zookeeper.properties
########## 2 ########## start kafka server
kafka-server-start.bat C:\kafka\config\server.properties
########## 3 ########## create a topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic DemoTopic
########## 4 ########## view the available list of topics
kafka-topics.bat --list --zookeeper localhost:2181
########## 5 ########## start a producer
kafka-console-producer.bat --broker-list localhost:9092 --topic DemoTopic
########## 6 ########## start a consumer
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic DemoTopic --from-beginning
