I agree .. the problem is DC/OS still ships the older version. Let me check
if I can upgrade this ..

Thanks!

On Sat, Oct 29, 2016 at 12:21 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

>
> Btw: I would highly recommend using Kafka 0.10.1 -- there are many
> new Streams features, usability improvements, and bug fixes.
>
> -Matthias
>
> On 10/28/16 11:42 PM, Matthias J. Sax wrote:
> > That sounds reasonable. However, I am wondering how your Streams
> > application can connect to a 0.9 broker in the first place. Streams
> > internally uses the standard Kafka clients, and those are not
> > backward compatible. Thus, the 0.10 Streams clients should not be
> > able to connect to a 0.9 broker.
> >
> > In case you do have 0.10 brokers, it might however happen that
> > bin/kafka-console-producer.sh uses the 0.9 producer. Brokers are
> > backward compatible, thus a 0.9 producer can write to a 0.10 broker
> > (and in this case the record timestamp would be invalid). I assume
> > that in your local environment you are using the 0.10
> > bin/kafka-console-producer.sh and thus everything works fine.
> >
> >
> > -Matthias
> >
> >
> > On 10/28/16 11:00 PM, Debasish Ghosh wrote:
> >> Hello Matthias -
> >
> >> Thanks a lot for the response. I think what may be happening is
> >> a version mismatch between the development & deployment versions
> >> of Kafka. The Kafka Streams application that I developed uses
> >> 0.10.0 based libraries, and my local environment contains a
> >> server installation of the same version. Hence it works OK in my
> >> local environment.
> >
> >> But DC/OS 1.8 deploys Kafka 0.9.0 when I install the service
> >> through the DC/OS CLI, and I use this version to load records
> >> into the input topic. I then try to consume using the deployed
> >> Streams application, which I developed against 0.10.0. Hence the
> >> producer did not put a timestamp in the records, while the
> >> consumer expects to find one.
> >
> >> I need to check if 0.10.x is available for DC/OS ..
> >
> >> Thanks again for your suggestions.
> >
> >
> >> On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax
> >> <matth...@confluent.io> wrote:
> >
> >> Hey,
> >
> >> we just added a new FAQ entry for the upcoming CP 3.2 release
> >> that answers your question. I just copied & pasted it here. A
> >> more concrete answer follows below.
> >
> >>>>> If you get an exception similar to the one shown below,
> >>>>> there are multiple possible causes:
> >>>>>
> >>>>> Exception in thread "StreamThread-1"
> >>>>> java.lang.IllegalArgumentException: Invalid timestamp -1
> >>>>>   at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >>>>>
> >>>>> This error means that the timestamp extractor of your Kafka
> >>>>>  Streams application failed to extract a valid timestamp
> >>>>> from a record. Typically, this points to a problem with the
> >>>>> record (e.g., the record does not contain a timestamp at
> >>>>> all), but it could also indicate a problem or bug in the
> >>>>> timestamp extractor used by the application.
> >>>>>
> >>>>> When does a record not contain a valid timestamp:
> >>>>>
> >>>>> If you are using the default
> >>>>> ConsumerRecordTimestampExtractor, it is most likely that
> >>>>> your records do not carry an embedded timestamp (embedded
> >>>>> record timestamps were introduced in Kafka's message format
> >>>>> in Kafka 0.10). This might happen if you consume a topic
> >>>>> that is written by old Kafka producer clients (i.e., version
> >>>>> 0.9 or earlier) or by third-party producer clients. A common
> >>>>> situation where this may happen is after upgrading your
> >>>>> Kafka cluster from 0.9 to 0.10, where all the data that was
> >>>>> generated with 0.9 is not compatible with the 0.10 message
> >>>>> format. If you are using a custom timestamp extractor, make
> >>>>> sure that your extractor is robust to missing timestamps in
> >>>>> your records. For example, you can return a default or
> >>>>> estimated timestamp if you cannot extract a valid timestamp
> >>>>> (maybe the timestamp field in your data is just missing).
> >>>>> You can also switch to processing time semantics via
> >>>>> WallclockTimestampExtractor; whether such a fallback is an
> >>>>> appropriate response to this situation depends on your use
> >>>>> case. However, as a first step you should identify and fix
> >>>>> the root cause for why such problematic records were
> >>>>> written to Kafka in the first place. In a second step you
> >>>>> may consider applying workarounds (as described above) when
> >>>>> dealing with such records (for example, if you need to
> >>>>> process those records after all). Another option is to
> >>>>> regenerate the records with correct timestamps and write
> >>>>> them to a new Kafka topic.
> >>>>>
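> >>>>> As an illustration (my sketch, not part of the FAQ text),
> >>>>> such a fallback extractor could look like the following,
> >>>>> assuming the single-argument TimestampExtractor interface of
> >>>>> the 0.10.x Streams API:
> >>>>>
> >>>>>     import org.apache.kafka.clients.consumer.ConsumerRecord;
> >>>>>     import org.apache.kafka.streams.processor.TimestampExtractor;
> >>>>>
> >>>>>     public class FallbackTimestampExtractor implements TimestampExtractor {
> >>>>>         @Override
> >>>>>         public long extract(final ConsumerRecord<Object, Object> record) {
> >>>>>             final long ts = record.timestamp();
> >>>>>             // Records written by pre-0.10 producers carry no embedded
> >>>>>             // timestamp (-1); fall back to wall-clock time for those.
> >>>>>             return ts >= 0 ? ts : System.currentTimeMillis();
> >>>>>         }
> >>>>>     }
> >>>>>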
> >>>>> When the timestamp extractor causes the problem:
> >>>>>
> >>>>> In this situation you should debug and fix the erroneous
> >>>>> extractor. If the extractor is built into Kafka, please
> >>>>> report the bug to the Kafka developer mailing list at
> >>>>> d...@kafka.apache.org (see instructions
> >>>>> http://kafka.apache.org/contact); in the meantime, you may
> >>>>> write a custom timestamp extractor that fixes the problem
> >>>>> and configure your application to use that extractor for
> >>>>> the time being.
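> >>>>>
> >>>>> For reference, a custom extractor (or the built-in
> >>>>> WallclockTimestampExtractor) is configured via StreamsConfig;
> >>>>> a minimal sketch against the 0.10.x API:
> >>>>>
> >>>>>     import java.util.Properties;
> >>>>>     import org.apache.kafka.streams.StreamsConfig;
> >>>>>     import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
> >>>>>
> >>>>>     Properties props = new Properties();
> >>>>>     // ... other settings (application.id, bootstrap.servers, serdes) ...
> >>>>>     props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> >>>>>               WallclockTimestampExtractor.class.getName());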
> >
> >
> >> To address your questions more concretely:
> >
> >> 1. Yes and no: yes for any new data you write to your topic; no
> >> for any already written data that does not have a valid
> >> timestamp set.
> >> 2. The default is CreateTime.
> >> 3. The config parameter is "message.timestamp.type". It's a
> >> broker-side, per-topic setting (however, be aware that the Java
> >> KafkaProducer does verify the timestamp locally before sending
> >> the message to the broker, thus on -1 there will be the
> >> client-side exception you did observe).
> >> 4. I assume that you consume different topics with different
> >> timestamp fields in your records.
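> >
> >> Regarding (3): you can also stamp each record explicitly on the
> >> producer side; a minimal sketch (topic, key, and value are
> >> placeholder names):
> >
> >>     import org.apache.kafka.clients.producer.ProducerRecord;
> >
> >>     // An explicit timestamp avoids relying on broker/topic defaults.
> >>     ProducerRecord<String, String> record = new ProducerRecord<>(
> >>         "my-topic", null, System.currentTimeMillis(), "key", "value");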
> >
> >> Also have a look at:
> >> http://docs.confluent.io/current/streams/concepts.html#time
> >
> >
> >
> >> -Matthias
> >
> >
> >> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
> >>>>> I am actually using 0.10.0 and NOT 0.10.1 as I mentioned
> >>>>> in the last mail. And I am using Kafka within a DC/OS
> >>>>> cluster under AWS.
> >>>>>
> >>>>> The setup that I mentioned works OK is on my local
> >>>>> machine, using a local Kafka installation. And it works
> >>>>> for both the single-broker and multi-broker scenarios.
> >>>>>
> >>>>> Thanks.
> >>>>>
> >>>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh
> >>>>> <ghosh.debas...@gmail.com> wrote:
> >>>>>
> >>>>>> Hello -
> >>>>>>
> >>>>>> I am a beginner in Kafka .. with my first Kafka streams
> >>>>>> application ..
> >>>>>>
> >>>>>> I have a streams application that reads from a topic,
> >>>>>> does some transformation on the data and writes to
> >>>>>> another topic. The record that I manipulate is a CSV
> >>>>>> record.
> >>>>>>
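> >>>>>> The topology is roughly the following (a simplified
> >>>>>> sketch; topic names and the transform() helper are
> >>>>>> placeholders, using the 0.10.0 KStreamBuilder API):
> >>>>>>
> >>>>>>     import java.util.Properties;
> >>>>>>     import org.apache.kafka.streams.KafkaStreams;
> >>>>>>     import org.apache.kafka.streams.kstream.KStreamBuilder;
> >>>>>>
> >>>>>>     KStreamBuilder builder = new KStreamBuilder();
> >>>>>>     builder.stream("input-topic")             // read CSV records
> >>>>>>            .mapValues(value -> transform(value)) // manipulate each record
> >>>>>>            .to("output-topic");               // write to the target topic
> >>>>>>
> >>>>>>     Properties props = new Properties();      // application.id, bootstrap.servers, ...
> >>>>>>     KafkaStreams streams = new KafkaStreams(builder, props);
> >>>>>>     streams.start();
> >>>>>>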
> >>>>>> It runs fine when I run it on a local Kafka instance.
> >>>>>>
> >>>>>> However when I run it on an AWS cluster, I get the
> >>>>>> following exception when I try to produce the
> >>>>>> transformed record into the target topic.
> >>>>>>
> >>>>>> Exception in thread "StreamThread-1"
> >>>>>> java.lang.IllegalArgumentException: Invalid timestamp -1
> >>>>>>   at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >>>>>>   at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> >>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>>   at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>>   at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
> >>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
> >>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
> >>>>>>   at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
> >>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>>   at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>>>>>   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
> >>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
> >>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
> >>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> >>>>>>
> >>>>>> Looks like the timestamp passed to the ProducerRecord is
> >>>>>> -1, though I am not passing any timestamp explicitly. I
> >>>>>> am not sure why this happens. But I see that the Javadoc
> >>>>>> for ProducerRecord says the following ..
> >>>>>>
> >>>>>>> The record also has an associated timestamp. If the user
> >>>>>>> did not provide a timestamp, the producer will stamp the
> >>>>>>> record with its current time. The timestamp eventually
> >>>>>>> used by Kafka depends on the timestamp type configured
> >>>>>>> for the topic. If the topic is configured to use
> >>>>>>> CreateTime, the timestamp in the producer record will be
> >>>>>>> used by the broker. If the topic is configured to use
> >>>>>>> LogAppendTime, the timestamp in the producer record will
> >>>>>>> be overwritten by the broker with the broker local time
> >>>>>>> when it appends the message to its log. In either of the
> >>>>>>> cases above, the timestamp that has actually been used
> >>>>>>> will be returned to user in RecordMetadata
> >>>>>>
> >>>>>>
> >>>>>> 1. Will this problem be solved if I configure the topic
> >>>>>>    with LogAppendTime or CreateTime explicitly?
> >>>>>> 2. What is the default setting of this property in a
> >>>>>>    newly created topic?
> >>>>>> 3. How do I change it (what is the name of the property
> >>>>>>    to be set)?
> >>>>>> 4. Any idea why I face this problem in the cluster mode
> >>>>>>    but not in the local mode?
> >>>>>>
> >>>>>> BTW I am using 0.10.1.
> >>>>>>
> >>>>>> Any help / pointer will be appreciated.
> >>>>>>
> >>>>>> regards.
> >>>>>>
> >>>>>> -- Debasish Ghosh http://manning.com/ghosh2
> >>>>>> http://manning.com/ghosh
> >>>>>>
> >>>>>> Twttr: @debasishg Blog: http://debasishg.blogspot.com
> >>>>>> Code: http://github.com/debasishg
> >>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>
> >
> >
> >
> >
>



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
