[ 
https://issues.apache.org/jira/browse/KAFKA-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16022828#comment-16022828
 ] 

Eno Thereska commented on KAFKA-4740:
-------------------------------------

I mentioned in KAFKA-5211 that such a change should be done with a KIP. 

> Using new consumer API with a Deserializer that throws SerializationException 
> can lead to infinite loop
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4740
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4740
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>         Environment: Kafka broker 0.10.1.1 (but this bug is not dependent on 
> the broker version)
> Kafka clients 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>            Reporter: Sébastien Launay
>            Assignee: Sébastien Launay
>            Priority: Critical
>
> The old consumer supports deserializing records into typed objects and throws 
> a {{SerializationException}} through {{MessageAndMetadata#key()}} and 
> {{MessageAndMetadata#message()}} that can be caught by the client \[1\].
> When using the new consumer API with kafka-clients version < 0.10.0.1, such 
> an exception is swallowed by the {{NetworkClient}} class and results in an 
> infinite loop over which the client has no control, for example:
> {noformat}
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset 
> for partition test2-0 to earliest offset.
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 0 
> for partition test2-0
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request 
> completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by IntegerDeserializer is not 4
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request 
> completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by IntegerDeserializer is not 4
> ...
> {noformat}
> Thanks to KAFKA-3977, this has been partially fixed in 0.10.1.0 but another 
> issue still remains.
> Indeed, the client can now catch the {{SerializationException}} but the next 
> call to {{Consumer#poll(long)}} will throw the same exception indefinitely.
> The following snippet (full example available on Github \[2\] for most 
> released kafka-clients versions):
> {code:java}
> try (KafkaConsumer<String, Integer> kafkaConsumer =
>         new KafkaConsumer<>(consumerConfig, new StringDeserializer(), new IntegerDeserializer())) {
>     kafkaConsumer.subscribe(Arrays.asList("topic"));
>     // Will run till the shutdown hook is called
>     while (!doStop) {
>         try {
>             ConsumerRecords<String, Integer> records = kafkaConsumer.poll(1000);
>             if (!records.isEmpty()) {
>                 logger.info("Got {} messages", records.count());
>                 for (ConsumerRecord<String, Integer> record : records) {
>                     logger.info("Message with partition: {}, offset: {}, key: {}, value: {}",
>                             record.partition(), record.offset(), record.key(), record.value());
>                 }
>             } else {
>                 logger.info("No messages to consume");
>             }
>         } catch (SerializationException e) {
>             logger.warn("Failed polling some records", e);
>         }
>     }
> }
> {code}
> when run with the following records (third record has an invalid Integer 
> value):
> {noformat}
>     printf "\x00\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
>     printf "\x00\x00\x00\x01\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
>     printf "\x00\x00\x00\n"     | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
>     printf "\x00\x00\x00\x02\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
> {noformat}
> will produce the following logs:
> {noformat}
> INFO  consumer.Consumer - Got 2 messages
> INFO  consumer.Consumer - Message with partition: 0, offset: 0, key: null, 
> value: 0
> INFO  consumer.Consumer - Message with partition: 0, offset: 1, key: null, 
> value: 1
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing 
> key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of 
> data received by IntegerDeserializer is not 4
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing 
> key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of 
> data received by IntegerDeserializer is not 4
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing 
> key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of 
> data received by IntegerDeserializer is not 4
> ...
> {noformat}
> I don't believe committing offsets would help, and even if it did, it could 
> result in a few well-formed records from that {{ConsumerRecords}} batch not 
> being consumed (data loss).
> I have only seen a few mentions of this bug online \[3\], but I believe this 
> is a critical issue as the new consumer API is no longer in beta. If you do 
> not control the producers (which can inject malformed values) or you use an 
> advanced deserializer that throws such an exception (e.g. schema-registry 
> {{KafkaAvroDeserializer}}), you can end up blocking a consumer from 
> advancing in the stream.
> Current workarounds:
> - use a {{Deserializer}} that does not throw a {{SerializationException}} 
> (e.g. {{ByteArrayDeserializer}}, {{StringDeserializer}})
> - wrap the {{Deserializer}} to catch and log the {{SerializationException}} 
> but return {{null}} and then check for {{null}} in the client code (that's 
> what we use on top of {{KafkaAvroDeserializer}} in case there is an issue 
> reaching the schema registry or the Avro datum is either invalid or not 
> compatible with the reader's schema for some reason)
> Potential solutions:
> # continue to throw {{SerializationException}} when calling 
> {{Consumer#poll(long)}} but skip that malformed record on next 
> {{Consumer#poll(long)}}
> # do not throw {{SerializationException}} when calling 
> {{Consumer#poll(long)}} but expose information about invalid records in 
> {{ConsumerRecords}}
> # do not throw {{SerializationException}} when calling 
> {{Consumer#poll(long)}} but store the exception(s) in the {{ConsumerRecord}} 
> object so that they are rethrown on {{ConsumerRecord#key()}} and 
> {{ConsumerRecord#value()}}
> # do not deserialize records during {{Consumer#poll()}} but do it when 
> calling {{ConsumerRecord#key()}} and {{ConsumerRecord#value()}} (similar to 
> the old consumer)
> I believe any of those solutions breaks compatibility semantically but not 
> necessarily binary compatibility, as the {{SerializationException}} is a 
> {{RuntimeException}} so it could be "moved around".
> My preference goes to the last two, and I would be happy to contribute 
> such a change as well as update the documentation on 
> {{SerializationException}} to reflect that it is not only used for 
> serializing records.
> \[1\] 
> https://github.com/apache/kafka/blob/0.8.2.2/core/src/main/scala/kafka/message/MessageAndMetadata.scala
> \[1\] 
> http://docs.confluent.io/2.0.0/schema-registry/docs/serializer-formatter.html#serializer
> \[2\] 
> https://github.com/slaunay/kafka-consumer-serialization-exception-example
> \[3\] https://groups.google.com/forum/#!topic/kafka-clients/KBSPmY69H94
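
The second workaround listed in the quoted description (wrapping the {{Deserializer}} so that it logs the failure and returns {{null}}) could look roughly like the sketch below. It is only an illustration under stated assumptions: {{Deserializer}} and {{SerializationException}} here are local stand-ins for the kafka-clients types so the example runs without Kafka on the classpath, and {{StrictIntegerDeserializer}} and {{NullOnErrorDeserializer}} are hypothetical names, not the reporter's actual code.

```java
import java.nio.ByteBuffer;

// Stand-in for org.apache.kafka.common.serialization.Deserializer (assumption:
// only deserialize() is modelled so the sketch compiles without kafka-clients).
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Stand-in for org.apache.kafka.common.errors.SerializationException.
class SerializationException extends RuntimeException {
    SerializationException(String message) {
        super(message);
    }
}

// Hypothetical strict deserializer that rejects payloads that are not 4 bytes,
// mimicking the error message shown in the logs of the issue.
class StrictIntegerDeserializer implements Deserializer<Integer> {
    @Override
    public Integer deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != 4) {
            throw new SerializationException("Size of data received by IntegerDeserializer is not 4");
        }
        return ByteBuffer.wrap(data).getInt();
    }
}

// Hypothetical wrapper: delegates to another deserializer, logs any
// SerializationException and returns null instead of propagating it.
class NullOnErrorDeserializer<T> implements Deserializer<T> {
    private final Deserializer<T> delegate;

    NullOnErrorDeserializer(Deserializer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return delegate.deserialize(topic, data);
        } catch (SerializationException e) {
            System.err.println("Malformed record on topic " + topic + ": " + e.getMessage());
            return null;
        }
    }
}

class NullOnErrorDemo {
    public static void main(String[] args) {
        Deserializer<Integer> safe = new NullOnErrorDeserializer<>(new StrictIntegerDeserializer());
        // A well-formed 4-byte value followed by a malformed 3-byte value.
        System.out.println(safe.deserialize("topic", new byte[] {0, 0, 0, 1}));
        System.out.println(safe.deserialize("topic", new byte[] {0, 0, 0}));
    }
}
```

With this approach the client code has to check every key and value for {{null}}, and it can no longer tell a malformed record apart from a record whose value really was {{null}}.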

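The last potential solution in the quoted description (deserializing lazily when the value is accessed, rather than during {{Consumer#poll()}}) can be sketched as follows. This is not how kafka-clients is implemented; {{LazyRecord}}, {{SketchSerializationException}} and {{LazyRecordDemo}} are hypothetical names used only to show why a single bad record would then no longer block the rest of the batch.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for Kafka's SerializationException (a RuntimeException).
class SketchSerializationException extends RuntimeException {
    SketchSerializationException(String message) {
        super(message);
    }
}

// Hypothetical record that keeps the raw bytes and only deserializes on access.
class LazyRecord<V> {
    private final long offset;
    private final byte[] rawValue;
    private final Function<byte[], V> valueDeserializer;

    LazyRecord(long offset, byte[] rawValue, Function<byte[], V> valueDeserializer) {
        this.offset = offset;
        this.rawValue = rawValue;
        this.valueDeserializer = valueDeserializer;
    }

    long offset() {
        return offset;
    }

    // Deserialization happens here, so a failure is scoped to this record only.
    V value() {
        return valueDeserializer.apply(rawValue);
    }
}

class LazyRecordDemo {
    // Strict 4-byte integer decoding, similar in spirit to IntegerDeserializer.
    static Integer strictInt(byte[] data) {
        if (data.length != 4) {
            throw new SketchSerializationException("Size of data received is not 4");
        }
        return ByteBuffer.wrap(data).getInt();
    }

    // Walks the batch, keeping well-formed values and skipping malformed ones.
    static List<Integer> consume(List<LazyRecord<Integer>> batch) {
        List<Integer> values = new ArrayList<>();
        for (LazyRecord<Integer> record : batch) {
            try {
                values.add(record.value());
            } catch (SketchSerializationException e) {
                System.err.println("Skipping offset " + record.offset() + ": " + e.getMessage());
            }
        }
        return values;
    }

    public static void main(String[] args) {
        // Same four payloads as in the issue: the third one has only 3 bytes.
        List<LazyRecord<Integer>> batch = Arrays.asList(
                new LazyRecord<Integer>(0L, new byte[] {0, 0, 0, 0}, LazyRecordDemo::strictInt),
                new LazyRecord<Integer>(1L, new byte[] {0, 0, 0, 1}, LazyRecordDemo::strictInt),
                new LazyRecord<Integer>(2L, new byte[] {0, 0, 0}, LazyRecordDemo::strictInt),
                new LazyRecord<Integer>(3L, new byte[] {0, 0, 0, 2}, LazyRecordDemo::strictInt));
        System.out.println(consume(batch));
    }
}
```

Because the exception is raised per record, the caller decides whether to skip, log or abort, and the records after the malformed one remain reachable.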


--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
