Hello All,
I am trying to create a Consumer using Apache Camel for a topic in Apache Kafka.
I am using Camel 2.17.0, Kafka 0.10, and JDK 1.8.
I have attached a file, KafkaCamelTestConsumer.java, which is a standalone
application that tries to read from a topic "test1" created in Apache Kafka.
I am producing messages to the topic "test1" from the console, and I was also
able to produce messages to it using a Camel program, but I am not able to
consume them. Ideally, each message should get printed, but nothing seems to
happen. The log says that the route has started, but it does not process any
message.
Please help confirm whether there is anything wrong with the route below:
        from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest"
                + "&consumersCount=1&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
                + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true")
            .split().body()
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    String messageKey = "";
                    if (exchange.getIn() != null) {
                        Message message = exchange.getIn();
                        Integer partitionId = (Integer) message.getHeader(KafkaConstants.PARTITION);
                        String topicName = (String) message.getHeader(KafkaConstants.TOPIC);
                        if (message.getHeader(KafkaConstants.KEY) != null) {
                            messageKey = (String) message.getHeader(KafkaConstants.KEY);
                        }
                        Object data = message.getBody();
                        System.out.println("topicName :: " + topicName
                                + " partitionId :: " + partitionId
                                + " messageKey :: " + messageKey
                                + " message :: " + data + "\n");
                    }
                }
            })
            .to("file://C:/swati/?fileName=MyOutputFile.txt&charset=utf-8");
    }
});
I have also tried with only the basic parameters, as below, and it still fails
to read messages.
from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest")
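For reference, here is a minimal sketch of how the same endpoint URI could be assembled from a map of options instead of hand-concatenated strings, which makes a misplaced "&" easier to spot. The class name KafkaUriSketch and the method buildUri are hypothetical helpers for illustration only, not part of Camel or Kafka. The sketch only joins strings: it does not URL-encode values, and it does not check that the option names are valid for the camel-kafka version in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not a Camel API): joins endpoint options into a URI string.
final class KafkaUriSketch {

    // Builds "kafka:<brokers>?k1=v1&k2=v2" in the map's iteration order.
    // Values are appended as-is; no URL encoding or validation is done.
    static String buildUri(String brokers, Map<String, String> options) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        if (options.isEmpty()) {
            return "kafka:" + brokers;
        }
        return "kafka:" + brokers + "?" + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the URI is reproducible.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("topic", "test1");
        options.put("groupId", "testingGroupNew");
        options.put("autoOffsetReset", "earliest");
        System.out.println(buildUri("localhost:9092", options));
    }
}
```

The resulting string could then be passed to from(...) in place of the literal above.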
Any help on this would be greatly appreciated.
Thanks in advance.
Thanks & Regards
Swati