[
https://issues.apache.org/jira/browse/KAFKA-7036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chia-Ping Tsai resolved KAFKA-7036.
-----------------------------------
Resolution: Won't Fix
> Complete the docs of KafkaConsumer#poll
> ---------------------------------------
>
> Key: KAFKA-7036
> URL: https://issues.apache.org/jira/browse/KAFKA-7036
> Project: Kafka
> Issue Type: Improvement
> Reporter: Chia-Ping Tsai
> Assignee: Chia-Ping Tsai
> Priority: Minor
>
> KafkaConsumer#poll has good documentation of the exceptions it can throw.
> However, it does not mention SerializationException. Another minor issue is
> that KafkaConsumer doesn't catch every type of exception that the
> deserializer may throw (see below). We should catch Throwable instead of
> RuntimeException so that all exceptions are caught and then wrapped in a
> SerializationException.
> {code:java}
> private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
>                                          RecordBatch batch,
>                                          Record record) {
>     try {
>         long offset = record.offset();
>         long timestamp = record.timestamp();
>         TimestampType timestampType = batch.timestampType();
>         Headers headers = new RecordHeaders(record.headers());
>         ByteBuffer keyBytes = record.key();
>         byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
>         K key = keyBytes == null ? null :
>             this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
>         ByteBuffer valueBytes = record.value();
>         byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
>         V value = valueBytes == null ? null :
>             this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
>         return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
>                                     timestamp, timestampType,
>                                     record.checksumOrNull(),
>                                     keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
>                                     valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
>                                     key, value, headers);
>     } catch (RuntimeException e) {
>         throw new SerializationException("Error deserializing key/value for partition " + partition +
>             " at offset " + record.offset() +
>             ". If needed, please seek past the record to continue consumption.", e);
>     }
> }{code}
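
For illustration only, here is a minimal, self-contained sketch of the behavioral difference the description refers to. It does not use Kafka at all: SimpleDeserializer and WrappedSerializationException are hypothetical stand-ins for Kafka's Deserializer and SerializationException, and the two parse methods are simplified assumptions, not the actual Fetcher code. The sketch only shows that a handler for RuntimeException lets an Error escape unwrapped, while a handler for Throwable wraps it. It takes no position on whether catching Throwable is desirable.

```java
import java.nio.charset.StandardCharsets;

class DeserializerWrappingSketch {

    /** Hypothetical stand-in for Kafka's Deserializer; not the real interface. */
    interface SimpleDeserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    /** Hypothetical stand-in for Kafka's SerializationException. */
    static class WrappedSerializationException extends RuntimeException {
        WrappedSerializationException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Mirrors the quoted code: only RuntimeException is caught and wrapped. */
    static <T> T parseCatchingRuntimeException(SimpleDeserializer<T> deserializer,
                                               String topic, byte[] data) {
        try {
            return data == null ? null : deserializer.deserialize(topic, data);
        } catch (RuntimeException e) {
            throw new WrappedSerializationException(
                "Error deserializing record for topic " + topic, e);
        }
    }

    /** The change proposed in the issue: catch Throwable and wrap it. */
    static <T> T parseCatchingThrowable(SimpleDeserializer<T> deserializer,
                                        String topic, byte[] data) {
        try {
            return data == null ? null : deserializer.deserialize(topic, data);
        } catch (Throwable t) {
            throw new WrappedSerializationException(
                "Error deserializing record for topic " + topic, t);
        }
    }

    public static void main(String[] args) {
        // A deserializer that fails with an Error rather than a RuntimeException.
        SimpleDeserializer<String> broken = (topic, data) -> {
            throw new LinkageError("simulated failure inside deserializer");
        };
        byte[] payload = "x".getBytes(StandardCharsets.UTF_8);

        try {
            parseCatchingRuntimeException(broken, "demo", payload);
        } catch (WrappedSerializationException e) {
            System.out.println("RuntimeException handler wrapped: " + e.getCause());
        } catch (LinkageError e) {
            System.out.println("RuntimeException handler let the Error escape: " + e);
        }

        try {
            parseCatchingThrowable(broken, "demo", payload);
        } catch (WrappedSerializationException e) {
            System.out.println("Throwable handler wrapped: " + e.getCause());
        }
    }
}
```

Note that a Throwable handler also wraps errors such as OutOfMemoryError, which callers usually cannot recover from by seeking past a record, so the trade-off is a design choice rather than a pure fix.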