[ https://issues.apache.org/jira/browse/KAFKA-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16022828#comment-16022828 ]
Eno Thereska commented on KAFKA-4740:
-------------------------------------

I mentioned in KAFKA-5211 that such a change should be done with a KIP.

Using new consumer API with a Deserializer that throws SerializationException can lead to infinite loop
--------------------------------------------------------------------------------------------------------

                 Key: KAFKA-4740
                 URL: https://issues.apache.org/jira/browse/KAFKA-4740
             Project: Kafka
          Issue Type: Bug
          Components: clients, consumer
    Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
         Environment: Kafka broker 0.10.1.1 (but this bug is not dependent on the broker version)
                      Kafka clients 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
            Reporter: Sébastien Launay
            Assignee: Sébastien Launay
            Priority: Critical

The old consumer supports deserializing records into typed objects and throws a {{SerializationException}} through {{MessageAndMetadata#key()}} and {{MessageAndMetadata#message()}} that can be caught by the client \[1\].

When using the new consumer API with kafka-clients versions < 0.10.0.1, such an exception is swallowed by the {{NetworkClient}} class and results in an infinite loop that the client has no control over, for example:

{noformat}
DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition test2-0 to earliest offset.
DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 0 for partition test2-0
ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
...
{noformat}

Thanks to KAFKA-3977, this has been partially fixed in 0.10.1.0, but another issue still remains: the client can now catch the {{SerializationException}}, but the next call to {{Consumer#poll(long)}} will throw the same exception indefinitely.
The following snippet (a full example is available on GitHub \[2\] for most released kafka-clients versions) demonstrates the issue:

{code:java}
try (KafkaConsumer<String, Integer> kafkaConsumer =
         new KafkaConsumer<>(consumerConfig, new StringDeserializer(), new IntegerDeserializer())) {
    kafkaConsumer.subscribe(Arrays.asList("topic"));
    // Will run till the shutdown hook is called
    while (!doStop) {
        try {
            ConsumerRecords<String, Integer> records = kafkaConsumer.poll(1000);
            if (!records.isEmpty()) {
                logger.info("Got {} messages", records.count());
                for (ConsumerRecord<String, Integer> record : records) {
                    logger.info("Message with partition: {}, offset: {}, key: {}, value: {}",
                        record.partition(), record.offset(), record.key(), record.value());
                }
            } else {
                logger.info("No messages to consume");
            }
        } catch (SerializationException e) {
            logger.warn("Failed polling some records", e);
        }
    }
}
{code}

When run with the following records (the third record has an invalid Integer value):

{noformat}
printf "\x00\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
printf "\x00\x00\x00\x01\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
printf "\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
printf "\x00\x00\x00\x02\n" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
{noformat}

it produces the following logs:

{noformat}
INFO consumer.Consumer - Got 2 messages
INFO consumer.Consumer - Message with partition: 0, offset: 0, key: null, value: 0
INFO consumer.Consumer - Message with partition: 0, offset: 1, key: null, value: 1
WARN consumer.Consumer - Failed polling some records
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
WARN consumer.Consumer - Failed polling some records
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
WARN consumer.Consumer - Failed polling some records
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 2
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
...
{noformat}

I don't believe committing offsets would help, and even if it did, this could potentially result in a few well-formed records from that {{ConsumerRecords}} batch not being consumed (data loss).

I have only seen a few mentions of this bug online \[3\], but I believe this is a critical issue: the new consumer API is not in beta anymore, yet if you do not control the producers (which can inject malformed values) or you use an advanced deserializer that throws such an exception (e.g. the schema-registry {{KafkaAvroDeserializer}}), you can end up blocking a consumer from advancing in the stream.

Current workarounds:
- use a {{Deserializer}} that does not throw a {{SerializationException}} (e.g. {{ByteArrayDeserializer}}, {{StringDeserializer}})
- wrap the {{Deserializer}} to catch and log the {{SerializationException}} but return {{null}}, and then check for {{null}} in the client code (that's what we use on top of {{KafkaAvroDeserializer}} in case there is an issue reaching the schema registry, or the Avro datum is invalid or not compatible with the reader's schema for some reason); see the sketch after this list
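For illustration, a minimal sketch of such a wrapping {{Deserializer}} (the class name {{NullOnErrorDeserializer}} and the SLF4J logger are assumptions made for this sketch, not taken from the example project) could look like:

{code:java}
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical wrapper: delegates to an existing Deserializer and returns null
// instead of propagating a SerializationException, so poll() can move past the
// malformed record.
public class NullOnErrorDeserializer<T> implements Deserializer<T> {

    private static final Logger logger = LoggerFactory.getLogger(NullOnErrorDeserializer.class);

    private final Deserializer<T> delegate;

    public NullOnErrorDeserializer(Deserializer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        delegate.configure(configs, isKey);
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return delegate.deserialize(topic, data);
        } catch (SerializationException e) {
            // Log and skip: client code must treat a null key/value as a malformed record.
            logger.warn("Failed to deserialize record from topic {}", topic, e);
            return null;
        }
    }

    @Override
    public void close() {
        delegate.close();
    }
}
{code}

Such a wrapper would be passed to the consumer constructor, e.g. {{new KafkaConsumer<>(consumerConfig, new StringDeserializer(), new NullOnErrorDeserializer<>(new IntegerDeserializer()))}}, and the consuming loop then has to skip records whose {{value()}} (or {{key()}}) is {{null}}.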
Potential solutions:
# continue to throw a {{SerializationException}} when calling {{Consumer#poll(long)}} but skip the malformed record on the next {{Consumer#poll(long)}}
# do not throw a {{SerializationException}} when calling {{Consumer#poll(long)}} but expose information about invalid records in {{ConsumerRecords}}
# do not throw a {{SerializationException}} when calling {{Consumer#poll(long)}} but store the exception(s) in the {{ConsumerRecord}} object so that they are rethrown on {{ConsumerRecord#key()}} and {{ConsumerRecord#value()}}
# do not deserialize records during {{Consumer#poll()}} but do it when calling {{ConsumerRecord#key()}} and {{ConsumerRecord#value()}} (similar to the old consumer)

I believe any of those solutions breaks compatibility semantics-wise but not necessarily binary compatibility, as the {{SerializationException}} is a {{RuntimeException}} so it could be "moved around".

My preference goes to the last two, and I would be happy to contribute such a change as well as update the documentation on {{SerializationException}} to reflect that it is not only used for serializing records.

\[1\] https://github.com/apache/kafka/blob/0.8.2.2/core/src/main/scala/kafka/message/MessageAndMetadata.scala
\[1\] http://docs.confluent.io/2.0.0/schema-registry/docs/serializer-formatter.html#serializer
\[2\] https://github.com/slaunay/kafka-consumer-serialization-exception-example
\[3\] https://groups.google.com/forum/#!topic/kafka-clients/KBSPmY69H94

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)