[ 
https://issues.apache.org/jira/browse/KAFKA-5779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16146087#comment-16146087
 ] 

Matthias J. Sax commented on KAFKA-5779:
----------------------------------------

I understand your point of view (it's absolutely valid). We did have a 
discussion about the default behavior on the dev mailing list, and the community 
decided that the default should be "fail fast". I guess it's a matter of personal 
taste whether "fail fast" is the best default or not. Note that dropping 
records on the floor might result in data loss, and it's important for some 
applications to be informed about this immediately via an exception 
instead of having records dropped silently. 

About the timestamp extractor and record queue: the if-condition is only "dead 
code" if you run with the default timestamp extractor ({{FailOnInvalidTimestamp}}). 
If you change the timestamp extractor such that it does not throw an exception, 
you can skip over records with invalid timestamps (i.e., if the extractor returns a 
negative timestamp, the if-condition hits). I guess this is the behavior you 
want, so I would recommend setting {{LogAndSkipOnInvalidTimestamp}} via 
{{StreamsConfig}} for your application. The community chose to fail by default, 
as explained above, but you can simply reconfigure Kafka Streams to change 
the default behavior to your needs.
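
A minimal sketch of that reconfiguration, for illustration only. The config keys are written as plain strings so the snippet compiles without the kafka-streams jar; the assumption here is that 0.11.0.x reads {{default.timestamp.extractor}} while 0.10.2.x reads {{timestamp.extractor}}, so please check the {{StreamsConfig}} constants of the version you run. The class name, method name, application id, and broker address are placeholders, not part of Kafka.

```java
import java.util.Properties;

// Sketch only: builds the Properties that would be handed to KafkaStreams.
// Keys and class names are plain strings so no Kafka dependency is needed here.
class SkipInvalidTimestampConfig {

    // Assumed key for 0.11.0.x (StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG).
    static final String KEY_0_11 = "default.timestamp.extractor";
    // Assumed key for 0.10.2.x (StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG).
    static final String KEY_0_10_2 = "timestamp.extractor";

    // Fully qualified name of the built-in extractor that logs and skips
    // records with a negative timestamp instead of throwing.
    static final String LOG_AND_SKIP =
        "org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp";

    static Properties build(String applicationId, String bootstrapServers, String extractorKey) {
        Properties props = new Properties();
        props.put("application.id", applicationId);        // placeholder value supplied by caller
        props.put("bootstrap.servers", bootstrapServers);  // placeholder value supplied by caller
        props.put(extractorKey, LOG_AND_SKIP);             // replaces FailOnInvalidTimestamp
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("my-streams-app", "localhost:9092", KEY_0_11);
        System.out.println(props.getProperty(KEY_0_11));
    }
}
```

The resulting {{Properties}} object would then be passed to the {{KafkaStreams}} constructor in place of the current configuration; the topology itself stays unchanged.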

For parsing errors, we added a new feature (coming in the 1.0 release) that allows 
you to configure Kafka Streams to drop those records on the floor instead of 
failing (i.e., it is similar behavior to {{LogAndSkipOnInvalidTimestamp}}, but 
this time for parsing/deserializing the key-value pair): 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
If you cannot wait for the 1.0 release, a workaround to make your application 
resilient against parsing errors is described here: 
https://stackoverflow.com/questions/42666756/handling-bad-messages-using-kafkas-streams-api/42680420#42680420
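
For the KIP-161 handler, the configuration could look roughly like the sketch below. It assumes the 1.0 config key {{default.deserialization.exception.handler}} and the built-in {{LogAndContinueExceptionHandler}} class; both are written as plain strings so the snippet compiles without the kafka-streams jar. The class and method names of the sketch itself are hypothetical.

```java
import java.util.Properties;

// Sketch only: adds the KIP-161 "log and continue" handler to an existing
// Kafka Streams configuration. Requires Kafka Streams 1.0 or later at runtime.
class SkipBadRecordsConfig {

    // Assumed key (StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG in 1.0).
    static final String HANDLER_KEY = "default.deserialization.exception.handler";

    // Built-in handler that logs the failing record and keeps processing.
    static final String LOG_AND_CONTINUE =
        "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";

    // Returns a copy of the given configuration with the handler set;
    // the caller's Properties object is left untouched.
    static Properties withLogAndContinue(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put(HANDLER_KEY, LOG_AND_CONTINUE);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "my-streams-app");   // placeholder
        base.put("bootstrap.servers", "localhost:9092"); // placeholder
        System.out.println(withLogAndContinue(base).getProperty(HANDLER_KEY));
    }
}
```

Without this setting the default stays "fail fast", so a record that cannot be deserialized still stops the stream thread.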

I am closing this JIRA as "not a problem" for now. Feel free to follow up with 
the discussion! Happy to explain more about it.



> Single message may exploit application based on KStream
> -------------------------------------------------------
>
>                 Key: KAFKA-5779
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5779
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.2.1, 0.11.0.0
>            Reporter: Seweryn Habdank-Wojewodzki
>            Priority: Critical
>
> The context: in Kafka Streams I am *defining* simple KStream processing:
> {code}
>     stringInput // line 54 of the SingleTopicStreamer class
>         .filter( streamFilter::passOrFilterMessages )
>         .map( normalizer )
>         .to( outTopicName );
> {code}
> For some reason I got a wrong message (I am still investigating what the 
> problem is), 
> but anyhow my service was brought down by a FATAL error:
> {code}
> 2017-08-22 17:08:44 FATAL SingleTopicStreamer:54 - Caught unhandled 
> exception: Input record ConsumerRecord(topic = XXX_topic, partition = 8, 
> offset = 15, CreateTime = -1, serialized key size = -1, serialized value size 
> = 255, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, 
> value = 
> {"recordTimestamp":"2017-08-22T17:07:40:619+02:00","logLevel":"INFO","sourceApplication":"WPT","message":"Kafka-Init","businessError":false,"normalizedStatus":"green","logger":"CoreLogger"})
>  has invalid (negative) timestamp. Possibly because a pre-0.10 producer 
> client was used to write this record to Kafka without embedding a timestamp, 
> or because the input topic was created before upgrading the Kafka cluster to 
> 0.10+. Use a different TimestampExtractor to process this data.; 
> [org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:63),
>  
> org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:61),
>  
> org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:46),
>  
> org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85),
>  
> org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117),
>  
> org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)]
>  in thread restreamer-d4e77d18-6e7b-4708-8436-7fea0d4b1cdf-StreamThread-3
> {code}
> The possible cause named in the message (an old producer) does not apply, as we are 
> using Kafka 0.10.2.1 and 0.11.0.0 and the topics were created with these 
> versions of Kafka. 
> The sender application is the .NET client from Confluent.
> The whole matter is a bit problematic with this exception: it was suggested 
> that it is thrown during initialization of the stream, but effectively it 
> happened during processing, so adding try{} catch {} around the stringInput statement 
> does not help. The stream was correctly defined, and a single message sent 
> later brought down the whole app.
> In my opinion KStream should be robust enough to catch all such exceptions 
> and should protect the application from dying due to a single corrupted message, 
> especially when the timestamp is not embedded. In such a case one can patch 
> the message with the current timestamp without loss of overall performance.
> I would expect Kafka Streams to handle this.
> I will continue to investigate what the problem with the message is, but it 
> is quite hard for me, as it happens internally in Kafka Streams combined with 
> the .NET producer.
> I have already tested that this problem does not occur when I receive these 
> particular messages with an old-fashioned Kafka Consumer :-).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
