Hello Matthias -

Thanks a lot for the response. I think what may be happening is a version
mismatch between the development and deployment versions of Kafka. The
Kafka Streams application that I developed uses 0.10.0-based libraries,
and my local environment contains a server installation of the same
version. Hence it works OK in my local environment.

But DC/OS 1.8 deploys Kafka 0.9.0 when I install the service through the
DC/OS CLI. I use this version to load records into the input topic and
then try to consume them with the deployed Streams application, which I
developed against 0.10.0. Hence the producer did not put a timestamp in
the records, while the consumer expects to find one.

I need to check if 0.10.x is available for DC/OS ..
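
In the meantime, if I have to process the already-written 0.9 records, I
understand I can fall back to wall-clock time in the Streams
configuration, along the lines of your FAQ entry. A minimal sketch of
what I would try (application id and broker address are placeholders;
config key as in the 0.10.0 Streams API):

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");  // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
    // Switch to processing-time semantics so records without an
    // embedded timestamp no longer fail with "Invalid timestamp -1".
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
              WallclockTimestampExtractor.class.getName());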

Thanks again for your suggestions.


On Sat, Oct 29, 2016 at 3:23 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

>
> Hey,
>
> we just added a new FAQ entry for the upcoming CP 3.2 release that
> answers your question; I'll copy and paste it here. More concrete
> answers are below.
>
> > If you get an exception similar to the one shown below, there are
> > multiple possible causes:
> >
> > Exception in thread "StreamThread-1"
> > java.lang.IllegalArgumentException: Invalid timestamp -1
> >     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >
> >  This error means that the timestamp extractor of your Kafka
> > Streams application failed to extract a valid timestamp from a
> > record. Typically, this points to a problem with the record (e.g.,
> > the record does not contain a timestamp at all), but it could also
> > indicate a problem or bug in the timestamp extractor used by the
> > application.
> >
> > When does a record not contain a valid timestamp?
> >
> > If you are using the default ConsumerRecordTimestampExtractor, it
> > is most likely that your records do not carry an embedded
> > timestamp (embedded record timestamps were introduced in Kafka's
> > message format in Kafka 0.10). This might happen if you consume a
> > topic that is written by old Kafka producer clients (i.e., version
> > 0.9 or earlier) or by third-party producer clients. A common
> > situation where this may happen is after upgrading your Kafka
> > cluster from 0.9 to 0.10, where all the data that was generated
> > with 0.9 is not compatible with the 0.10 message format.
> >
> > If you are using a custom timestamp extractor, make sure that your
> > extractor is robust to missing timestamps in your records (see the
> > sketch below). For example, you can return a default or estimated
> > timestamp if you cannot extract a valid timestamp (maybe the
> > timestamp field in your data is just missing). You can also switch
> > to processing-time semantics via WallclockTimestampExtractor;
> > whether such a fallback is an appropriate response to this
> > situation depends on your use case. However, as a first step you
> > should identify and fix the root cause for why such problematic
> > records were written to Kafka in the first place. In a second step
> > you may consider applying workarounds (as described above) when
> > dealing with such records (for example, if you need to process
> > those records after all). Another option is to regenerate the
> > records with correct timestamps and write them to a new Kafka
> > topic.
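> >
> > A minimal sketch of such a robust extractor (the class name and the
> > wall-clock fallback are illustrative; written against the 0.10.0
> > single-argument TimestampExtractor interface):
> >
> >     import org.apache.kafka.clients.consumer.ConsumerRecord;
> >     import org.apache.kafka.streams.processor.TimestampExtractor;
> >
> >     public class RobustTimestampExtractor implements TimestampExtractor {
> >         @Override
> >         public long extract(final ConsumerRecord<Object, Object> record) {
> >             final long timestamp = record.timestamp();
> >             if (timestamp < 0) {
> >                 // No embedded timestamp (e.g., the record was written by
> >                 // a 0.9 producer): fall back to wall-clock time instead
> >                 // of failing with "Invalid timestamp -1".
> >                 return System.currentTimeMillis();
> >             }
> >             return timestamp;
> >         }
> >     }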
> >
> > When the timestamp extractor causes the problem:
> >
> > In this situation you should debug and fix the erroneous extractor.
> > If the extractor is built into Kafka, please report the bug to the
> > Kafka developer mailing list at d...@kafka.apache.org (see
> > instructions at http://kafka.apache.org/contact); in the meantime,
> > you may write a custom timestamp extractor that fixes the problem
> > and configure your application to use that extractor for the time
> > being.
>
>
> To address your questions more concretely:
>
>   1. Yes and no: yes, for any new data you write to your topic; no,
> for any already-written data that does not have a valid timestamp set.
>   2. The default is CreateTime.
>   3. The config parameter is "message.timestamp.type". It is a
> broker-side, per-topic setting. (However, be aware that the Java
> KafkaProducer verifies the timestamp locally before sending the
> message to the broker, thus on -1 you get the client-side exception
> you observed; see the snippet below.)
>   4. I assume that you consume different topics with different
> timestamp fields in your records.
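>
> To illustrate the client-side check from point 3, here is a minimal
> sketch against the 0.10 Java producer API (topic name, key, and value
> are placeholders):
>
>     import org.apache.kafka.clients.producer.ProducerRecord;
>
>     public class TimestampCheckDemo {
>         public static void main(String[] args) {
>             // Constructing a record with timestamp -1 throws
>             // IllegalArgumentException("Invalid timestamp -1") locally,
>             // before anything is sent to the broker -- the same check
>             // that the Streams SinkNode hit in your stack trace.
>             new ProducerRecord<String, String>(
>                 "my-topic", null, -1L, "key", "value");
>         }
>     }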
>
> Also have a look at:
> http://docs.confluent.io/current/streams/concepts.html#time
>
>
>
> -Matthias
>
>
> On 10/28/16 5:42 AM, Debasish Ghosh wrote:
> > I am actually using 0.10.0 and NOT 0.10.1 as I mentioned in the
> > last mail. And I am using Kafka within a DC/OS cluster under AWS.
> >
> > The setup that I mentioned works OK is on my local machine, using
> > a local Kafka installation. And it works in both the single-broker
> > and multi-broker scenarios.
> >
> > Thanks.
> >
> > On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh
> > <ghosh.debas...@gmail.com> wrote:
> >
> >> Hello -
> >>
> >> I am a beginner in Kafka .. with my first Kafka Streams
> >> application ..
> >>
> >> I have a streams application that reads from a topic, does some
> >> transformation on the data and writes to another topic. The
> >> record that I manipulate is a CSV record.
> >>
> >> It runs fine when I run it on a local Kafka instance.
> >>
> >> However when I run it on an AWS cluster, I get the following
> >> exception when I try to produce the transformed record into the
> >> target topic.
> >>
> >> Exception in thread "StreamThread-1"
> >> java.lang.IllegalArgumentException: Invalid timestamp -1
> >>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
> >>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
> >>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>     at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
> >>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
> >>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
> >>     at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
> >>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
> >>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
> >>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
> >>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> >>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
> >>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> >>
> >> Looks like the timestamp passed to the ProducerRecord is -1,
> >> though I am not passing any timestamp explicitly. I am not sure
> >> why this happens. But I see that the Javadoc for ProducerRecord
> >> says the following ..
> >>
> >>> The record also has an associated timestamp. If the user did not
> >>> provide a timestamp, the producer will stamp the record with its
> >>> current time. The timestamp eventually used by Kafka depends on
> >>> the timestamp type configured for the topic. If the topic is
> >>> configured to use CreateTime, the timestamp in the producer
> >>> record will be used by the broker. If the topic is configured to
> >>> use LogAppendTime, the timestamp in the producer record will be
> >>> overwritten by the broker with the broker local time when it
> >>> appends the message to its log. In either of the cases above, the
> >>> timestamp that has actually been used will be returned to user in
> >>> RecordMetadata.
> >>
> >> 1. Will this problem be solved if I configure the topic with
> >> LogAppendTime or CreateTime explicitly?
> >> 2. What is the default setting of this property in a newly created
> >> topic?
> >> 3. How do I change it (what is the name of the property to be set)?
> >> 4. Any idea why I face this problem in cluster mode but not in
> >> local mode?
> >>
> >> BTW I am using 0.10.1.
> >>
> >> Any help / pointer will be appreciated.
> >>
> >> regards.
> >>
> >> -- Debasish Ghosh http://manning.com/ghosh2
> >> http://manning.com/ghosh
> >>
> >> Twttr: @debasishg Blog: http://debasishg.blogspot.com Code:
> >> http://github.com/debasishg
> >>
> >
> >
> >
>



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
