Hello 

I am working on a proof of concept (POC) that consumes from Apache Kafka brokers using Camel.

My route looks like this:
<route>
  <from uri="kafka:10.0.0.2:9092?topic=testTopic&amp;zookeeperHost=10.0.0.2&amp;zookeeperPort=2181&amp;groupId=group1"/>
  <to uri="log:${body}"/>
</route>

Unfortunately, when the route starts, I get the following errors. The
complete log is below. Any clues?
*
[INFO] Using org.apache.camel.spring.Main to initiate a CamelContext
[INFO] Starting Camel ...
[pache.camel.spring.Main.main()] MainSupport                    INFO  Apache
Camel 2.15.1.redhat-620133 starting
[pache.camel.spring.Main.main()] SpringCamelContext             INFO  Apache
Camel 2.15.1.redhat-620133 (CamelContext: camel-1) is starting
[pache.camel.spring.Main.main()] ManagedManagementStrategy      INFO  JMX is
enabled
[pache.camel.spring.Main.main()] DefaultTypeConverter           INFO  Loaded
186 type converters
[pache.camel.spring.Main.main()] SpringCamelContext             INFO 
AllowUseOriginalMessage is enabled. If access to the original message is not
needed, then its recommended to turn this option off as it may improve
performance.
[pache.camel.spring.Main.main()] SpringCamelContext             INFO 
StreamCaching is not in use. If using streams then its recommended to enable
stream caching. See more details at
http://camel.apache.org/stream-caching.html
[pache.camel.spring.Main.main()] KafkaConsumer                  INFO 
Starting Kafka consumer
[pache.camel.spring.Main.main()] VerifiableProperties           INFO 
Verifying properties
[pache.camel.spring.Main.main()] VerifiableProperties           INFO 
Property group.id is overridden to group1
[pache.camel.spring.Main.main()] VerifiableProperties           INFO 
Property zookeeper.connect is overridden to 10.0.0.2:2181
[pache.camel.spring.Main.main()] ZookeeperConsumerConnector     INFO 
[group1_FWLYGZ1-1441424704751-4f9d6f1d], Connecting to zookeeper instance at
10.0.0.2:2181
[t-EventThread-16-10.0.0.2:2181] ZkEventThread                  INFO 
Starting ZkClient event thread.
[pache.camel.spring.Main.main()] ZooKeeper                      INFO  Client
environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
[pache.camel.spring.Main.main()] ZooKeeper                      INFO  Client
environment:host.name=FWLYGZ1.dist.target.com
[pache.camel.spring.Main.main()] ZooKeeper                      INFO  Client
environment:java.version=1.8.0_51
[pache.camel.spring.Main.main()] ZooKeeper                      INFO  Client
environment:java.vendor=Oracle Corporation
[pache.camel.spring.Main.main()] ZooKeeper                      INFO  Client
environment:java.home=C:\Program Files\Java\jdk1.8.0_51\jre
[pache.camel.spring.Main.main()] ZooKeeper                      INFO  Client
environment:java.class.path=/C:/Users/Z001NLH/jbdevstudio/studio/plugins/org.eclipse.m2e.maven.runtime_1.5.1.20150109-1819/jars/plexus-classworlds-2.5.1.jar
[pache.camel.spring.Main.main()] ZooKeeper                      INFO  Client
environment:java.library.path=C:\Program
Files\Java\jdk1.8.0_51\bin;C:\WINDOWS\Sun\Java\bin;C:\WINDOWS\system32;C:\WINDOWS;C:\WINDOWS\system32;C:\WINDOWS;C:\WINDOWS\System32\Wbem;C:\WINDOWS\System32\WindowsPowerShell\v1.0\;C:\Program
Files (x86)\Microsoft Application Virtualization
Client;C:\WINDOWS\System32\WindowsPowerShell\v1.0\;%JAVA_HOME%\bin;.
[pache.camel.spring.Main.main()] ZooKeeper                      INFO  Client
environment:java.io.tmpdir=C:\Users\Z001NLH\AppData\Local\Temp\
[pache.camel.spring.Main.main()] ZooKeeper                      INFO  Client
environment:java.compiler=<NA>
[pache.camel.spring.Main.main()] ZooKeeper                      INFO  Client
environment:os.name=Windows 7
[pache.camel.spring.Main.main()] ZooKeeper                      INFO  Client
environment:os.arch=amd64
[pache.camel.spring.Main.main()] ZooKeeper                      INFO  Client
environment:os.version=6.1
[pache.camel.spring.Main.main()] ZooKeeper                      INFO  Client
environment:user.name=Z001NLH
[pache.camel.spring.Main.main()] ZooKeeper                      INFO  Client
environment:user.home=C:\Users\Z001NLH
[pache.camel.spring.Main.main()] ZooKeeper                      INFO  Client
environment:user.dir=C:\Users\Z001NLH\workspace\camel-kafka
[pache.camel.spring.Main.main()] ZooKeeper                      INFO 
Initiating client connection, connectString=10.0.0.2:2181
sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@595de70b
[pring.Main.main(10.0.0.2:2181)] ClientCnxn                     INFO 
Opening socket connection to server 10.0.0.2/10.0.0.2:2181. Will not attempt
to authenticate using SASL (unknown error)
[pring.Main.main(10.0.0.2:2181)] ClientCnxn                     INFO  Socket
connection established to 10.0.0.2/10.0.0.2:2181, initiating session
[pring.Main.main(10.0.0.2:2181)] ClientCnxn                     INFO 
Session establishment complete on server 10.0.0.2/10.0.0.2:2181, sessionid =
0x14f9b9793b20003, negotiated timeout = 6000
[spring.Main.main()-EventThread] ZkClient                       INFO 
zookeeper state changed (SyncConnected)
[pache.camel.spring.Main.main()] ZookeeperConsumerConnector     INFO 
[group1_FWLYGZ1-1441424704751-4f9d6f1d], starting auto committer every 60000
ms
[pache.camel.spring.Main.main()] ZookeeperConsumerConnector     INFO 
[group1_FWLYGZ1-1441424704751-4f9d6f1d], begin registering consumer
group1_FWLYGZ1-1441424704751-4f9d6f1d in ZK
[pache.camel.spring.Main.main()] ZookeeperConsumerConnector     INFO 
[group1_FWLYGZ1-1441424704751-4f9d6f1d], end registering consumer
group1_FWLYGZ1-1441424704751-4f9d6f1d in ZK
[4751-4f9d6f1d_watcher_executor] ZookeeperConsumerConnector     INFO 
[group1_FWLYGZ1-1441424704751-4f9d6f1d], starting watcher executor thread
for consumer group1_FWLYGZ1-1441424704751-4f9d6f1d
[pring.Main.main(10.0.0.2:2181)] ClientCnxn                     INFO  Client
session timed out, have not heard from server in 4001ms for sessionid
0x14f9b9793b20003, closing socket connection and attempting reconnect
[spring.Main.main()-EventThread] ZkClient                       INFO 
zookeeper state changed (Disconnected)
[pring.Main.main(10.0.0.2:2181)] ClientCnxn                     INFO 
Opening socket connection to server 10.0.0.2/10.0.0.2:2181. Will not attempt
to authenticate using SASL (unknown error)
[pring.Main.main(10.0.0.2:2181)] ClientCnxn                     INFO  Socket
connection established to 10.0.0.2/10.0.0.2:2181, initiating session
[pring.Main.main(10.0.0.2:2181)] ClientCnxn                     INFO 
Session establishment complete on server 10.0.0.2/10.0.0.2:2181, sessionid =
0x14f9b9793b20003, negotiated timeout = 6000
[spring.Main.main()-EventThread] ZkClient                       INFO 
zookeeper state changed (SyncConnected)
[pache.camel.spring.Main.main()] ZookeeperConsumerConnector     INFO 
[group1_FWLYGZ1-1441424704751-4f9d6f1d], begin rebalancing consumer
group1_FWLYGZ1-1441424704751-4f9d6f1d try #0
[pache.camel.spring.Main.main()] ConsumerFetcherManager         INFO 
[ConsumerFetcherManager-1441424706113] Stopping leader finder thread
[pache.camel.spring.Main.main()] ConsumerFetcherManager         INFO 
[ConsumerFetcherManager-1441424706113] Stopping all fetchers
[pache.camel.spring.Main.main()] ConsumerFetcherManager         INFO 
[ConsumerFetcherManager-1441424706113] All connections stopped
[pache.camel.spring.Main.main()] ZookeeperConsumerConnector     INFO 
[group1_FWLYGZ1-1441424704751-4f9d6f1d], Cleared all relevant queues for
this fetcher
[pache.camel.spring.Main.main()] ZookeeperConsumerConnector     INFO 
[group1_FWLYGZ1-1441424704751-4f9d6f1d], Cleared the data chunks in all the
consumer message iterators
[pache.camel.spring.Main.main()] ZookeeperConsumerConnector     INFO 
[group1_FWLYGZ1-1441424704751-4f9d6f1d], Committing all offsets after
clearing the fetcher queues
[pache.camel.spring.Main.main()] ZookeeperConsumerConnector     INFO 
[group1_FWLYGZ1-1441424704751-4f9d6f1d], Releasing partition ownership
[pache.camel.spring.Main.main()] RangeAssignor                  INFO 
Consumer group1_FWLYGZ1-1441424704751-4f9d6f1d rebalancing the following
partitions: ArrayBuffer(0) for topic testTopic with consumers:
List(group1_FWLYGZ1-1441424704751-4f9d6f1d-0,
group1_FWLYGZ1-1441424704751-4f9d6f1d-1,
group1_FWLYGZ1-1441424704751-4f9d6f1d-2,
group1_FWLYGZ1-1441424704751-4f9d6f1d-3,
group1_FWLYGZ1-1441424704751-4f9d6f1d-4,
group1_FWLYGZ1-1441424704751-4f9d6f1d-5,
group1_FWLYGZ1-1441424704751-4f9d6f1d-6,
group1_FWLYGZ1-1441424704751-4f9d6f1d-7,
group1_FWLYGZ1-1441424704751-4f9d6f1d-8,
group1_FWLYGZ1-1441424704751-4f9d6f1d-9)
[pache.camel.spring.Main.main()] RangeAssignor                  WARN  No
broker partitions consumed by consumer thread
group1_FWLYGZ1-1441424704751-4f9d6f1d-1 for topic testTopic
[pache.camel.spring.Main.main()] RangeAssignor                  WARN  No
broker partitions consumed by consumer thread
group1_FWLYGZ1-1441424704751-4f9d6f1d-5 for topic testTopic
[pache.camel.spring.Main.main()] RangeAssignor                  WARN  No
broker partitions consumed by consumer thread
group1_FWLYGZ1-1441424704751-4f9d6f1d-2 for topic testTopic
[pache.camel.spring.Main.main()] RangeAssignor                  WARN  No
broker partitions consumed by consumer thread
group1_FWLYGZ1-1441424704751-4f9d6f1d-3 for topic testTopic
[pache.camel.spring.Main.main()] RangeAssignor                  INFO 
group1_FWLYGZ1-1441424704751-4f9d6f1d-0 attempting to claim partition 0
[pache.camel.spring.Main.main()] RangeAssignor                  WARN  No
broker partitions consumed by consumer thread
group1_FWLYGZ1-1441424704751-4f9d6f1d-7 for topic testTopic
[pache.camel.spring.Main.main()] RangeAssignor                  WARN  No
broker partitions consumed by consumer thread
group1_FWLYGZ1-1441424704751-4f9d6f1d-9 for topic testTopic
[pache.camel.spring.Main.main()] RangeAssignor                  WARN  No
broker partitions consumed by consumer thread
group1_FWLYGZ1-1441424704751-4f9d6f1d-8 for topic testTopic
[pache.camel.spring.Main.main()] RangeAssignor                  WARN  No
broker partitions consumed by consumer thread
group1_FWLYGZ1-1441424704751-4f9d6f1d-6 for topic testTopic
[pache.camel.spring.Main.main()] RangeAssignor                  WARN  No
broker partitions consumed by consumer thread
group1_FWLYGZ1-1441424704751-4f9d6f1d-4 for topic testTopic
[pache.camel.spring.Main.main()] ZookeeperConsumerConnector     INFO 
[group1_FWLYGZ1-1441424704751-4f9d6f1d],
group1_FWLYGZ1-1441424704751-4f9d6f1d-0 successfully owned partition 0 for
topic testTopic
[pache.camel.spring.Main.main()] ZookeeperConsumerConnector     INFO 
[group1_FWLYGZ1-1441424704751-4f9d6f1d], Consumer
group1_FWLYGZ1-1441424704751-4f9d6f1d selected partitions : testTopic:0:
fetched offset = -1: consumed offset = -1
[-4f9d6f1d-leader-finder-thread] cherManager$LeaderFinderThread INFO 
[group1_FWLYGZ1-1441424704751-4f9d6f1d-leader-finder-thread], Starting 
[pache.camel.spring.Main.main()] ZookeeperConsumerConnector     INFO 
[group1_FWLYGZ1-1441424704751-4f9d6f1d], end rebalancing consumer
group1_FWLYGZ1-1441424704751-4f9d6f1d try #0
[pache.camel.spring.Main.main()] SpringCamelContext             INFO  Route:
route1 started and consuming from:
Endpoint[kafka://10.0.0.2:9092?groupId=group1&topic=testTopic&zookeeperHost=10.0.0.2&zookeeperPort=2181]
[pache.camel.spring.Main.main()] SpringCamelContext             INFO  Total
1 routes, of which 1 is started.
[pache.camel.spring.Main.main()] SpringCamelContext             INFO  Apache
Camel 2.15.1.redhat-620133 (CamelContext: camel-1) started in 8.302 seconds
[-4f9d6f1d-leader-finder-thread] VerifiableProperties           INFO 
Verifying properties
[-4f9d6f1d-leader-finder-thread] VerifiableProperties           INFO 
Property client.id is overridden to group1
[-4f9d6f1d-leader-finder-thread] VerifiableProperties           INFO 
Property metadata.broker.list is overridden to FRENCHI-PC:9092
[-4f9d6f1d-leader-finder-thread] VerifiableProperties           INFO 
Property request.timeout.ms is overridden to 30000
[-4f9d6f1d-leader-finder-thread] ClientUtils$                   INFO 
Fetching metadata from broker id:0,host:FRENCHI-PC,port:9092 with
correlation id 0 for 1 topic(s) Set(testTopic)
[-4f9d6f1d-leader-finder-thread] SyncProducer                   INFO 
Connected to FRENCHI-PC:9092 for producing
[-4f9d6f1d-leader-finder-thread] SyncProducer                   INFO 
Disconnecting from FRENCHI-PC:9092
[-4f9d6f1d-leader-finder-thread] ClientUtils$                   WARN 
Fetching topic metadata with correlation id 0 for topics [Set(testTopic)]
from broker [id:0,host:FRENCHI-PC,port:9092] failed
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
        at
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[-4f9d6f1d-leader-finder-thread] SyncProducer                   INFO 
Disconnecting from FRENCHI-PC:9092
[-4f9d6f1d-leader-finder-thread] cherManager$LeaderFinderThread WARN 
[group1_FWLYGZ1-1441424704751-4f9d6f1d-leader-finder-thread], Failed to find
leader for Set([testTopic,0])
kafka.common.KafkaException: fetching topic metadata for topics
[Set(testTopic)] from broker [ArrayBuffer(id:0,host:FRENCHI-PC,port:9092)]
failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
        at
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
        ... 3 more
[-4f9d6f1d-leader-finder-thread] ConsumerFetcherManager         INFO 
[ConsumerFetcherManager-1441424706113] Added fetcher for partitions
ArrayBuffer()
[-4f9d6f1d-leader-finder-thread] VerifiableProperties           INFO 
Verifying properties
[-4f9d6f1d-leader-finder-thread] VerifiableProperties           INFO 
Property client.id is overridden to group1
[-4f9d6f1d-leader-finder-thread] VerifiableProperties           INFO 
Property metadata.broker.list is overridden to FRENCHI-PC:9092
[-4f9d6f1d-leader-finder-thread] VerifiableProperties           INFO 
Property request.timeout.ms is overridden to 30000
[-4f9d6f1d-leader-finder-thread] ClientUtils$                   INFO 
Fetching metadata from broker id:0,host:FRENCHI-PC,port:9092 with
correlation id 1 for 1 topic(s) Set(testTopic)
[-4f9d6f1d-leader-finder-thread] SyncProducer                   INFO 
Connected to FRENCHI-PC:9092 for producing
[-4f9d6f1d-leader-finder-thread] SyncProducer                   INFO 
Disconnecting from FRENCHI-PC:9092
[-4f9d6f1d-leader-finder-thread] ClientUtils$                   WARN 
Fetching topic metadata with correlation id 1 for topics [Set(testTopic)]
from broker [id:0,host:FRENCHI-PC,port:9092] failed
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
        at
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[-4f9d6f1d-leader-finder-thread] SyncProducer                   INFO 
Disconnecting from FRENCHI-PC:9092
[-4f9d6f1d-leader-finder-thread] cherManager$LeaderFinderThread WARN 
[group1_FWLYGZ1-1441424704751-4f9d6f1d-leader-finder-thread], Failed to find
leader for Set([testTopic,0])
kafka.common.KafkaException: fetching topic metadata for topics
[Set(testTopic)] from broker [ArrayBuffer(id:0,host:FRENCHI-PC,port:9092)]
failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
        at
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
        ... 3 more
[-4f9d6f1d-leader-finder-thread] ConsumerFetcherManager         INFO 
[ConsumerFetcherManager-1441424706113] Added fetcher for partitions
ArrayBuffer()
*
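
One detail in the log above may be worth checking: the ZooKeeper connection goes to 10.0.0.2, but the metadata fetch is attempted against FRENCHI-PC:9092, which appears to be the host name the broker registered for itself. If that name does not resolve, or the port is not reachable from the client machine, a ClosedChannelException like the one above is a plausible result. This is a guess, not something the log confirms. The following is a minimal sketch of a reachability check using only the JDK; the class name BrokerReachability, the method name check and the 3000 ms timeout are illustrative choices, and the default host and port are simply copied from the log.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

// Hypothetical helper, not part of Camel or Kafka.
class BrokerReachability {

    /**
     * Returns "UNRESOLVED" if the host name cannot be resolved,
     * "REACHABLE" if a TCP connection to host:port succeeds within the timeout,
     * and "UNREACHABLE" otherwise.
     */
    static String check(String host, int port, int timeoutMillis) {
        InetAddress address;
        try {
            // Resolve the name the same way a client socket would.
            address = InetAddress.getByName(host);
        } catch (UnknownHostException e) {
            return "UNRESOLVED";
        }
        try (Socket socket = new Socket()) {
            // Plain TCP connect; this does not speak the Kafka protocol.
            socket.connect(new InetSocketAddress(address, port), timeoutMillis);
            return "REACHABLE";
        } catch (IOException e) {
            return "UNREACHABLE";
        }
    }

    public static void main(String[] args) {
        // Defaults are the broker host and port reported in the log.
        String host = args.length > 0 ? args[0] : "FRENCHI-PC";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " -> " + check(host, port, 3000));
    }
}
```

Running it on the machine that hosts the Camel route, first with the defaults and then with 10.0.0.2 as the first argument, would show whether the two addresses behave differently. An UNRESOLVED or UNREACHABLE result for FRENCHI-PC would point towards the host name the broker advertises rather than towards the route definition.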



-----
Reji Mathews
Sr. Developer - Middleware Integration / SOA ( Open Source - Apache Camel & 
Jboss Fuse ESB | Mule ESB )
LinkedIn - http://in.linkedin.com/pub/reji-mathews/31/9a2/40a
Twitter - reji_mathews
--
View this message in context: 
http://camel.465427.n5.nabble.com/Camel-KAFKA-consumer-tp5771265.html
Sent from the Camel - Users mailing list archive at Nabble.com.
