Hey,

we just added a new FAQ entry for the upcoming CP 3.2 release that
answers your question. I copied and pasted it here; a more concrete
answer follows below.

> If you get an exception similar to the one shown below, there are
> multiple possible causes:
> 
> Exception in thread "StreamThread-1"
> java.lang.IllegalArgumentException: Invalid timestamp -1
>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>
>  This error means that the timestamp extractor of your Kafka
> Streams application failed to extract a valid timestamp from a
> record. Typically, this points to a problem with the record (e.g.,
> the record does not contain a timestamp at all), but it could also
> indicate a problem or bug in the timestamp extractor used by the
> application.
> 
> When a record does not contain a valid timestamp:
> 
> If you are using the default ConsumerRecordTimestampExtractor, it
> is most likely that your records do not carry an embedded timestamp
> (embedded record timestamps were introduced with Kafka's 0.10
> message format). This might happen if you consume a topic that is
> written by old Kafka producer clients (i.e., version 0.9 or
> earlier) or by third-party producer clients. A common situation
> where this may happen is after upgrading your Kafka cluster from
> 0.9 to 0.10: all the data that was generated with 0.9 does not
> include the 0.10 message format's embedded timestamp. If you are
> using a custom timestamp extractor, make sure that your extractor
> is robust to missing timestamps in your records. For example, you
> can return a default or an estimated timestamp if you cannot
> extract a valid timestamp (maybe the timestamp field in your data
> is just missing). You can also switch to processing-time semantics
> via WallclockTimestampExtractor; whether such a fallback is an
> appropriate response to this situation depends on your use case.
> However, as a first step you should identify and fix the root cause
> for why such problematic records were written to Kafka in the first
> place. In a second step you may consider applying workarounds (as
> described above) when dealing with such records (for example, if
> you need to process those records after all). Another option is to
> regenerate the records with correct timestamps and write them to a
> new Kafka topic.
> 
> When the timestamp extractor causes the problem:
> 
> In this situation you should debug and fix the erroneous extractor.
> If the extractor is built into Kafka, please report the bug to the
> Kafka developer mailing list at d...@kafka.apache.org (see
> instructions http://kafka.apache.org/contact); in the meantime, you
> may write a custom timestamp extractor that fixes the problem and
> configure your application to use that extractor for the time
> being.
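
To make the FAQ's fallback suggestion concrete, here is a rough,
untested sketch of such a custom extractor, written against the 0.10.x
TimestampExtractor interface (the class name and the wall-clock
fallback are just one possible choice, not the only sensible one):

  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.streams.processor.TimestampExtractor;

  // Uses the embedded record timestamp if it is valid and falls back
  // to wall-clock time (i.e., processing-time semantics) otherwise.
  public class FallbackTimestampExtractor implements TimestampExtractor {
      @Override
      public long extract(final ConsumerRecord<Object, Object> record) {
          final long ts = record.timestamp();
          // Records written by pre-0.10 producers carry -1 here.
          return ts >= 0 ? ts : System.currentTimeMillis();
      }
  }

You would plug it into your application via StreamsConfig:

  props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
            FallbackTimestampExtractor.class.getName());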


To address your questions more concretely:

  1. Yes and no: yes for any new data you write to your topic; no for
     any already written data that does not have a valid timestamp set.
  2. The default is CreateTime.
  3. The config parameter is "message.timestamp.type". It is a
     broker-side, per-topic setting (see the example commands after
     this list). However, be aware that the Java KafkaProducer also
     validates the timestamp locally before sending the message to the
     broker; thus, on -1 you get the client-side exception you
     observed.
  4. I assume that you consume different topics with different
     timestamp fields in your records.
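
Regarding (3), here is an example of how you could set it with the
command line tools that ship with the broker (topic name, ZooKeeper
address, and partition/replication settings are placeholders for
illustration):

  # set the timestamp type when creating a topic
  bin/kafka-topics.sh --zookeeper localhost:2181 --create \
    --topic my-topic --partitions 1 --replication-factor 1 \
    --config message.timestamp.type=LogAppendTime

  # or change it on an existing topic
  bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
    --entity-type topics --entity-name my-topic \
    --add-config message.timestamp.type=LogAppendTime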

Also have a look at:
http://docs.confluent.io/current/streams/concepts.html#time



-Matthias


On 10/28/16 5:42 AM, Debasish Ghosh wrote:
> I am actually using 0.10.0 and NOT 0.10.1 as I mentioned in the
> last mail. And I am using Kafka within a DC/OS cluster under AWS.
> 
> The setup that I mentioned works ok is my local machine with a
> local Kafka installation. And it works for both single-broker and
> multi-broker scenarios.
> 
> Thanks.
> 
> On Fri, Oct 28, 2016 at 1:37 PM, Debasish Ghosh
> <ghosh.debas...@gmail.com> wrote:
> 
>> Hello -
>> 
>> I am a beginner in Kafka .. with my first Kafka streams
>> application ..
>> 
>> I have a streams application that reads from a topic, does some 
>> transformation on the data and writes to another topic. The
>> record that I manipulate is a CSV record.
>> 
>> It runs fine when I run it on a local Kafka instance.
>> 
>> However when I run it on an AWS cluster, I get the following
>> exception when I try to produce the transformed record into the
>> target topic.
>> 
>> Exception in thread "StreamThread-1"
>> java.lang.IllegalArgumentException: Invalid timestamp -1
>>     at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
>>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>     at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:351)
>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:192)
>>     at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>> 
>> Looks like the timestamp passed to the ProducerRecord is -1,
>> though I am not passing any timestamp explicitly. I am not sure
>> why this happens. But I see that the Javadoc for ProducerRecord
>> says the following ..
>> 
>>> The record also has an associated timestamp. If the user did not
>>> provide a timestamp, the producer will stamp the record with its
>>> current time. The timestamp eventually used by Kafka depends on
>>> the timestamp type configured for the topic. If the topic is
>>> configured to use CreateTime, the timestamp in the producer
>>> record will be used by the broker. If the topic is configured
>>> to use LogAppendTime, the timestamp in the producer record will
>>> be overwritten by the broker with the broker local time when it
>>> appends the message to its log. In either of the cases above,
>>> the timestamp that has actually been used will be returned to
>>> user in RecordMetadata
>> 
>> 
>> 1. Will this problem be solved if I configure the topic with
>>    LogAppendTime or CreateTime explicitly?
>> 2. What is the default setting of this property in a newly created
>>    topic?
>> 3. How do I change it (what is the name of the property to be set)?
>> 4. Any idea why I face this problem in the cluster mode but not in
>>    the local mode?
>> 
>> BTW I am using 0.10.1.
>> 
>> Any help / pointer will be appreciated.
>> 
>> regards.
>> 
>> -- Debasish Ghosh http://manning.com/ghosh2 
>> http://manning.com/ghosh
>> 
>> Twttr: @debasishg Blog: http://debasishg.blogspot.com Code:
>> http://github.com/debasishg
>> 
