I think I found out what happened .. I was installing Kafka under DC/OS on
AWS, following this doc https://dcos.io/docs/1.8/usage/tutorials/kafka/ for
the Kafka installation. This works fine and installs Kafka version 0.10.0.

But in the example that shows how to produce and consume messages, the
doc says the following ..

core@ip-10-0-6-153 ~ $ docker run -it mesosphere/kafka-client
> root@7d0aed75e582:/bin# echo "Hello, World." |
> ./kafka-console-producer.sh --broker-list KAFKA_ADDRESS:PORT --topic topic1


The problem is that the docker run of kafka-client pulls in version 0.9.0. When
I use kafka-console-producer from this client, it produces messages in the
0.9.0 message format, which carry no embedded timestamp, so my 0.10.0 Streams
application cannot consume them.
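
As a sanity check, here is a tiny sketch (my own illustration, not from the
DC/OS doc; the class and method names are made up) that simply asks whichever
kafka-clients jar happens to be on the classpath for its version:

import org.apache.kafka.common.utils.AppInfoParser;

public class PrintKafkaClientVersion {
    public static void main(String[] args) {
        // Prints the version of the kafka-clients jar that is actually on the
        // classpath, e.g. 0.9.0.1 vs 0.10.0.0 -- exactly the mismatch I ran into.
        System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
    }
}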

Thanks for pointing me in this direction .. I think it will be fixed if I
install a 0.10.0 client in its place.

regards.

On Sunday 30 October 2016, Matthias J. Sax <matth...@confluent.io> wrote:

>
> The simplest way should be to check the Java classpath.
>
> Insert an
>
> echo $CLASSPATH
>
> at the end of bin/kafka-run-class.sh
>
> Then run bin/kafka-console-producer.sh with no arguments.
>
> You should see the classpath printed out. Look for
> 'kafka-clients-XXX.jar' -- XXX will be the version number.
>
>
> -Matthias
>
>
> On 10/29/16 12:11 AM, Debasish Ghosh wrote:
> > Hello Matthias -
> >
> > Regarding ..
> >
> > In case you do have 0.10 brokers, it might however happen that
> > bin/kafka-console-producer.sh does use the 0.9 producer.
> >
> >
> > How can I check this ?
> >
> > Thanks!
> >
> > On Sat, Oct 29, 2016 at 12:23 PM, Debasish Ghosh
> > <ghosh.debas...@gmail.com> wrote:
> >
> >> I agree .. the problem is DC/OS still ships the older version.
> >> Let me check if I can upgrade this ..
> >>
> >> Thanks!
> >>
> >> On Sat, Oct 29, 2016 at 12:21 PM, Matthias J. Sax
> >> <matth...@confluent.io> wrote:
> >>
> > Btw: I would highly recommend using Kafka 0.10.1 -- there are
> > many new Streams features, usability improvements, and bug fixes.
> >
> > -Matthias
> >
> > On 10/28/16 11:42 PM, Matthias J. Sax wrote:
> >>>>> That sounds reasonable. However, I am wondering how your
> >>>>> Streams application can connect to a 0.9 broker in the first
> >>>>> place. Streams internally uses the standard Kafka clients, and
> >>>>> those are not backward compatible. Thus, the 0.10 Streams
> >>>>> clients should not be able to connect to a 0.9 broker.
> >>>>>
> >>>>> In case you do have 0.10 brokers, it might however happen
> >>>>> that bin/kafka-console-producer.sh uses the 0.9 producer.
> >>>>> Brokers are backward compatible, thus a 0.9 producer can
> >>>>> write to a 0.10 broker (and in this case the record TS would
> >>>>> be invalid). I assume that in your local environment you are
> >>>>> using the 0.10 bin/kafka-console-producer.sh and thus all
> >>>>> works fine.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>> On 10/28/16 11:00 PM, Debasish Ghosh wrote:
> >>>>>> Hello Matthias -
> >>>>>
> >>>>>> Thanks a lot for the response. I think what may be
> >>>>>> happening is a version mismatch between the development &
> >>>>>> deployment versions of Kafka. The Kafka Streams
> >>>>>> application that I developed uses 0.10.0-based libraries,
> >>>>>> and my local environment contains a server installation
> >>>>>> of the same version. Hence it works ok in my local
> >>>>>> environment.
> >>>>>
> >>>>>> But DC/OS 1.8 deploys Kafka 0.9.0 when I install
> >>>>>> the service through the DC/OS CLI. I use this version to
> >>>>>> load records into the input topic and then try to consume
> >>>>>> them with the deployed Streams application, which I developed
> >>>>>> using 0.10.0. Hence the producer did not put a
> >>>>>> timestamp on the records while the consumer expects one.
> >>>>>
> >>>>>> I need to check if 0.10.x is available for DC/OS ..
> >>>>>
> >>>>>> Thanks again for your suggestions.
> >>>>>
> >>>>>
> >>>>>> On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax
> >>>>>> <matth...@confluent.io> wrote:
> >>>>>
> >>>>>> Hey,
> >>>>>
> >>>>>> we just added a new FAQ entry for the upcoming CP 3.2 release
> >>>>>> that answers your question. I just c&p it here. More
> >>>>>> concrete answers below.
> >>>>>
> >>>>>>>>> If you get an exception similar to the one shown
> >>>>>>>>> below, there are multiple possible causes:
> >>>>>>>>>
> >>>>>>>>> Exception in thread "StreamThread-1"
> >>>>>>>>> java.lang.IllegalArgumentException: Invalid timestamp -1
> >>>>>>>>>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >>>>>>>>>
> >>>>>>>>> This error means that the timestamp extractor of
> >>>>>>>>> your Kafka Streams application failed to extract a
> >>>>>>>>> valid timestamp from a record. Typically, this
> >>>>>>>>> points to a problem with the record (e.g., the
> >>>>>>>>> record does not contain a timestamp at all), but it
> >>>>>>>>> could also indicate a problem or bug in the
> >>>>>>>>> timestamp extractor used by the application.
> >>>>>>>>>
> >>>>>>>>> When does a record not contain a valid timestamp:
> >>>>>>>>>
> >>>>>>>>> If you are using the default
> >>>>>>>>> ConsumerRecordTimestampExtractor, it is most likely
> >>>>>>>>> that your records do not carry an embedded
> >>>>>>>>> timestamp (embedded record timestamps were
> >>>>>>>>> introduced into Kafka's message format in Kafka
> >>>>>>>>> 0.10). This might happen if you consume a topic
> >>>>>>>>> that is written by old Kafka producer clients (i.e.,
> >>>>>>>>> version 0.9 or earlier) or by third-party producer
> >>>>>>>>> clients. A common situation where this may happen
> >>>>>>>>> is after upgrading your Kafka cluster from 0.9 to
> >>>>>>>>> 0.10, where data that was generated with 0.9 does
> >>>>>>>>> not carry the timestamps of the 0.10 message format.
> >>>>>>>>> If you are using a custom timestamp extractor,
> >>>>>>>>> make sure that your extractor is robust to missing
> >>>>>>>>> timestamps in your records. For example, you can
> >>>>>>>>> return a default or estimated timestamp if you
> >>>>>>>>> cannot extract a valid timestamp (maybe the
> >>>>>>>>> timestamp field in your data is just missing). You
> >>>>>>>>> can also switch to processing time semantics via
> >>>>>>>>> WallclockTimestampExtractor; whether such a
> >>>>>>>>> fallback is an appropriate response to this
> >>>>>>>>> situation depends on your use case. However, as a
> >>>>>>>>> first step you should identify and fix the root
> >>>>>>>>> cause for why such problematic records were written
> >>>>>>>>> to Kafka in the first place. In a second step you
> >>>>>>>>> may consider applying workarounds (as described
> >>>>>>>>> above) when dealing with such records (for example,
> >>>>>>>>> if you need to process those records after all).
> >>>>>>>>> Another option is to regenerate the records with
> >>>>>>>>> correct timestamps and write them to a new Kafka
> >>>>>>>>> topic.
> >>>>>>>>>
> >>>>>>>>> When the timestamp extractor causes the problem:
> >>>>>>>>>
> >>>>>>>>> In this situation you should debug and fix the
> >>>>>>>>> erroneous extractor. If the extractor is built into
> >>>>>>>>> Kafka, please report the bug to the Kafka developer
> >>>>>>>>> mailing list at d...@kafka.apache.org (see
> >>>>>>>>> instructions at http://kafka.apache.org/contact); in
> >>>>>>>>> the meantime, you may write a custom timestamp
> >>>>>>>>> extractor that fixes the problem and configure your
> >>>>>>>>> application to use that extractor for the time
> >>>>>>>>> being.
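> >>>>>
> >>>>>> Just as a rough sketch of such a fallback extractor (assuming the
> >>>>>> single-argument TimestampExtractor interface of the 0.10.x Streams
> >>>>>> API; the class name is made up):
> >>>>>
> >>>>>> import org.apache.kafka.clients.consumer.ConsumerRecord;
> >>>>>> import org.apache.kafka.streams.processor.TimestampExtractor;
> >>>>>>
> >>>>>> public class WallclockFallbackTimestampExtractor implements TimestampExtractor {
> >>>>>>     @Override
> >>>>>>     public long extract(ConsumerRecord<Object, Object> record) {
> >>>>>>         long ts = record.timestamp();
> >>>>>>         // Records written by pre-0.10 producers carry no embedded
> >>>>>>         // timestamp (-1), so fall back to wall-clock time instead of
> >>>>>>         // failing the stream thread.
> >>>>>>         return ts >= 0 ? ts : System.currentTimeMillis();
> >>>>>>     }
> >>>>>> }
> >>>>>
> >>>>>> If I am not mistaken, you would register it via the
> >>>>>> "timestamp.extractor" config (StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG).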
> >>>>>
> >>>>>
> >>>>>> To address your questions more concretely:
> >>>>>
> >>>>>> 1. Yes and no: yes for any new data you write to your
> >>>>>> topic, no for any already-written data that does not
> >>>>>> have a valid timestamp set.
> >>>>>> 2. The default is CreateTime.
> >>>>>> 3. The config parameter is "message.timestamp.type". It's a
> >>>>>> broker-side, per-topic setting (however, be aware that the Java
> >>>>>> KafkaProducer verifies the timestamp locally before sending the
> >>>>>> message to the broker, so on -1 you get the client-side exception
> >>>>>> you observed -- see the small producer sketch below).
> >>>>>> 4. I assume that you do consume different topics with different
> >>>>>> TS fields in your records.
> >>>>>
> >>>>>> Also have a look at:
> >>>>>> http://docs.confluent.io/current/streams/concepts.html#time
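> >>>>>
> >>>>>> For reference, a minimal 0.10 producer sketch that sets the record
> >>>>>> timestamp explicitly (broker address, topic, key and value are
> >>>>>> placeholders):
> >>>>>
> >>>>>> import java.util.Properties;
> >>>>>> import org.apache.kafka.clients.producer.KafkaProducer;
> >>>>>> import org.apache.kafka.clients.producer.ProducerRecord;
> >>>>>>
> >>>>>> public class TimestampedProducerExample {
> >>>>>>     public static void main(String[] args) {
> >>>>>>         Properties props = new Properties();
> >>>>>>         props.put("bootstrap.servers", "KAFKA_ADDRESS:PORT"); // placeholder
> >>>>>>         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> >>>>>>         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> >>>>>>         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
> >>>>>>             // 5-arg constructor: topic, partition (null = default partitioner),
> >>>>>>             // timestamp in ms since epoch, key, value. With CreateTime topics
> >>>>>>             // this timestamp is stored in the message; a negative value would
> >>>>>>             // trigger the "Invalid timestamp" exception shown above.
> >>>>>>             producer.send(new ProducerRecord<>("topic1", null,
> >>>>>>                     System.currentTimeMillis(), "key", "Hello, World."));
> >>>>>>         }
> >>>>>>     }
> >>>>>> }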
> >>>>>
> >>>>>
> >>>>>
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>
> >>>>>
> >>>>>> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
> >>>>>>>>> I am actually using 0.10.0 and NOT 0.10.1 as I
> >>>>>>>>> mentioned in the last mail. And I am using Kafka
> >>>>>>>>> within a DC/OS cluster under AWS.
> >>>>>>>>>
> >>>>>>>>> The version that I mentioned works ok on my
> >>>>>>>>> local machine using a local Kafka installation. And
> >>>>>>>>> it works for both single-broker and multi-broker
> >>>>>>>>> scenarios.
> >>>>>>>>>
> >>>>>>>>> Thanks.
> >>>>>>>>>
> >>>>>>>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh
> >>>>>>>>> <ghosh.debas...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hello -
> >>>>>>>>>>
> >>>>>>>>>> I am a beginner in Kafka .. with my first Kafka
> >>>>>>>>>> Streams application ..
> >>>>>>>>>>
> >>>>>>>>>> I have a streams application that reads from a
> >>>>>>>>>> topic, does some transformation on the data and
> >>>>>>>>>> writes to another topic. The record that I
> >>>>>>>>>> manipulate is a CSV record.
> >>>>>>>>>>
> >>>>>>>>>> It runs fine when I run it on a local Kafka
> >>>>>>>>>> instance.
> >>>>>>>>>>
> >>>>>>>>>> However when I run it on an AWS cluster, I get
> >>>>>>>>>> the following exception when I try to produce
> >>>>>>>>>> the transformed record into the target topic.
> >>>>>>>>>>
> >>>>>>>>>> Exception in thread "StreamThread-1"
> >>>>>>>>>> java.lang.IllegalArgumentException: Invalid timestamp -1
> >>>>>>>>>>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
> >>>>>>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
> >>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> >>>>>>>>>>
> >>>>>>>>>> Looks like the timestamp passed to the
> >>>>>>>>>> ProducerRecord is -1, though I am not passing any
> >>>>>>>>>> timestamp explicitly. I am not sure why this
> >>>>>>>>>> happens. But I see that the Javadoc for
> >>>>>>>>>> ProducerRecord says the following ..
> >>>>>>>>>>
> >>>>>>>>>>> The record also has an associated timestamp. If
> >>>>>>>>>>> the user did not provide a
> >>>>>>>>>>> timestamp, the producer will stamp the record
> >>>>>>>>>>> with its current time. The timestamp eventually
> >>>>>>>>>>> used by Kafka depends on the timestamp type
> >>>>>>>>>>> configured for the topic. If the topic is
> >>>>>>>>>>> configured to use CreateTime, the timestamp in
> >>>>>>>>>>> the producer record will be used by the broker.
> >>>>>>>>>>> If the topic is configured to use
> >>>>>>>>>>> LogAppendTime, the timestamp in the producer
> >>>>>>>>>>> record will be overwritten by the broker with
> >>>>>>>>>>> the broker local time when it appends the
> >>>>>>>>>>> message to its log. In either of the cases
> >>>>>>>>>>> above, the timestamp that has actually been
> >>>>>>>>>>> used will be returned to user in
> >>>>>>>>>>> RecordMetadata
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 1. Will this problem be solved if I configure the
> >>>>>>>>>> topic with LogAppendTime or CreateTime explicitly?
> >>>>>>>>>> 2. What is the default setting of this property
> >>>>>>>>>> in a newly created topic?
> >>>>>>>>>> 3. How do I change it (what is the name of the
> >>>>>>>>>> property to be set)?
> >>>>>>>>>> 4. Any idea why I face this problem in the cluster
> >>>>>>>>>> mode but not in the local mode?
> >>>>>>>>>>
> >>>>>>>>>> BTW I am using 0.10.1.
> >>>>>>>>>>
> >>>>>>>>>> Any help / pointer will be appreciated.
> >>>>>>>>>>
> >>>>>>>>>> regards.
> >>>>>>>>>>
> >>>>>>>>>> -- Debasish Ghosh http://manning.com/ghosh2
> >>>>>>>>>> http://manning.com/ghosh
> >>>>>>>>>>
> >>>>>>>>>> Twttr: @debasishg Blog:
> >>>>>>>>>> http://debasishg.blogspot.com Code:
> >>>>>>>>>> http://github.com/debasishg
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>
> >>
> >>
> >>
> >> -- Debasish Ghosh http://manning.com/ghosh2
> >> http://manning.com/ghosh
> >>
> >> Twttr: @debasishg Blog: http://debasishg.blogspot.com Code:
> >> http://github.com/debasishg
> >>
> >
> >
> >
>


-- 
Sent from my iPhone
