Chia-Ping Tsai created KAFKA-7036:
-------------------------------------
Summary: Complete the docs of KafkaConsumer#poll
Key: KAFKA-7036
URL: https://issues.apache.org/jira/browse/KAFKA-7036
Project: Kafka
Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai
KafkaConsumer#poll has good documentation of the exceptions it may throw.
However, it lacks a description of SerializationException. Another minor issue
is that KafkaConsumer doesn't catch every type of exception that may be thrown
by the deserializer (see below). We should catch Throwable instead of
RuntimeException so that all exceptions are caught and then wrapped in
SerializationException.
{code:java}
private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                         RecordBatch batch,
                                         Record record) {
    try {
        long offset = record.offset();
        long timestamp = record.timestamp();
        TimestampType timestampType = batch.timestampType();
        Headers headers = new RecordHeaders(record.headers());
        ByteBuffer keyBytes = record.key();
        byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
        K key = keyBytes == null ? null :
                this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
        ByteBuffer valueBytes = record.value();
        byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
        V value = valueBytes == null ? null :
                this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
        return new ConsumerRecord<>(partition.topic(), partition.partition(),
                                    offset,
                                    timestamp, timestampType,
                                    record.checksumOrNull(),
                                    keyByteArray == null ?
                                        ConsumerRecord.NULL_SIZE : keyByteArray.length,
                                    valueByteArray == null ?
                                        ConsumerRecord.NULL_SIZE : valueByteArray.length,
                                    key, value, headers);
    } catch (RuntimeException e) {
        throw new SerializationException("Error deserializing key/value for partition " + partition +
                " at offset " + record.offset() +
                ". If needed, please seek past the record to continue consumption.", e);
    }
}{code}
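
The proposed change can be illustrated with a small standalone sketch. It is not the KafkaConsumer code itself: the class WrapDeserializerErrors, the helper deserializeOrWrap, and the nested SerializationException are hypothetical stand-ins so the example runs without Kafka on the classpath, and a plain java.util.function.Function plays the role of the deserializer. It shows that a catch block on Throwable also wraps an Error thrown by the deserializer, which a catch block on RuntimeException would let escape unwrapped.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class WrapDeserializerErrors {

    // Hypothetical stand-in for org.apache.kafka.common.errors.SerializationException.
    static class SerializationException extends RuntimeException {
        SerializationException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical helper: runs the deserializer and wraps anything it throws.
    // Catching Throwable (rather than RuntimeException) also covers Errors.
    static <T> T deserializeOrWrap(Function<byte[], T> deserializer, byte[] data,
                                   String partition, long offset) {
        try {
            return data == null ? null : deserializer.apply(data);
        } catch (Throwable t) {
            throw new SerializationException("Error deserializing key/value for partition "
                    + partition + " at offset " + offset
                    + ". If needed, please seek past the record to continue consumption.", t);
        }
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        // A well-behaved deserializer returns its value unchanged.
        String ok = deserializeOrWrap(b -> new String(b, StandardCharsets.UTF_8),
                payload, "topic-0", 1L);
        System.out.println(ok); // prints "hello"

        // This deserializer throws an Error, not a RuntimeException.
        Function<byte[], String> broken = b -> {
            throw new LinkageError("missing class");
        };
        try {
            deserializeOrWrap(broken, payload, "topic-0", 2L);
        } catch (SerializationException e) {
            // The original Error is preserved as the cause.
            System.out.println(e.getCause().getClass().getSimpleName()); // prints "LinkageError"
        }
    }
}
```

One trade-off to keep in mind: catching Throwable also wraps JVM-level errors such as OutOfMemoryError, so callers that want to treat those differently would need to inspect the cause.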