Hello Matthias - Regarding ..

> In case you do have 0.10 brokers, it might however happen, that
> bin/kafka-console-producer.sh does use 0.9 producer.

How can I check this?

Thanks!

On Sat, Oct 29, 2016 at 12:23 PM, Debasish Ghosh <[email protected]> wrote:

> I agree .. the problem is DC/OS still ships the older version. Let me
> check if I can upgrade this ..
>
> Thanks!
>
> On Sat, Oct 29, 2016 at 12:21 PM, Matthias J. Sax <[email protected]>
> wrote:
>
>> -----BEGIN PGP SIGNED MESSAGE-----
>> Hash: SHA512
>>
>> Btw: I would highly recommend to use Kafka 0.10.1 -- there are many
>> new Streams features, usability improvements, and bug fixes.
>>
>> - -Matthias
>>
>> On 10/28/16 11:42 PM, Matthias J. Sax wrote:
>> > That sounds reasonable. However, I am wondering how your Streams
>> > application can connect to 0.9 broker in the first place. Streams
>> > internally uses standard Kafka clients, and those are not backward
>> > compatible. Thus, the 0.10 Streams clients should not be able to
>> > connect to 0.9 broker.
>> >
>> > In case you do have 0.10 brokers, it might however happen, that
>> > bin/kafka-console-producer.sh does use 0.9 producer. Brokers are
>> > backward compatible, thus, a 0.9 producer can write to 0.10 broker
>> > (and in this case record TS would be invalid). While I assume that
>> > in your local environment you are using 0.10
>> > bin/kafka-console-producer.sh and thus all works fine.
>> >
>> >
>> > -Matthias
>> >
>> >
>> > On 10/28/16 11:00 PM, Debasish Ghosh wrote:
>> >> Hello Matthias -
>> >
>> >> Thanks a lot for the response. I think what may be happening is
>> >> a version mismatch between the development & deployment versions
>> >> of Kafka. The Kafka streams application that I developed uses
>> >> 0.10.0 based libraries. And my local environment contains a
>> >> server installation of the same version. Hence it works ok in my
>> >> local environment.
>> >
>> >> But the DC/OS 1.8 deploys 0.9.0 of Kafka when I install the
>> >> service through DC/OS cli.
>> >> And I use this version to load records
>> >> into the input topic. And try to consume using the deployed
>> >> streams application which I developed using 0.10.0. Hence the
>> >> producer did not put the timestamp while the consumer expects to
>> >> have one.
>> >
>> >> I need to check if 0.10.x is available for DC/OS ..
>> >
>> >> Thanks again for your suggestions.
>> >
>> >
>> >> On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax
>> >> <[email protected]> wrote:
>> >
>> >> Hey,
>> >
>> >> we just added a new FAQ entry for upcoming CP 3.2 release that
>> >> answers your question. I just c&p it here. More concrete answer
>> >> below.
>> >
>> >>>>> If you get an exception similar to the one shown below,
>> >>>>> there are multiple possible causes:
>> >>>>>
>> >>>>> Exception in thread "StreamThread-1"
>> >>>>> java.lang.IllegalArgumentException: Invalid timestamp -1
>> >>>>>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>> >>>>>
>> >>>>> This error means that the timestamp extractor of your Kafka
>> >>>>> Streams application failed to extract a valid timestamp
>> >>>>> from a record. Typically, this points to a problem with the
>> >>>>> record (e.g., the record does not contain a timestamp at
>> >>>>> all), but it could also indicate a problem or bug in the
>> >>>>> timestamp extractor used by the application.
>> >>>>>
>> >>>>> When does a record not contain a valid timestamp:
>> >>>>>
>> >>>>> If you are using the default
>> >>>>> ConsumerRecordTimestampExtractor, it is most likely that
>> >>>>> your records do not carry an embedded timestamp (embedded
>> >>>>> record timestamps got introduced in Kafka's message format
>> >>>>> in Kafka 0.10). This might happen, if you consume a topic
>> >>>>> that is written by old Kafka producer clients (ie, version
>> >>>>> 0.9 or earlier) or third party producer clients.
>> >>>>> A common
>> >>>>> situation where this may happen is after upgrading your
>> >>>>> Kafka cluster from 0.9 to 0.10, where all the data that was
>> >>>>> generated with 0.9 is not compatible with the 0.10 message
>> >>>>> format.
>> >>>>>
>> >>>>> If you are using a custom timestamp extractor, make
>> >>>>> sure that your extractor is robust to missing timestamps in
>> >>>>> your records. For example, you can return a default or
>> >>>>> estimated timestamp if you cannot extract a valid timestamp
>> >>>>> (maybe the timestamp field in your data is just missing).
>> >>>>> You can also switch to processing time semantics via
>> >>>>> WallclockTimestampExtractor; whether such a fallback is an
>> >>>>> appropriate response to this situation depends on your use
>> >>>>> case. However, as a first step you should identify and fix
>> >>>>> the root cause for why such problematic records were
>> >>>>> written to Kafka in the first place. In a second step you
>> >>>>> may consider applying workarounds (as described above) when
>> >>>>> dealing with such records (for example, if you need to
>> >>>>> process those records after all). Another option is to
>> >>>>> regenerate the records with correct timestamps and write
>> >>>>> them to a new Kafka topic.
>> >>>>>
>> >>>>> When the timestamp extractor causes the problem:
>> >>>>>
>> >>>>> In this situation you should debug and fix the erroneous
>> >>>>> extractor. If the extractor is built into Kafka, please
>> >>>>> report the bug to the Kafka developer mailing list at
>> >>>>> [email protected] (see instructions
>> >>>>> http://kafka.apache.org/contact); in the meantime, you may
>> >>>>> write a custom timestamp extractor that fixes the problem
>> >>>>> and configure your application to use that extractor for
>> >>>>> the time being.
>> >
>> >
>> >> To address your questions more concretely:
>> >
>> >> 1. Yes and no: Yes, for any new data you write to your topic.
>> >> No, for any already written data that does not have a valid
>> >> timestamp set.
>> >> 2. The default is CreateTime.
>> >> 3. Config parameter "message.timestamp.type". It's a broker-side
>> >> per-topic setting (however, be aware that the Java KafkaProducer
>> >> verifies the timestamp locally before sending the message to the
>> >> broker, thus on -1 there will be the client-side exception you
>> >> observed).
>> >> 4. I assume that you consume different topics with different TS
>> >> fields in your records.
>> >
>> >> Also have a look at:
>> >> http://docs.confluent.io/current/streams/concepts.html#time
>> >
>> >
>> >> -Matthias
>> >
>> >
>> >> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
>> >>>>> I am actually using 0.10.0 and NOT 0.10.1 as I mentioned
>> >>>>> in the last mail. And I am using Kafka within a DC/OS
>> >>>>> cluster under AWS.
>> >>>>>
>> >>>>> The version that I mentioned works ok is on my local
>> >>>>> machine using a local Kafka installation. And it works for
>> >>>>> both single broker and multi broker scenario.
>> >>>>>
>> >>>>> Thanks.
>> >>>>>
>> >>>>> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh
>> >>>>> <[email protected]> wrote:
>> >>>>>
>> >>>>>> Hello -
>> >>>>>>
>> >>>>>> I am a beginner in Kafka .. with my first Kafka streams
>> >>>>>> application ..
>> >>>>>>
>> >>>>>> I have a streams application that reads from a topic,
>> >>>>>> does some transformation on the data and writes to
>> >>>>>> another topic. The record that I manipulate is a CSV
>> >>>>>> record.
>> >>>>>>
>> >>>>>> It runs fine when I run it on a local Kafka instance.
>> >>>>>>
>> >>>>>> However when I run it on an AWS cluster, I get the
>> >>>>>> following exception when I try to produce the
>> >>>>>> transformed record into the target topic.
>> >>>>>>
>> >>>>>> Exception in thread "StreamThread-1"
>> >>>>>> java.lang.IllegalArgumentException: Invalid timestamp -1
>> >>>>>>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>> >>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>> >>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
>> >>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>> >>>>>>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>> >>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>> >>>>>>
>> >>>>>> Looks like the timestamp passed to the ProducerRecord is
>> >>>>>> -1, though I am not passing any timestamp explicitly. I
>> >>>>>> am not sure why this happens. But I see that the Javadoc
>> >>>>>> for ProducerRecord says the following ..
>> >>>>>>
>> >>>>>>> The record also has an associated timestamp. If the user
>> >>>>>>> did not provide a timestamp, the producer will stamp the
>> >>>>>>> record with its current time. The timestamp eventually
>> >>>>>>> used by Kafka depends on the timestamp type configured
>> >>>>>>> for the topic. If the topic is configured to use
>> >>>>>>> CreateTime, the timestamp in the producer record will be
>> >>>>>>> used by the broker. If the topic is configured to use
>> >>>>>>> LogAppendTime, the timestamp in the producer record
>> >>>>>>> will be overwritten by the broker with the broker local
>> >>>>>>> time when it appends the message to its log. In either
>> >>>>>>> of the cases above, the timestamp that has actually
>> >>>>>>> been used will be returned to user in RecordMetadata
>> >>>>>>
>> >>>>>>
>> >>>>>> 1. Will this problem be solved if I configure the topic
>> >>>>>>    with LogAppendTime or CreateTime explicitly ?
>> >>>>>> 2. What is the default setting of this property in a newly
>> >>>>>>    created topic ?
>> >>>>>> 3. How do I change it (what is the name of the property to
>> >>>>>>    be set) ?
>> >>>>>> 4. Any idea why I face this problem in the cluster mode but
>> >>>>>>    not in the local mode ?
>> >>>>>>
>> >>>>>> BTW I am using 0.10.1.
>> >>>>>>
>> >>>>>> Any help / pointer will be appreciated ?
>> >>>>>>
>> >>>>>> regards.
>> >>>>>>
>> >>>>>> -- Debasish Ghosh http://manning.com/ghosh2
>> >>>>>> http://manning.com/ghosh
>> >>>>>>
>> >>>>>> Twttr: @debasishg Blog: http://debasishg.blogspot.com
>> >>>>>> Code: http://github.com/debasishg
>> >>>>>>
>> >>>>>
>> >>>
>> >
>> -----BEGIN PGP SIGNATURE-----
>> Comment: GPGTools - https://gpgtools.org
>>
>> iQIcBAEBCgAGBQJYFEcMAAoJECnhiMLycopPPrwQAJyWn+InO+JcrDnNaSkfEt3n
>> 6sp5rjINdTEA1PIorEDDQcwaq8gB/DwTQOKsBUDnukLc4VI/HPzpWRaBGVJkw+ki
>> tm1UpGG4LBlvQ/E4S3a+c15X03IgNQ3htLwipuC0qqtpYmo2OB2+035Ewch1RlRl
>> E3mL1v14CEsvf/a+If3w+wkS3CoSey6SlWBk//Z0OCd7yy68DxO94JpxnP0M7vNe
>> zICCnxqSHTFjNMipQP/uX0hT2HM0J1q4HeWCKcVB6VQgpu97gypQT25L5iatOv41
>> mFXVFKrYllvlYgLXq5PakI47H1DnkZNlN8maiKLC+7nrzqy0VTQhdxPLg6mVqVPX
>> MrkJ2jzrvI58F37Ac8vRFvgBJo5XVgaocY71rLmrVn3WA4oUpJRGB5fZe5vqJbDn
>> xAPjgRU2BA3l8nekG5iQ1O5osAhkT4PNzA/WTV2FGoNUu/zNupfe0Qipnsm8rqIM
>> RNTlCzDQU2X3dqUTm+Ze5Sn6WTjyiu9HPhYXrCgncAMFHMVH/4Tq53aJoiC7cz72
>> IMXrQr7oU8hkgCzDMQ+kncHnquj23xDt7lsUyD8AJ6hfOcDLKQ3XyXo72bjnpGYt
>> 21qBP3JqABkeHYrSFuR3BCL/VJ0JSGgjBVkKjXwZOZ+3lDAuHRd/5ZR5AeoveHwO
>> rA3fRxGlqR7RWyElKC51
>> =zBM7
>> -----END PGP SIGNATURE-----
>>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
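
For reference, the workaround mentioned in the quoted FAQ text (return a default or estimated timestamp when a record carries no valid one) could be sketched roughly as below. This is only an illustration of the fallback order, not code from the thread or from Kafka: the class `TimestampFallbackSketch` and its methods are hypothetical, and the assumption that the first CSV column holds epoch milliseconds is made up for the example. In a real application this logic would presumably sit inside your own implementation of Kafka Streams' `TimestampExtractor`, and fixing the producer version remains the first step.

```java
/**
 * Hypothetical helper (not part of Kafka): picks a usable timestamp for a
 * record whose embedded timestamp may be missing.
 */
final class TimestampFallbackSketch {

    // The thread shows -1 as the value seen when no timestamp is embedded.
    static final long NO_TIMESTAMP = -1L;

    /**
     * Fallback order: embedded record timestamp, then a timestamp parsed
     * from the CSV payload, then the supplied wall-clock time.
     */
    static long resolve(long embeddedTs, String csvValue, long wallClockMs) {
        if (embeddedTs >= 0) {
            return embeddedTs;
        }
        long fromPayload = parseFirstCsvField(csvValue);
        if (fromPayload >= 0) {
            return fromPayload;
        }
        return wallClockMs;
    }

    /**
     * Assumption for this sketch only: the first CSV column holds epoch
     * milliseconds. Returns NO_TIMESTAMP if it is absent or not a number.
     */
    static long parseFirstCsvField(String csvValue) {
        if (csvValue == null || csvValue.isEmpty()) {
            return NO_TIMESTAMP;
        }
        String first = csvValue.split(",", 2)[0].trim();
        try {
            return Long.parseLong(first);
        } catch (NumberFormatException e) {
            return NO_TIMESTAMP;
        }
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Record written by an old producer: no embedded timestamp.
        System.out.println(resolve(NO_TIMESTAMP, "1477650000000,foo,bar", now));
        // No usable field in the payload either: falls back to wall-clock time.
        System.out.println(resolve(NO_TIMESTAMP, "foo,bar", now));
    }
}
```

Whether falling back to wall-clock time is acceptable depends on the use case, as the FAQ text notes; it effectively switches those records to processing-time semantics.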
