Yes, the Kafka console consumer displays the message correctly.
I also tested the same with a Java application, and it works fine. The issue 
seems to be with the Camel route that is trying to consume.

There is no error in the console, but the logs show the following:
kafka.KafkaCamelTestConsumer
Connected to the target VM, address: '127.0.0.1:65007', transport: 'socket'
PID_IS_UNDEFINED: INFO  DefaultCamelContext - Apache Camel 2.17.0 (CamelContext: camel-1) is starting
PID_IS_UNDEFINED: INFO  ManagedManagementStrategy - JMX is enabled
PID_IS_UNDEFINED: INFO  DefaultTypeConverter - Loaded 183 type converters
PID_IS_UNDEFINED: INFO  DefaultRuntimeEndpointRegistry - Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
PID_IS_UNDEFINED: INFO  DefaultCamelContext - AllowUseOriginalMessage is enabled. If access to the original message is not needed, then its recommended to turn this option off as it may improve performance.
PID_IS_UNDEFINED: INFO  DefaultCamelContext - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
PID_IS_UNDEFINED: INFO  KafkaConsumer - Starting Kafka consumer
PID_IS_UNDEFINED: INFO  ConsumerConfig - ConsumerConfig values: 
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.id = 
        connections.max.idle.ms = 540000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1024
        group.id = testing
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 32768
        reconnect.backoff.ms = 50
        request.timeout.ms = 40000
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 30000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

PID_IS_UNDEFINED: INFO  ConsumerConfig - ConsumerConfig values: 
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.id = consumer-1
        connections.max.idle.ms = 540000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1024
        group.id = testing
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 32768
        reconnect.backoff.ms = 50
        request.timeout.ms = 40000
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 30000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

PID_IS_UNDEFINED: INFO  AppInfoParser - Kafka version : 0.10.1.0
PID_IS_UNDEFINED: INFO  AppInfoParser - Kafka commitId : 3402a74efb23d1d4
PID_IS_UNDEFINED: INFO  DefaultCamelContext - Route: route1 started and consuming from: Endpoint[kafka://localhost:9092?autoOffsetReset=earliest&consumersCount=1&groupId=testing&topic=test]
PID_IS_UNDEFINED: INFO  DefaultCamelContext - Total 1 routes, of which 1 are started.
PID_IS_UNDEFINED: INFO  DefaultCamelContext
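
One thing that can be checked from this log is which options the route 
actually started with. The endpoint in the "Route: route1 started" line shows 
topic=test and groupId=testing, while the route in my original mail (quoted 
below) uses topic=test1 and groupId=testingGroupNew, so it may be worth 
confirming that this run was pointed at the topic the messages were produced 
to. Below is a minimal sketch in plain Java, with no Camel or Kafka 
dependency, that splits a logged endpoint URI into its options for that kind 
of comparison. The class and method names (EndpointOptions, parse) are 
hypothetical helpers for illustration and are not part of the Camel API; the 
sketch also assumes the option values need no URL decoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not Camel API): splits an endpoint URI into its query options.
class EndpointOptions {

    // Returns the options after '?' as name -> value, in the order they appear.
    // Assumption: values are used as-is, without URL decoding.
    static Map<String, String> parse(String endpointUri) {
        Map<String, String> options = new LinkedHashMap<>();
        int q = endpointUri.indexOf('?');
        if (q < 0 || q == endpointUri.length() - 1) {
            return options; // no query part, so no options
        }
        for (String pair : endpointUri.substring(q + 1).split("&")) {
            if (pair.isEmpty()) {
                continue; // tolerate a stray '&'
            }
            int eq = pair.indexOf('=');
            String name = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            options.put(name, value);
        }
        return options;
    }

    public static void main(String[] args) {
        // Endpoint copied from the "Route: route1 started" log line.
        String logged = "kafka://localhost:9092?autoOffsetReset=earliest"
                + "&consumersCount=1&groupId=testing&topic=test";
        Map<String, String> options = parse(logged);
        System.out.println("topic   = " + options.get("topic"));
        System.out.println("groupId = " + options.get("groupId"));
    }
}
```

Running the same parse over the URI string passed to from(...) and comparing 
the two maps would show any option that differs between the code and the 
running route.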

-----Original Message-----
From: Ewen Cheslack-Postava [mailto:e...@confluent.io] 
Sent: Friday, 6 January 2017 3:58 PM
To: users@kafka.apache.org
Subject: Re: Apache Kafka integration using Apache Camel

More generally, do you have any log errors/messages or additional info?
It's tough to debug issues like this from 3rd party libraries if they don't 
provide logs/exception info that indicates why processing a specific message 
failed.

-Ewen

On Thu, Jan 5, 2017 at 8:29 PM, UMESH CHAUDHARY <umesh9...@gmail.com> wrote:

> Did you test that kafka console consumer is displaying the produced 
> message?
>
> On Fri, Jan 6, 2017 at 9:18 AM, Gupta, Swati <swati.gu...@anz.com> wrote:
>
> > Hello All,
> >
> >
> >
> > I am trying to create a consumer using Apache Camel for a topic in
> > Apache Kafka.
> > I am using Camel 2.17.0, Kafka 0.10 and JDK 1.8.
> > I have attached a file, KafkaCamelTestConsumer.java, which is a
> > standalone application trying to read from a topic "test1" created
> > in Apache Kafka. I am producing messages from the console and was
> > also able to produce messages to the topic "test1" using a Camel
> > program, but I am not able to consume them. Ideally, each message
> > should get printed, but nothing seems to happen. The log says that
> > the route has started, but it does not process any message.
> >
> > Please help to confirm if there is anything wrong with the below syntax:
> >
> > from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest"
> >         + "&consumersCount=1&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
> >         + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
> >         + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true")
> >         .split()
> >         .body()
> >         .process(new Processor() {
> >             @Override
> >             public void process(Exchange exchange)
> >                     throws Exception {
> >                 String messageKey = "";
> >                 if (exchange.getIn() != null) {
> >                     Message message = exchange.getIn();
> >                     Integer partitionId = (Integer) message
> >                             .getHeader(KafkaConstants.PARTITION);
> >                     String topicName = (String) message
> >                             .getHeader(KafkaConstants.TOPIC);
> >                     if (message.getHeader(KafkaConstants.KEY) != null)
> >                         messageKey = (String) message
> >                                 .getHeader(KafkaConstants.KEY);
> >                     Object data = message.getBody();
> >
> >                     System.out.println(
> >                             "topicName :: " + topicName +
> >                             " partitionId :: " + partitionId +
> >                             " messageKey :: " + messageKey +
> >                             " message :: " + data + "\n");
> >                 }
> >             }
> >         }).to("file://C:/swati/?fileName=MyOutputFile.txt&charset=utf-8");
> >     }
> > });
> >
> >
> >
> > I have also tried with the basic parameters as below and it still
> > fails to read messages.
> >
> > from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest")
> >
> > Any help on this will be greatly appreciated.
> >
> > Thanks in advance
> >
> >
> >
> > Thanks & Regards
> >
> > Swati
> >
> > ------------------------------
> > This e-mail and any attachments to it (the "Communication") is,
> > unless otherwise stated, confidential, may contain copyright
> > material and is for the use only of the intended recipient. If you
> > receive the Communication in error, please notify the sender
> > immediately by return e-mail, delete the Communication and the
> > return e-mail, and do not read, copy, retransmit or otherwise deal
> > with it. Any views expressed in the Communication are those of the
> > individual sender only, unless expressly stated to be those of
> > Australia and New Zealand Banking Group Limited ABN 11 005 357 522,
> > or any of its related entities including ANZ Bank New Zealand
> > Limited (together "ANZ"). ANZ does not accept liability in
> > connection with the integrity of or errors in the Communication,
> > computer virus, data corruption, interference or delay arising from
> > or in respect of the Communication.
> >
> >
>


