[ https://issues.apache.org/jira/browse/KAFKA-5779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16146794#comment-16146794 ]
Seweryn Habdank-Wojewodzki commented on KAFKA-5779:
---------------------------------------------------

OK. I will wait for the 1.0 release. But can you tell me which API I can use now to substitute the default TimestampExtractor? I searched for it without success. I would like to get a feeling for how the system will behave before the 1.0 release appears. BTW, thanks for the constructive comments.

> Single message may exploit application based on KStream
> -------------------------------------------------------
>
>                 Key: KAFKA-5779
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5779
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.2.1, 0.11.0.0
>            Reporter: Seweryn Habdank-Wojewodzki
>            Priority: Critical
>
> The context: in Kafka Streams I am *defining* a simple KStream processing:
> {code}
> stringInput // line 54 of the SingleTopicStreamer class
>     .filter( streamFilter::passOrFilterMessages )
>     .map( normalizer )
>     .to( outTopicName );
> {code}
> For some reason I received a malformed message (I am still investigating what the problem is), and my service crashed with a FATAL error:
> {code}
> 2017-08-22 17:08:44 FATAL SingleTopicStreamer:54 - Caught unhandled exception: Input record ConsumerRecord(topic = XXX_topic, partition = 8, offset = 15, CreateTime = -1, serialized key size = -1, serialized value size = 255, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"recordTimestamp":"2017-08-22T17:07:40:619+02:00","logLevel":"INFO","sourceApplication":"WPT","message":"Kafka-Init","businessError":false,"normalizedStatus":"green","logger":"CoreLogger"}) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. 
> Use a different TimestampExtractor to process this data.;
> [org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:63),
> org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:61),
> org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:46),
> org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85),
> org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117),
> org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464),
> org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650),
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556),
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)]
> in thread restreamer-d4e77d18-6e7b-4708-8436-7fea0d4b1cdf-StreamThread-3
> {code}
> The suggested cause (a pre-0.10 producer) does not apply here, as we are using Kafka 0.10.2.1 and 0.11.0.0 and the topics were created within these versions of Kafka.
> The sender application is the .NET client from Confluent.
> This exception is a bit problematic: it was suggested that it is thrown during initialization of the stream, but it actually happens during processing. Adding a try/catch around the stringInput statement therefore does not help; the stream was defined correctly, and a single message sent later killed the whole application.
> In my opinion KStream should be robust enough to catch such an exception and protect the application from dying due to a single corrupted message, especially when the timestamp is not embedded. In such a case one could patch the message with the current timestamp without losing overall performance.
> I would expect Kafka Streams to handle this.
> I will continue to investigate what is wrong with the message, but it is quite hard for me, as it happens inside Kafka Streams combined with the .NET producer.
> I have already verified that this problem does not occur when I consume these same messages with an old-fashioned Kafka Consumer :-).

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
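[Editor's note] Regarding the question above about substituting the default extractor: in 0.10.2.x / 0.11.0.x the extractor can already be swapped through the Streams configuration, via the `timestamp.extractor` key (exposed as the constant `StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG`; in 1.0 it was renamed to `default.timestamp.extractor`). A minimal sketch, written with plain string keys so it compiles without the Kafka jars on the classpath; the key name and the built-in extractor class names are taken from the 0.10.2/0.11 API and should be verified against the exact version in use:

```java
import java.util.Properties;

public class ExtractorConfig {
    // Sketch of a Streams configuration that replaces the default
    // FailOnInvalidTimestamp extractor. UsePreviousTimeOnInvalidTimestamp
    // and WallclockTimestampExtractor ship with Kafka Streams as lenient
    // alternatives that do not kill the application on a bad timestamp.
    public static Properties streamsProps() {
        Properties props = new Properties();
        // application.id, bootstrap.servers, serdes etc. omitted here.
        // Pre-1.0 key; from 1.0 on it is "default.timestamp.extractor".
        props.put("timestamp.extractor",
                "org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("timestamp.extractor"));
    }
}
```

With this configuration, a record carrying CreateTime = -1 should be assigned the timestamp of the previously read record instead of raising the FATAL error shown in the log.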
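[Editor's note] The report's suggestion (patch a record that has no embedded timestamp with the current wall-clock time) is essentially what a custom lenient extractor would do. A real implementation would implement `org.apache.kafka.streams.processor.TimestampExtractor` and needs the Kafka jars, so the core decision logic is sketched here as plain Java; the class and method names are illustrative only:

```java
public class LenientTimestampLogic {
    // Hypothetical core of a lenient TimestampExtractor: keep a valid
    // embedded timestamp, fall back to wall-clock time for a negative one
    // (e.g. the CreateTime = -1 record from the log above).
    public static long patch(long recordTimestampMs, long wallClockMs) {
        return recordTimestampMs >= 0 ? recordTimestampMs : wallClockMs;
    }

    public static void main(String[] args) {
        System.out.println(patch(-1L, System.currentTimeMillis()) >= 0); // prints true
        System.out.println(patch(1503414524000L, 0L));                   // prints 1503414524000
    }
}
```

Inside a real extractor this logic would run per record, so the stream keeps processing instead of dying on a single corrupted message.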