Hello Matthias -

Regarding ..

> In case you do have 0.10 brokers, it might however happen that
> bin/kafka-console-producer.sh uses a 0.9 producer.

How can I check this?

Thanks!

On Sat, Oct 29, 2016 at 12:23 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> I agree .. the problem is DC/OS still ships the older version. Let me
> check if I can upgrade this ..
>
> Thanks!
>
> On Sat, Oct 29, 2016 at 12:21 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>>
>> Btw: I would highly recommend using Kafka 0.10.1 -- there are many
>> new Streams features, usability improvements, and bug fixes.
>>
>> -Matthias
>>
>> On 10/28/16 11:42 PM, Matthias J. Sax wrote:
>> > That sounds reasonable. However, I am wondering how your Streams
>> > application can connect to a 0.9 broker in the first place. Streams
>> > internally uses standard Kafka clients, and those are not backward
>> > compatible. Thus, the 0.10 Streams clients should not be able to
>> > connect to a 0.9 broker.
>> >
>> > In case you do have 0.10 brokers, it might however happen that
>> > bin/kafka-console-producer.sh uses a 0.9 producer. Brokers are
>> > backward compatible, thus a 0.9 producer can write to a 0.10 broker
>> > (and in this case the record TS would be invalid). I assume that in
>> > your local environment you are using the 0.10
>> > bin/kafka-console-producer.sh, and thus all works fine.
>> >
>> >
>> > -Matthias
>> >
>> >
>> > On 10/28/16 11:00 PM, Debasish Ghosh wrote:
>> >> Hello Matthias -
>> >
>> >> Thanks a lot for the response. I think what may be happening is
>> >> a version mismatch between the development & deployment versions
>> >> of Kafka. The Kafka Streams application that I developed uses
>> >> 0.10.0-based libraries, and my local environment contains a
>> >> server installation of the same version. Hence it works OK in my
>> >> local environment.
>> >
>> >> But DC/OS 1.8 deploys Kafka 0.9.0 when I install the service
>> >> through the DC/OS CLI. I use this version to load records into
>> >> the input topic and then try to consume them with the deployed
>> >> Streams application that I developed against 0.10.0. Hence the
>> >> producer did not put a timestamp on the records, while the
>> >> consumer expects one.
>> >
>> >> I need to check if 0.10.x is available for DC/OS ..
>> >
>> >> Thanks again for your suggestions.
>> >
>> >
>> >> On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax
>> >> <matth...@confluent.io> wrote:
>> >
>> >> Hey,
>> >
>> >> We just added a new FAQ entry for the upcoming CP 3.2 release that
>> >> answers your question. I have copied & pasted it here. More concrete
>> >> answers below.
>> >
>> >>>>> If you get an exception similar to the one shown below,
>> >>>>> there are multiple possible causes:
>> >>>>>
>> >>>>> Exception in thread "StreamThread-1"
>> >>>>> java.lang.IllegalArgumentException: Invalid timestamp -1
>> >>>>>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>> >>>>>
>> >>>>> This error means that the timestamp extractor of your Kafka
>> >>>>>  Streams application failed to extract a valid timestamp
>> >>>>> from a record. Typically, this points to a problem with the
>> >>>>> record (e.g., the record does not contain a timestamp at
>> >>>>> all), but it could also indicate a problem or bug in the
>> >>>>> timestamp extractor used by the application.
>> >>>>>
>> >>>>> When does a record not contain a valid timestamp:
>> >>>>>
>> >>>>> If you are using the default
>> >>>>> ConsumerRecordTimestampExtractor, it is most likely that
>> >>>>> your records do not carry an embedded timestamp (embedded
>> >>>>> record timestamps got introduced in Kafka's message format
>> >>>>> in Kafka 0.10). This might happen if you consume a topic
>> >>>>> that is written by old Kafka producer clients (i.e., version
>> >>>>> 0.9 or earlier) or by third-party producer clients. A common
>> >>>>> situation where this may happen is after upgrading your
>> >>>>> Kafka cluster from 0.9 to 0.10, where all the data that was
>> >>>>> generated with 0.9 is not compatible with the 0.10 message
>> >>>>> format. If you are using a custom timestamp extractor, make
>> >>>>> sure that your extractor is robust to missing timestamps in
>> >>>>> your records. For example, you can return a default or
>> >>>>> estimated timestamp if you cannot extract a valid timestamp
>> >>>>> (maybe the timestamp field in your data is just missing).
>> >>>>> You can also switch to processing time semantics via
>> >>>>> WallclockTimestampExtractor; whether such a fallback is an
>> >>>>> appropriate response to this situation depends on your use
>> >>>>> case. However, as a first step you should identify and fix
>> >>>>> the root cause for why such problematic records were
>> >>>>> written to Kafka in the first place. In a second step you
>> >>>>> may consider applying workarounds (as described above) when
>> >>>>> dealing with such records (for example, if you need to
>> >>>>> process those records after all). Another option is to
>> >>>>> regenerate the records with correct timestamps and write
>> >>>>> them to a new Kafka topic.
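>> >>>>>
>> >>>>> For illustration only, a minimal fallback extractor could look
>> >>>>> roughly like the sketch below (the class name is hypothetical,
>> >>>>> written against the 0.10.x TimestampExtractor interface); it
>> >>>>> returns wall-clock time whenever a record carries no embedded
>> >>>>> timestamp:
>> >>>>>
>> >>>>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>> >>>>> import org.apache.kafka.streams.processor.TimestampExtractor;
>> >>>>>
>> >>>>> public class LenientTimestampExtractor implements TimestampExtractor {
>> >>>>>     @Override
>> >>>>>     public long extract(ConsumerRecord<Object, Object> record) {
>> >>>>>         long ts = record.timestamp();
>> >>>>>         // Records written by pre-0.10 producers carry no embedded
>> >>>>>         // timestamp and show up here as -1; fall back to wall-clock
>> >>>>>         // time instead of letting the sink node reject the record.
>> >>>>>         return ts >= 0 ? ts : System.currentTimeMillis();
>> >>>>>     }
>> >>>>> }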
>> >>>>>
>> >>>>> When the timestamp extractor causes the problem:
>> >>>>>
>> >>>>> In this situation you should debug and fix the erroneous
>> >>>>> extractor. If the extractor is built into Kafka, please
>> >>>>> report the bug to the Kafka developer mailing list at
>> >>>>> d...@kafka.apache.org (see instructions
>> >>>>> http://kafka.apache.org/contact); in the meantime, you may
>> >>>>> write a custom timestamp extractor that fixes the problem
>> >>>>> and configure your application to use that extractor for
>> >>>>> the time being.
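>> >>>>>
>> >>>>> Wiring in a replacement extractor is a one-line Streams setting.
>> >>>>> A hedged configuration fragment, assuming the 0.10.x config key
>> >>>>> StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG and the built-in
>> >>>>> WallclockTimestampExtractor (a custom class such as the
>> >>>>> LenientTimestampExtractor sketched above would be plugged in the
>> >>>>> same way):
>> >>>>>
>> >>>>> import java.util.Properties;
>> >>>>> import org.apache.kafka.streams.StreamsConfig;
>> >>>>> import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
>> >>>>>
>> >>>>> Properties props = new Properties();
>> >>>>> // ... application.id, bootstrap.servers, serdes, etc. ...
>> >>>>> // Switch to processing-time semantics instead of the default
>> >>>>> // ConsumerRecordTimestampExtractor:
>> >>>>> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>> >>>>>           WallclockTimestampExtractor.class.getName());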
>> >
>> >
>> >> To address your questions more concretely:
>> >
>> >> 1. Yes and no: yes for any new data you write to the topic; no for
>> >>    any already written data that does not have a valid timestamp set.
>> >> 2. The default is CreateTime.
>> >> 3. The config parameter is "message.timestamp.type". It's a
>> >>    broker-side, per-topic setting (however, be aware that the Java
>> >>    KafkaProducer verifies the timestamp locally before sending the
>> >>    message to the broker, thus on -1 you get the client-side
>> >>    exception you observed).
>> >> 4. I assume that you consume different topics with different TS
>> >>    fields in your records.
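>> >>
>> >> To make the client-side check in (3) concrete, here is a standalone
>> >> sketch (hypothetical topic/key/value, not your actual application
>> >> code) that reproduces the exception without any broker involved:
>> >>
>> >> import org.apache.kafka.clients.producer.ProducerRecord;
>> >>
>> >> // The 0.10 ProducerRecord constructor rejects negative timestamps
>> >> // locally, which is exactly what Streams' sink node runs into when
>> >> // the extractor hands it the -1 of a missing record timestamp.
>> >> ProducerRecord<String, String> r =
>> >>     new ProducerRecord<>("output-topic", null, -1L, "key", "value");
>> >> // -> java.lang.IllegalArgumentException: Invalid timestamp -1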
>> >
>> >> Also have a look at:
>> >> http://docs.confluent.io/current/streams/concepts.html#time
>> >
>> >
>> >
>> >> -Matthias
>> >
>> >
>> >> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
>> >>>>> I am actually using 0.10.0, and NOT 0.10.1 as I mentioned
>> >>>>> in the last mail. And I am using Kafka within a DC/OS
>> >>>>> cluster on AWS.
>> >>>>>
>> >>>>> The version that I mentioned works OK is the one on my local
>> >>>>> machine, using a local Kafka installation. And it works for
>> >>>>> both the single-broker and multi-broker scenarios.
>> >>>>>
>> >>>>> Thanks.
>> >>>>>
>> >>>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh
>> >>>>> <ghosh.debas...@gmail.com> wrote:
>> >>>>>
>> >>>>>> Hello -
>> >>>>>>
>> >>>>>> I am a beginner in Kafka .. this is my first Kafka Streams
>> >>>>>> application ..
>> >>>>>>
>> >>>>>> I have a Streams application that reads from a topic,
>> >>>>>> does some transformation on the data, and writes to
>> >>>>>> another topic. The record that I manipulate is a CSV
>> >>>>>> record.
>> >>>>>>
>> >>>>>> It runs fine when I run it on a local Kafka instance.
>> >>>>>>
>> >>>>>> However when I run it on an AWS cluster, I get the
>> >>>>>> following exception when I try to produce the
>> >>>>>> transformed record into the target topic.
>> >>>>>>
>> >>>>>> Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
>> >>>>>>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>> >>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>> >>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
>> >>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>> >>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>> >>>>>>
>> >>>>>> Looks like the timestamp passed to the ProducerRecord is
>> >>>>>> -1, though I am not passing any timestamp explicitly. I
>> >>>>>> am not sure why this happens. But I see that the Javadoc
>> >>>>>> for ProducerRecord says the following ..
>> >>>>>>
>> >>>>>>> The record also has an associated timestamp. If the user
>> >>>>>>> did not provide a
>> >>>>>>> timestamp, the producer will stamp the record with its
>> >>>>>>> current time. The timestamp eventually used by Kafka
>> >>>>>>> depends on the timestamp type configured for the
>> >>>>>>> topic. If the topic is configured to use CreateTime,
>> >>>>>>> the timestamp in the producer record will be used by
>> >>>>>>> the broker. If the topic is configured to use
>> >>>>>>> LogAppendTime, the timestamp in the producer record
>> >>>>>>> will be overwritten by the broker with the broker local
>> >>>>>>> time when it appends the message to its log. In either
>> >>>>>>> of the cases above, the timestamp that has actually
>> >>>>>>> been used will be returned to user in RecordMetadata
>> >>>>>>
>> >>>>>>
>> >>>>>> 1. Will this problem be solved if I configure the topic
>> >>>>>>    with LogAppendTime or CreateTime explicitly?
>> >>>>>> 2. What is the default setting of this property in a newly
>> >>>>>>    created topic?
>> >>>>>> 3. How do I change it (what is the name of the property to
>> >>>>>>    be set)?
>> >>>>>> 4. Any idea why I face this problem in the cluster mode but
>> >>>>>>    not in the local mode?
>> >>>>>>
>> >>>>>> BTW I am using 0.10.1.
>> >>>>>>
>> >>>>>> Any help / pointer will be appreciated.
>> >>>>>>
>> >>>>>> regards.
>> >>>>>>
>> >>>>>> -- Debasish Ghosh http://manning.com/ghosh2
>> >>>>>> http://manning.com/ghosh
>> >>>>>>
>> >>>>>> Twttr: @debasishg Blog: http://debasishg.blogspot.com
>> >>>>>> Code: http://github.com/debasishg
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>
>> >
>> >
>> >
>> >
>>
>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
