Seweryn Habdank-Wojewodzki created KAFKA-5779:
-------------------------------------------------

             Summary: Single message may exploit application based on KStream
                 Key: KAFKA-5779
                 URL: https://issues.apache.org/jira/browse/KAFKA-5779
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.11.0.0, 0.10.2.1
            Reporter: Seweryn Habdank-Wojewodzki
            Priority: Critical


The context: in a Kafka Streams application I am *defining* a simple KStream pipeline:

{code}
    stringInput // line 54 of the SingleTopicStreamer class
        .filter( streamFilter::passOrFilterMessages )
        .map( normalizer )
        .to( outTopicName );
{code}

For some reason I received a malformed message (I am still investigating what 
exactly is wrong with it), and my service was killed with a FATAL error:

{code}
2017-08-22 17:08:44 FATAL SingleTopicStreamer:54 - Caught unhandled exception: 
Input record ConsumerRecord(topic = XXX_topic, partition = 8, offset = 15, 
CreateTime = -1, serialized key size = -1, serialized value size = 255, headers 
= RecordHeaders(headers = [], isReadOnly = false), key = null, value = 
{"recordTimestamp":"2017-08-22T17:07:40:619+02:00","logLevel":"INFO","sourceApplication":"WPT","message":"Kafka-Init","businessError":false,"normalizedStatus":"green","logger":"CoreLogger"})
 has invalid (negative) timestamp. Possibly because a pre-0.10 producer client 
was used to write this record to Kafka without embedding a timestamp, or 
because the input topic was created before upgrading the Kafka cluster to 
0.10+. Use a different TimestampExtractor to process this data.; 
[org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:63),
 org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:61),
 org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:46),
 org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85),
 org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117),
 org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464),
 org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650),
 org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556),
 org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)]
 in thread restreamer-d4e77d18-6e7b-4708-8436-7fea0d4b1cdf-StreamThread-3
{code}

The explanation suggested in the message, that a pre-0.10 producer wrote the 
record, is false: we are using Kafka 0.10.2.1 and 0.11.0.0, and the topics were 
created on these versions of Kafka. 
The sending application is the Confluent .NET client.

The exception is also a bit misleading: it reads as if it were thrown while 
initializing the stream, but it is actually thrown during processing. Adding a 
try{} catch{} around the stringInput statement therefore does not help; the 
stream is defined correctly, and a single message sent later kills the whole 
application.

In my opinion, KStream should be robust enough to catch such exceptions and 
protect the application from dying because of a single corrupted message, 
especially when the timestamp is simply not embedded. In that case the record 
could be patched with the current timestamp without any loss of overall 
performance.
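The patching suggested above could live in a custom TimestampExtractor. A minimal self-contained sketch of the core logic (the class and method names here are hypothetical, not part of the Kafka API):

```java
// Sketch of the proposed patching: if the record carries no embedded
// timestamp (Kafka encodes "no timestamp" as a negative value, typically -1),
// substitute wall-clock time instead of failing the whole stream thread.
// In a real application this logic would be plugged into a custom
// org.apache.kafka.streams.processor.TimestampExtractor implementation.
public class TimestampPatcher {
    public static long patchTimestamp(long recordTimestamp) {
        // Treat any negative value as "timestamp missing".
        if (recordTimestamp < 0) {
            return System.currentTimeMillis();
        }
        return recordTimestamp;
    }
}
```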

I would expect Kafka Streams to handle this case gracefully.
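As an interim workaround, and assuming it is acceptable to fall back to a non-embedded time for such records, the default FailOnInvalidTimestamp extractor can be replaced with one of the non-failing extractors shipped with Kafka Streams. A hedged sketch of the configuration (in 0.11.0.0 the property is named default.timestamp.extractor; in 0.10.2.x it is timestamp.extractor):

```properties
# Use the previously observed valid timestamp instead of failing on an
# invalid (negative) one; LogAndSkipOnInvalidTimestamp is another option.
default.timestamp.extractor=org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp
```

This does not fix the root cause (the .NET producer apparently writing records without a timestamp), but it keeps a single such record from killing the application.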

I will continue to investigate what is wrong with the message, but it is quite 
hard for me, as the failure happens internally in Kafka Streams in combination 
with the .NET producer.

I have already verified that the problem does not occur when I consume these 
same messages with an old-fashioned Kafka consumer :-).




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
