Don't specify Kafka dependencies. Camel will transitively bring it. Otherwise you are causing version conflict. On Mon, 9 Jan 2017 at 14:20 Kamal C <kamaltar...@gmail.com> wrote:
> Can you enable DEBUG logs ? It'll be helpful to debug. > > -- Kamal > > On Mon, Jan 9, 2017 at 5:37 AM, Gupta, Swati <swati.gu...@anz.com> wrote: > > > Hello All, > > > > Any help on this would be appreciated. > > There seems to be no error. Does it look like a version issue? > > > > I have updated my pom.xml with the below: > > <dependency> > > <groupId>org.springframework.kafka</groupId> > > <artifactId>spring-kafka</artifactId> > > <version>1.1.2.BUILD-SNAPSHOT</version> > > </dependency> > > > > <dependency> > > <groupId>org.apache.camel</groupId> > > <artifactId>camel-kafka</artifactId> > > <version>2.17.0</version> > > </dependency> > > > > <dependency> > > <groupId>org.apache.kafka</groupId> > > <artifactId>kafka-clients</artifactId> > > <version>0.10.1.0</version> > > </dependency> > > <dependency> > > <groupId>org.apache.kafka</groupId> > > <artifactId>kafka_2.11</artifactId> > > <version>0.10.1.0</version> > > </dependency> > > > > <dependency> > > <groupId>org.apache.camel</groupId> > > <artifactId>camel-core</artifactId> > > <version>2.17.0</version> > > </dependency> > > > > Thanks & Regards > > Swati > > > > -----Original Message----- > > From: Gupta, Swati [mailto:swati.gu...@anz.com] > > Sent: Friday, 6 January 2017 4:01 PM > > To: users@kafka.apache.org > > Subject: RE: Apache Kafka integration using Apache Camel > > > > Yes, the kafka console consumer displays the message correctly. > > I also tested the same with a Java application, it works fine. There > seems > > to be an issue with Camel route trying to consume. > > > > There is no error in the console. But, the logs show as below: > > kafka.KafkaCamelTestConsumer > > Connected to the target VM, address: '127.0.0.1:65007', transport: > > 'socket' > > PID_IS_UNDEFINED: INFO DefaultCamelContext - Apache Camel 2.17.0 > > (CamelContext: camel-1) is starting > > PID_IS_UNDEFINED: INFO ManagedManagementStrategy - JMX is enabled > > PID_IS_UNDEFINED: INFO DefaultTypeConverter - Loaded 183 type converters > > PID_IS_UNDEFINED: INFO DefaultRuntimeEndpointRegistry - Runtime endpoint > > registry is in extended mode gathering usage statistics of all incoming > and > > outgoing endpoints (cache limit: 1000) > > PID_IS_UNDEFINED: INFO DefaultCamelContext - AllowUseOriginalMessage is > > enabled. If access to the original message is not needed, then its > > recommended to turn this option off as it may improve performance. > > PID_IS_UNDEFINED: INFO DefaultCamelContext - StreamCaching is not in > use. > > If using streams then its recommended to enable stream caching. See more > > details at http://camel.apache.org/stream-caching.html > > PID_IS_UNDEFINED: INFO KafkaConsumer - Starting Kafka consumer > > PID_IS_UNDEFINED: INFO ConsumerConfig - ConsumerConfig values: > > auto.commit.interval.ms = 5000 > > auto.offset.reset = earliest > > bootstrap.servers = [localhost:9092] > > check.crcs = true > > client.id = > > connections.max.idle.ms = 540000 > > enable.auto.commit = true > > exclude.internal.topics = true > > fetch.max.bytes = 52428800 > > fetch.max.wait.ms = 500 > > fetch.min.bytes = 1024 > > group.id = testing > > heartbeat.interval.ms = 3000 > > interceptor.classes = null > > key.deserializer = class org.apache.kafka.common.serialization. > > StringDeserializer > > max.partition.fetch.bytes = 1048576 > > max.poll.interval.ms = 300000 > > max.poll.records = 500 > > metadata.max.age.ms = 300000 > > metric.reporters = [] > > metrics.num.samples = 2 > > metrics.sample.window.ms = 30000 > > partition.assignment.strategy = [org.apache.kafka.clients. > > consumer.RangeAssignor] > > receive.buffer.bytes = 32768 > > reconnect.backoff.ms = 50 > > request.timeout.ms = 40000 > > retry.backoff.ms = 100 > > sasl.kerberos.kinit.cmd = /usr/bin/kinit > > sasl.kerberos.min.time.before.relogin = 60000 > > sasl.kerberos.service.name = null > > sasl.kerberos.ticket.renew.jitter = 0.05 > > sasl.kerberos.ticket.renew.window.factor = 0.8 > > sasl.mechanism = GSSAPI > > security.protocol = PLAINTEXT > > send.buffer.bytes = 131072 > > session.timeout.ms = 30000 > > ssl.cipher.suites = null > > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > > ssl.endpoint.identification.algorithm = null > > ssl.key.password = null > > ssl.keymanager.algorithm = SunX509 > > ssl.keystore.location = null > > ssl.keystore.password = null > > ssl.keystore.type = JKS > > ssl.protocol = TLS > > ssl.provider = null > > ssl.secure.random.implementation = null > > ssl.trustmanager.algorithm = PKIX > > ssl.truststore.location = null > > ssl.truststore.password = null > > ssl.truststore.type = JKS > > value.deserializer = class org.apache.kafka.common.serialization. > > StringDeserializer > > > > PID_IS_UNDEFINED: INFO ConsumerConfig - ConsumerConfig values: > > auto.commit.interval.ms = 5000 > > auto.offset.reset = earliest > > bootstrap.servers = [localhost:9092] > > check.crcs = true > > client.id = consumer-1 > > connections.max.idle.ms = 540000 > > enable.auto.commit = true > > exclude.internal.topics = true > > fetch.max.bytes = 52428800 > > fetch.max.wait.ms = 500 > > fetch.min.bytes = 1024 > > group.id = testing > > heartbeat.interval.ms = 3000 > > interceptor.classes = null > > key.deserializer = class org.apache.kafka.common.serialization. > > StringDeserializer > > max.partition.fetch.bytes = 1048576 > > max.poll.interval.ms = 300000 > > max.poll.records = 500 > > metadata.max.age.ms = 300000 > > metric.reporters = [] > > metrics.num.samples = 2 > > metrics.sample.window.ms = 30000 > > partition.assignment.strategy = [org.apache.kafka.clients. > > consumer.RangeAssignor] > > receive.buffer.bytes = 32768 > > reconnect.backoff.ms = 50 > > request.timeout.ms = 40000 > > retry.backoff.ms = 100 > > sasl.kerberos.kinit.cmd = /usr/bin/kinit > > sasl.kerberos.min.time.before.relogin = 60000 > > sasl.kerberos.service.name = null > > sasl.kerberos.ticket.renew.jitter = 0.05 > > sasl.kerberos.ticket.renew.window.factor = 0.8 > > sasl.mechanism = GSSAPI > > security.protocol = PLAINTEXT > > send.buffer.bytes = 131072 > > session.timeout.ms = 30000 > > ssl.cipher.suites = null > > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > > ssl.endpoint.identification.algorithm = null > > ssl.key.password = null > > ssl.keymanager.algorithm = SunX509 > > ssl.keystore.location = null > > ssl.keystore.password = null > > ssl.keystore.type = JKS > > ssl.protocol = TLS > > ssl.provider = null > > ssl.secure.random.implementation = null > > ssl.trustmanager.algorithm = PKIX > > ssl.truststore.location = null > > ssl.truststore.password = null > > ssl.truststore.type = JKS > > value.deserializer = class org.apache.kafka.common.serialization. > > StringDeserializer > > > > PID_IS_UNDEFINED: INFO AppInfoParser - Kafka version : 0.10.1.0 > > PID_IS_UNDEFINED: INFO AppInfoParser - Kafka commitId : 3402a74efb23d1d4 > > PID_IS_UNDEFINED: INFO DefaultCamelContext - Route: route1 started and > > consuming from: Endpoint[kafka://localhost:9092?autoOffsetReset=earliest& > > consumersCount=1&groupId=testing&topic=test] > > PID_IS_UNDEFINED: INFO DefaultCamelContext - Total 1 routes, of which 1 > > are started. > > PID_IS_UNDEFINED: INFO DefaultCamelContext > > > > -----Original Message----- > > From: Ewen Cheslack-Postava [mailto:e...@confluent.io] > > Sent: Friday, 6 January 2017 3:58 PM > > To: users@kafka.apache.org > > Subject: Re: Apache Kafka integration using Apache Camel > > > > More generally, do you have any log errors/messages or additional info? > > It's tough to debug issues like this from 3rd party libraries if they > > don't provide logs/exception info that indicates why processing a > specific > > message failed. > > > > -Ewen > > > > On Thu, Jan 5, 2017 at 8:29 PM, UMESH CHAUDHARY <umesh9...@gmail.com> > > wrote: > > > > > Did you test that kafka console consumer is displaying the produced > > > message? > > > > > > On Fri, Jan 6, 2017 at 9:18 AM, Gupta, Swati <swati.gu...@anz.com> > > wrote: > > > > > > > Hello All, > > > > > > > > > > > > > > > > I am trying to create a Consumer using Apache Camel for a topic in > > > > Apache Kafka. > > > > I am using Camel 2.17.0 and Kafka 0.10 and JDK 1.8. > > > > I have attached a file, KafkaCamelTestConsumer.java which is a > > > > standalone application trying to read from a topic “test1”created > > > > in Apache Kafka I am producing messages from the console and also > > > > was successful to produce messages using a Camel program in the > > > > topic "test1", but not able to consume messages. Ideally, it should > > > > get printed, but nothing seems to happen. The log says that the > > > > route has started but does not process any message. > > > > > > > > Please help to confirm if there is anything wrong with the below > > syntax: > > > > > > > > from(*"kafka:localhost:9092?topic=test1&groupId=testingGroupNew& > > > autoOffsetReset=earliest" > > > > *+ > > > > > > > > *"&consumersCount=1&keyDeserializer=org.apache. > > > kafka.common.serialization.StringDeserializer&" > > > > *+ > > > > *"valueDeserializer=org.apache.kafka.common.serialization. > > > StringDeserializer" > > > > *+ > > > > *"&autoCommitIntervalMs=1000&sessionTimeoutMs=30000& > > > autoCommitEnable=true"* > > > > ).split() > > > > .body() > > > > .process(*new *Processor() { > > > > @Override > > > > *public void *process(Exchange exchange) > > > > *throws *Exception { > > > > String messageKey = *""*; > > > > *if *(exchange.getIn() != *null*) { > > > > Message message = exchange.getIn(); > > > > Integer partitionId = (Integer) message > > > > .getHeader(KafkaConstants.* > > > PARTITION* > > > > ); > > > > String topicName = (String) message > > > > .getHeader(KafkaConstants.* > > TOPIC*); > > > > *if *(message.getHeader( > > > KafkaConstants.*KEY*) > > > > != *null*) > > > > messageKey = (String) message > > > > .getHeader(KafkaConstants.* > > > KEY*); > > > > Object data = message.getBody(); > > > > > > > > > > > > System.*out*.println( > > > > *"topicName :: " *+ topicName + > > > > *" partitionId :: " *+ > partitionId > > + > > > > *" messageKey :: " *+ messageKey > + > > > > *" message :: " *+ data + > > > *"**\n**"*); > > > > } > > > > } > > > > }).to( > > > > *"file://C:/swati/?fileName=MyOutputFile.txt&charset=utf-8"*); > > > > } > > > > }); > > > > > > > > > > > > > > > > I have also tried with the basic parameters as below and it still > > > > fails > > > to > > > > read messages. > > > > > > > > from( > > > > *"kafka:localhost:9092?topic=test1&groupId=testingGroupNew& > > > autoOffsetReset=earliest")* > > > > > > > > Any help on this will be greatly appreciated. > > > > > > > > Thanks in advance > > > > > > > > > > > > > > > > Thanks & Regards > > > > > > > > Swati > > > > > > > > ------------------------------ > > > > This e-mail and any attachments to it (the "Communication") is, > > > > unless otherwise stated, confidential, may contain copyright > > > > material and is for the use only of the intended recipient. If you > > > > receive the Communication > > > in > > > > error, please notify the sender immediately by return e-mail, delete > > > > the Communication and the return e-mail, and do not read, copy, > > > > retransmit or otherwise deal with it. Any views expressed in the > > > > Communication are > > > those > > > > of the individual sender only, unless expressly stated to be those > > > > of Australia and New Zealand Banking Group Limited ABN 11 005 357 > > > > 522, or > > > any > > > > of its related entities including ANZ Bank New Zealand Limited > > > > (together "ANZ"). ANZ does not accept liability in connection with > > > > the integrity of or errors in the Communication, computer virus, > > > > data corruption, interference or delay arising from or in respect of > > the Communication. > > > > > > > > > > > > > > > > > This e-mail and any attachments to it (the "Communication") is, unless > > otherwise stated, confidential, may contain copyright material and is for > > the use only of the intended recipient. If you receive the Communication > in > > error, please notify the sender immediately by return e-mail, delete the > > Communication and the return e-mail, and do not read, copy, retransmit or > > otherwise deal with it. Any views expressed in the Communication are > those > > of the individual sender only, unless expressly stated to be those of > > Australia and New Zealand Banking Group Limited ABN 11 005 357 522, or > any > > of its related entities including ANZ Bank New Zealand Limited (together > > "ANZ"). ANZ does not accept liability in connection with the integrity of > > or errors in the Communication, computer virus, data corruption, > > interference or delay arising from or in respect of the Communication. > > > > > > This e-mail and any attachments to it (the "Communication") is, unless > > otherwise stated, confidential, may contain copyright material and is for > > the use only of the intended recipient. If you receive the Communication > in > > error, please notify the sender immediately by return e-mail, delete the > > Communication and the return e-mail, and do not read, copy, retransmit or > > otherwise deal with it. Any views expressed in the Communication are > those > > of the individual sender only, unless expressly stated to be those of > > Australia and New Zealand Banking Group Limited ABN 11 005 357 522, or > any > > of its related entities including ANZ Bank New Zealand Limited (together > > "ANZ"). ANZ does not accept liability in connection with the integrity of > > or errors in the Communication, computer virus, data corruption, > > interference or delay arising from or in respect of the Communication. > > >