-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

The simplest way is to check the Java classpath.

Insert an

echo $CLASSPATH

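near the end of bin/kafka-run-class.sh, before the line that launches
Java (if the script finishes with an exec call, anything placed after
it will not run).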

Then run bin/kafka-console-producer.sh with no arguments.

You should see the classpath printed out. Look for
'kafka-clients-XXX.jar' -- XXX will be the version number.
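
If the printed classpath is long, the jar name can be picked out programmatically. The following is only a sketch: the class name `KafkaClientsVersion` and the method `find` are made up for illustration, and it assumes the jar follows the `kafka-clients-<version>.jar` naming described above.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka.
final class KafkaClientsVersion {
    // Matches e.g. "kafka-clients-0.9.0.1.jar" and captures "0.9.0.1".
    private static final Pattern JAR =
        Pattern.compile("kafka-clients-([0-9][A-Za-z0-9._-]*?)\\.jar");

    /** Returns the version of the first kafka-clients jar found in the classpath string, if any. */
    static Optional<String> find(String classpath) {
        Matcher m = JAR.matcher(classpath);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }
}
```

Passing the output of the echo line to `KafkaClientsVersion.find(...)` would then give the client version the console producer actually uses, or an empty result if no such jar is on the classpath.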


- -Matthias


On 10/29/16 12:11 AM, Debasish Ghosh wrote:
> Hello Matthias -
> 
> Regarding:
> 
>> In case you do have 0.10 brokers, it might however happen, that
>> bin/kafka-console-producer.sh does use 0.9 producer.
> 
> How can I check this?
> 
> Thanks!
> 
> On Sat, Oct 29, 2016 at 12:23 PM, Debasish Ghosh
> <ghosh.debas...@gmail.com> wrote:
> 
>> I agree .. the problem is DC/OS still ships the older version.
>> Let me check if I can upgrade this ..
>> 
>> Thanks!
>> 
>> On Sat, Oct 29, 2016 at 12:21 PM, Matthias J. Sax
>> <matth...@confluent.io> wrote:
>> 
> Btw: I would highly recommend using Kafka 0.10.1 -- there are
> many new Streams features, usability improvements, and bug fixes.
> 
> -Matthias
> 
> On 10/28/16 11:42 PM, Matthias J. Sax wrote:
>>>>> That sounds reasonable. However, I am wondering how your
>>>>> Streams application can connect to a 0.9 broker in the first
>>>>> place. Streams internally uses standard Kafka clients, and
>>>>> those are not backward compatible. Thus, the 0.10 Streams
>>>>> clients should not be able to connect to a 0.9 broker.
>>>>> 
>>>>> In case you do have 0.10 brokers, it might however happen
>>>>> that bin/kafka-console-producer.sh uses a 0.9 producer.
>>>>> Brokers are backward compatible; thus, a 0.9 producer can
>>>>> write to a 0.10 broker (and in this case the record TS would
>>>>> be invalid). I assume that in your local environment you
>>>>> are using the 0.10 bin/kafka-console-producer.sh, and thus
>>>>> all works fine.
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> 
>>>>> On 10/28/16 11:00 PM, Debasish Ghosh wrote:
>>>>>> Hello Matthias -
>>>>> 
>>>>>> Thanks a lot for the response. I think what may be
>>>>>> happening is a version mismatch between the development &
>>>>>> deployment versions of Kafka. The Kafka streams
>>>>>> application that I developed uses 0.10.0 based libraries.
>>>>>> And my local environment contains a server installation
>>>>>> of the same version. Hence it works ok in my local
>>>>>> environment.
>>>>> 
>>>>>> But the DC/OS 1.8 deploys 0.9.0 of Kafka when I install
>>>>>> the service through DC/OS cli. And I use this version to
>>>>>> load records into the input topic. And try to consume
>>>>>> using the deployed streams application which I developed
>>>>>> using 0.10.0. Hence the producer did not put the
>>>>>> timestamp while the consumer expects to have one.
>>>>> 
>>>>>> I need to check if 0.10.x is available for DC/OS ..
>>>>> 
>>>>>> Thanks again for your suggestions.
>>>>> 
>>>>> 
>>>>>> On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax 
>>>>>> <matth...@confluent.io> wrote:
>>>>> 
>>>>>> Hey,
>>>>> 
>>>>>> we just added a new FAQ entry for the upcoming CP 3.2 release
>>>>>> that answers your question. I have copied it here; a more
>>>>>> concrete answer follows below.
>>>>> 
>>>>>>>>> If you get an exception similar to the one shown
>>>>>>>>> below, there are multiple possible causes:
>>>>>>>>> 
>>>>>>>>> Exception in thread "StreamThread-1"
>>>>>>>>> java.lang.IllegalArgumentException: Invalid timestamp -1
>>>>>>>>>   at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>>>>>>>> 
>>>>>>>>> This error means that the timestamp extractor of
>>>>>>>>> your Kafka Streams application failed to extract a
>>>>>>>>> valid timestamp from a record. Typically, this
>>>>>>>>> points to a problem with the record (e.g., the
>>>>>>>>> record does not contain a timestamp at all), but it
>>>>>>>>> could also indicate a problem or bug in the 
>>>>>>>>> timestamp extractor used by the application.
>>>>>>>>> 
>>>>>>>>> When does a record not contain a valid timestamp:
>>>>>>>>> 
>>>>>>>>> If you are using the default 
>>>>>>>>> ConsumerRecordTimestampExtractor, it is most likely
>>>>>>>>> that your records do not carry an embedded
>>>>>>>>> timestamp (embedded record timestamps got
>>>>>>>>> introduced in Kafka's message format in Kafka
>>>>>>>>> 0.10). This might happen if you consume a topic
>>>>>>>>> that is written by old Kafka producer clients (i.e.,
>>>>>>>>> version 0.9 or earlier) or third-party producer
>>>>>>>>> clients. A common situation where this may happen
>>>>>>>>> is after upgrading your Kafka cluster from 0.9 to
>>>>>>>>> 0.10, where all the data that was generated with
>>>>>>>>> 0.9 is not compatible with the 0.10 message format.
>>>>>>>>> 
>>>>>>>>> If you are using a custom timestamp extractor,
>>>>>>>>> make sure that your extractor is robust to missing
>>>>>>>>> timestamps in your records. For example, you can
>>>>>>>>> return a default or estimated timestamp if you
>>>>>>>>> cannot extract a valid timestamp (maybe the
>>>>>>>>> timestamp field in your data is just missing). You
>>>>>>>>> can also switch to processing time semantics via 
>>>>>>>>> WallclockTimestampExtractor; whether such a
>>>>>>>>> fallback is an appropriate response to this
>>>>>>>>> situation depends on your use case. However, as a
>>>>>>>>> first step you should identify and fix the root
>>>>>>>>> cause for why such problematic records were written
>>>>>>>>> to Kafka in the first place. In a second step you 
>>>>>>>>> may consider applying workarounds (as described
>>>>>>>>> above) when dealing with such records (for example,
>>>>>>>>> if you need to process those records after all).
>>>>>>>>> Another option is to regenerate the records with
>>>>>>>>> correct timestamps and write them to a new Kafka
>>>>>>>>> topic.
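
The fallback idea described above (return a default or estimated timestamp when no valid one can be extracted) can be sketched in plain Java. Everything here is illustrative: `TimestampFallback`, `resolve`, and the notion of a CSV column holding epoch milliseconds are assumptions, not part of Kafka. In a real Streams application this logic would presumably live inside an implementation of `org.apache.kafka.streams.processor.TimestampExtractor`, with `record.timestamp()` supplying the embedded value; the exact `extract` signature differs between Kafka versions, so it is not shown.

```java
// Hypothetical helper illustrating a timestamp extractor that tolerates missing timestamps.
final class TimestampFallback {
    /**
     * Picks a usable timestamp for a record.
     *
     * @param embedded timestamp carried by the record; negative (e.g. -1) means "none"
     * @param csvLine  the record value, assumed here to be a CSV line
     * @param column   assumed index of a column holding epoch milliseconds
     * @param fallback value to use when nothing else works, e.g. System.currentTimeMillis()
     */
    static long resolve(long embedded, String csvLine, int column, long fallback) {
        if (embedded >= 0) {
            return embedded; // the record already carries a valid timestamp
        }
        if (csvLine != null) {
            String[] fields = csvLine.split(",", -1);
            if (column >= 0 && column < fields.length) {
                try {
                    long parsed = Long.parseLong(fields[column].trim());
                    if (parsed >= 0) {
                        return parsed; // timestamp recovered from the payload
                    }
                } catch (NumberFormatException ignored) {
                    // field is empty or malformed; fall through to the fallback
                }
            }
        }
        return fallback;
    }
}
```

Passing the current wall-clock time as `fallback` corresponds to the processing-time semantics mentioned above; whether that is acceptable depends on the use case.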
>>>>>>>>> 
>>>>>>>>> When the timestamp extractor causes the problem:
>>>>>>>>> 
>>>>>>>>> In this situation you should debug and fix the
>>>>>>>>> erroneous extractor. If the extractor is built into
>>>>>>>>> Kafka, please report the bug to the Kafka developer
>>>>>>>>> mailing list at d...@kafka.apache.org (see
>>>>>>>>> instructions http://kafka.apache.org/contact); in
>>>>>>>>> the meantime, you may write a custom timestamp
>>>>>>>>> extractor that fixes the problem and configure your
>>>>>>>>> application to use that extractor for the time
>>>>>>>>> being.
>>>>> 
>>>>> 
>>>>>> To address your questions more concretely:
>>>>> 
>>>>>> 1. Yes and no: yes, for any new data you write to your
>>>>>>    topic; no, for any already written data that does not
>>>>>>    have a valid timestamp set.
>>>>>> 2. The default is create time (CreateTime).
>>>>>> 3. The config parameter is "message.timestamp.type". It is a
>>>>>>    broker-side, per-topic setting. (However, be aware that the
>>>>>>    Java KafkaProducer verifies the timestamp locally before
>>>>>>    sending the message to the broker; thus, on -1 there will
>>>>>>    be the client-side exception you observed.)
>>>>>> 4. I assume that you consume different topics with different
>>>>>>    TS fields in your records.
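
To make the client-side check concrete: a negative timestamp is rejected before anything is sent to the broker, which is why the exception shows up in the Streams application regardless of the topic's timestamp setting. The snippet below is a standalone sketch of that kind of check, not the Kafka source; the class and method names are made up, and the message format is copied from the stack trace in this thread.

```java
// Hypothetical stand-in for the validation that runs when a producer record is created.
final class TimestampCheck {
    /** Rejects negative timestamps; null means "no timestamp provided by the user". */
    static Long validate(Long timestamp) {
        if (timestamp != null && timestamp < 0) {
            throw new IllegalArgumentException("Invalid timestamp " + timestamp);
        }
        return timestamp;
    }
}
```

With this check, a record read without an embedded timestamp (reported as -1) and forwarded unchanged fails at construction time on the client.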
>>>>> 
>>>>>> Also have a look at: 
>>>>>> http://docs.confluent.io/current/streams/concepts.html#time
>>>>> 
>>>>>> -Matthias
>>>>> 
>>>>> 
>>>>>> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
>>>>>>>>> I am actually using 0.10.0 and NOT 0.10.1 as I
>>>>>>>>> mentioned in the last mail. And I am using Kafka
>>>>>>>>> within a DC/OS cluster under AWS.
>>>>>>>>> 
>>>>>>>>> The version that I mentioned works ok is on my
>>>>>>>>> local machine using a local Kafka installation. And
>>>>>>>>> it works for both single broker and multi broker
>>>>>>>>> scenario.
>>>>>>>>> 
>>>>>>>>> Thanks.
>>>>>>>>> 
>>>>>>>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh 
>>>>>>>>> <ghosh.debas...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>>> Hello -
>>>>>>>>>> 
>>>>>>>>>> I am a beginner in Kafka .. with my first Kafka
>>>>>>>>>> streams application ..
>>>>>>>>>> 
>>>>>>>>>> I have a streams application that reads from a
>>>>>>>>>> topic, does some transformation on the data and
>>>>>>>>>> writes to another topic. The record that I
>>>>>>>>>> manipulate is a CSV record.
>>>>>>>>>> 
>>>>>>>>>> It runs fine when I run it on a local Kafka
>>>>>>>>>> instance.
>>>>>>>>>> 
>>>>>>>>>> However when I run it on an AWS cluster, I get
>>>>>>>>>> the following exception when I try to produce
>>>>>>>>>> the transformed record into the target topic.
>>>>>>>>>> 
>>>>>>>>>> Exception in thread "StreamThread-1"
>>>>>>>>>> java.lang.IllegalArgumentException: Invalid timestamp -1
>>>>>>>>>>   at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>>>>>   at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>>>>>   at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
>>>>>>>>>>   at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>>>>>   at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>>>>>>>>>> 
>>>>>>>>>> Looks like the timestamp passed to the
>>>>>>>>>> ProducerRecord is -1, though I am not passing any
>>>>>>>>>> timestamp explicitly. I am not sure why this
>>>>>>>>>> happens. But I see that the Javadoc for
>>>>>>>>>> ProducerRecord says the following ..
>>>>>>>>>> 
>>>>>>>>>>> The record also has an associated timestamp. If
>>>>>>>>>>> the user did not provide a
>>>>>>>>>>> timestamp, the producer will stamp the record
>>>>>>>>>>> with its current time. The timestamp eventually
>>>>>>>>>>> used by Kafka depends on the timestamp type
>>>>>>>>>>> configured for the topic. If the topic is
>>>>>>>>>>> configured to use CreateTime, the timestamp in
>>>>>>>>>>> the producer record will be used by the broker.
>>>>>>>>>>> If the topic is configured to use 
>>>>>>>>>>> LogAppendTime, the timestamp in the producer
>>>>>>>>>>> record will be overwritten by the broker with
>>>>>>>>>>> the broker local time when it appends the
>>>>>>>>>>> message to its log. In either of the cases
>>>>>>>>>>> above, the timestamp that has actually been
>>>>>>>>>>> used will be returned to user in
>>>>>>>>>>> RecordMetadata
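
The rule in the quoted Javadoc can be written down as a small decision function. This is only an illustration of the described behaviour under stated assumptions: the enum, class, and parameter names are made up, and the two clock values are passed in explicitly so the logic stays deterministic.

```java
// Hypothetical model of which timestamp ends up on a record, per the Javadoc quoted above.
final class EffectiveTimestamp {
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    /**
     * @param topicType         timestamp type configured for the topic
     * @param producerTimestamp timestamp supplied by the user, or null if none
     * @param producerClock     the producer's current time
     * @param brokerClock       the broker's local time when it appends the message
     */
    static long resolve(TimestampType topicType, Long producerTimestamp,
                        long producerClock, long brokerClock) {
        // The producer stamps the record with its own clock if the user gave no timestamp.
        long stamped = producerTimestamp != null ? producerTimestamp : producerClock;
        // With LogAppendTime the broker overwrites whatever the producer sent.
        return topicType == TimestampType.LOG_APPEND_TIME ? brokerClock : stamped;
    }
}
```

Note that this describes a 0.10 producer talking to a 0.10 broker; it says nothing about records written by older clients, which carry no timestamp at all.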
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 1. Will this problem be solved if I configure the
>>>>>>>>>>    topic with LogAppendTime or CreateTime explicitly?
>>>>>>>>>> 2. What is the default setting of this property
>>>>>>>>>>    in a newly created topic?
>>>>>>>>>> 3. How do I change it (what is the name of the
>>>>>>>>>>    property to be set)?
>>>>>>>>>> 4. Any idea why I face this problem in the cluster
>>>>>>>>>>    mode but not in the local mode?
>>>>>>>>>> 
>>>>>>>>>> BTW I am using 0.10.1.
>>>>>>>>>> 
>>>>>>>>>> Any help / pointers will be appreciated.
>>>>>>>>>> 
>>>>>>>>>> regards.
>>>>>>>>>> 
>>>>>>>>>> -- Debasish Ghosh http://manning.com/ghosh2 
>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>> 
>>>>>>>>>> Twttr: @debasishg Blog:
>>>>>>>>>> http://debasishg.blogspot.com Code:
>>>>>>>>>> http://github.com/debasishg
>>>>>>>>>> 
-----BEGIN PGP SIGNATURE-----
Comment: GPGTools - https://gpgtools.org

iQIcBAEBCgAGBQJYFaONAAoJECnhiMLycopPH/4P+gOUVW4Ab6UBlrRiap1snDkC
1xNaWIzlyF2i8nZ+FUTmXrvJiF1TSSw8f+apyZpjf+Q8uMS1Bv6ZzqqiHVC0+gFb
Ymis+pOHhEl3je5uJWf41emrUxvJHalDlrLqLCk0cxlTYgBCgoAxLtzbvFrejw0e
uYcfjz+mERK4upNZS3KbO8tMMpr+M163u02dUAhT6kuaJNfSICNKKEVIK9WrCAMN
skhrVhcM6XRh6YisU73erg2grcGAMTxYf53eWt8saRqICMlcmxDbTHSxJ6Qog4l4
c6OSxxtnB+rXYSDEtoWH3CSxkPK0zlLUDcKXqz4bUNrWgoaCjAfz2WuANNskUi3L
dZlfO+vvbS2NLjqNFzqVUjV5tnbdCL7MTO4ByQZ7Jh9TQeOyMkHW8+fGsCZUJ3Ex
SIx95MYJyOk1n390yFjjeJEQT/yHIq7nXXgxjJ6dBPfEIB6VwXOHGvaucXMus2XF
ioBr/CuoNhWTfHn19TNSkSObJP61W2YCA1xHlSzVutHoISKHZKRq0z6AbbLdD7Z3
weLV5zHrnmibeKDYc+OX+60Kr+YsgSqeNwDx+EVFjkMTagUydzg06PcWigSLl6ZC
2rxs2Esb0W+Q4Iy+IYCzfQuQGQz5oSiUB32hxTio6dDz8otBY6+U3QrBz7mOtc+v
OeP4OeKsbMBBZGJNa0Ux
=NjV2
-----END PGP SIGNATURE-----
