Seweryn Habdank-Wojewodzki created KAFKA-5779:
-------------------------------------------------
Summary: Single message may exploit application based on KStream
Key: KAFKA-5779
URL: https://issues.apache.org/jira/browse/KAFKA-5779
Project: Kafka
Issue Type: Bug
Affects Versions: 0.11.0.0, 0.10.2.1
Reporter: Seweryn Habdank-Wojewodzki
Priority: Critical
The context: in Kafka streaming I am *defining* a simple KStream processing topology:
{code}
stringInput // line 54 of the SingleTopicStreamer class
.filter( streamFilter::passOrFilterMessages )
.map( normalizer )
.to( outTopicName );
{code}
For some reason I received a malformed message (I am still investigating what the
problem is), but in any case my service was killed with a FATAL error:
{code}
2017-08-22 17:08:44 FATAL SingleTopicStreamer:54 - Caught unhandled exception:
Input record ConsumerRecord(topic = XXX_topic, partition = 8, offset = 15,
CreateTime = -1, serialized key size = -1, serialized value size = 255, headers
= RecordHeaders(headers = [], isReadOnly = false), key = null, value =
{"recordTimestamp":"2017-08-22T17:07:40:619+02:00","logLevel":"INFO","sourceApplication":"WPT","message":"Kafka-Init","businessError":false,"normalizedStatus":"green","logger":"CoreLogger"})
has invalid (negative) timestamp. Possibly because a pre-0.10 producer client
was used to write this record to Kafka without embedding a timestamp, or
because the input topic was created before upgrading the Kafka cluster to
0.10+. Use a different TimestampExtractor to process this data.;
[org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:63),
org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:61),
org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:46),
org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85),
org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117),
org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464),
org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650),
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556),
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)]
in thread restreamer-d4e77d18-6e7b-4708-8436-7fea0d4b1cdf-StreamThread-3
{code}
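The error message itself points at a workaround: plugging in a different TimestampExtractor. As far as I can tell, Kafka Streams already ships a LogAndSkipOnInvalidTimestamp extractor that logs and drops such a record instead of killing the thread. A configuration sketch (not verified against our exact setup; in 0.10.2/0.11.0 the config key is "timestamp.extractor", renamed to "default.timestamp.extractor" in later releases):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;

// Config fragment: replace the default FailOnInvalidTimestamp extractor
// with LogAndSkipOnInvalidTimestamp, so a record with an invalid
// (e.g. negative) timestamp is logged and skipped instead of crashing
// the StreamThread.
Properties props = new Properties();
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
          LogAndSkipOnInvalidTimestamp.class.getName());
```

This silently drops the bad record, which may or may not be acceptable depending on the data; it does at least keep the application alive.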
The reason suggested in the message (a pre-0.10 producer) does not apply here: we
are using Kafka 0.10.2.1 and 0.11.0.0, and the topics were created with these
versions of Kafka.
The sending application is the Confluent .NET client.
This exception is a bit misleading: it appears to be thrown during initialization
of the stream, but it is actually raised during processing. Adding try {} catch {}
around the stringInput statement therefore does not help: the stream is defined
correctly, and a single message sent later still kills the whole application.
In my opinion KStream should be robust enough to catch such exceptions and protect
the application from dying because of a single corrupted message. Especially when
the timestamp is not embedded, the record could be patched with the current
timestamp without any loss of overall performance.
I would expect Kafka Streams to handle this.
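To make the idea of patching the timestamp concrete, here is a minimal sketch of such an extractor. The class name is mine and the Kafka-facing wiring (which needs kafka-streams on the classpath) is only indicated in a comment; the patching rule itself is plain Java:

```java
// Hypothetical extractor: substitute wall-clock time for an invalid
// (negative) record timestamp, pass valid timestamps through unchanged.
public class WallclockOnInvalidTimestampExtractor {

    // Core patching rule, kept free of Kafka dependencies.
    public static long patchTimestamp(final long recordTimestamp,
                                      final long wallclockMillis) {
        return recordTimestamp < 0 ? wallclockMillis : recordTimestamp;
    }

    /* Wiring into Kafka Streams would look roughly like this:
    public class Extractor
            implements org.apache.kafka.streams.processor.TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record,
                            long previousTimestamp) {
            return patchTimestamp(record.timestamp(), System.currentTimeMillis());
        }
    }
    ...and it would be registered via the "timestamp.extractor" config. */

    public static void main(String[] args) {
        System.out.println(patchTimestamp(-1L, 1500000000000L)); // prints 1500000000000
        System.out.println(patchTimestamp(42L, 1500000000000L)); // prints 42
    }
}
```

With CreateTime = -1 (as in the record above), this would stamp the record with the consumer's wall-clock time instead of failing, which is exactly the behavior I am asking for.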
I will continue to investigate what is wrong with the message, but this is quite
hard for me, as it happens inside Kafka Streams in combination with the .NET
producer.
I have already verified that the problem does not occur when I consume these same
messages with an old-fashioned Kafka Consumer :-).
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)