kirktrue commented on code in PR #15691: URL: https://github.com/apache/kafka/pull/15691#discussion_r1560342179
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java: ########## @@ -326,7 +327,17 @@ <K, V> ConsumerRecord<K, V> parseRecord(Deserializers<K, V> deserializers, key, value, headers, leaderEpoch); } catch (RuntimeException e) { log.error("Deserializers with error: {}", deserializers); - throw new RecordDeserializationException(partition, record.offset(), + ByteBuffer keyBytes = record.key(); + byte[] key = org.apache.kafka.common.utils.Utils.toNullableArray(keyBytes); + ByteBuffer valueBytes = record.value(); + byte[] value = Utils.toNullableArray(valueBytes); + Headers headers = new RecordHeaders(record.headers()); + ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(partition.topic(), partition.partition(), record.offset(), + record.timestamp(), timestampType, + keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(), + valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(), + key, value, headers, Optional.empty()); Review Comment: Any reason to omit the epoch value? Can we do the same as the happy path? ```suggestion ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(partition.topic(), partition.partition(), record.offset(), record.timestamp(), timestampType, keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(), valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(), key, value, headers, leaderEpoch); ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java: ########## @@ -326,7 +327,17 @@ <K, V> ConsumerRecord<K, V> parseRecord(Deserializers<K, V> deserializers, key, value, headers, leaderEpoch); } catch (RuntimeException e) { log.error("Deserializers with error: {}", deserializers); - throw new RecordDeserializationException(partition, record.offset(), + ByteBuffer keyBytes = record.key(); + byte[] key = org.apache.kafka.common.utils.Utils.toNullableArray(keyBytes); Review Comment: Minor nit—can we use the unqualified version of `Utils` here as you do a couple lines down? That is: ```suggestion byte[] key = Utils.toNullableArray(keyBytes); ``` ########## clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java: ########## @@ -16,29 +16,38 @@ */ package org.apache.kafka.common.errors; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; + /** * This exception is raised for any error that occurs while deserializing records received by the consumer using * the configured {@link org.apache.kafka.common.serialization.Deserializer}. */ public class RecordDeserializationException extends SerializationException { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; private final TopicPartition partition; - private final long offset; + private final ConsumerRecord<byte[], byte[]> consumerRecord; - public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) { + public RecordDeserializationException(TopicPartition partition, + ConsumerRecord<byte[], byte[]> record, + String message, + Throwable cause) { Review Comment: IIUC, @mjsax mentioned on the mailing list that we need to keep the existing constructor signature as is and add an overloaded version that accepts the `ConsumerRecord`. Although I'm not sure why... đŸ¤” ########## clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java: ########## @@ -16,29 +16,38 @@ */ package org.apache.kafka.common.errors; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; + Review Comment: In general, we try to avoid unnecessary whitespace changes. ```suggestion ``` ########## clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java: ########## @@ -16,29 +16,38 @@ */ package org.apache.kafka.common.errors; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; + /** * This exception is raised for any error that occurs while deserializing records received by the consumer using * the configured {@link org.apache.kafka.common.serialization.Deserializer}. */ public class RecordDeserializationException extends SerializationException { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; private final TopicPartition partition; - private final long offset; + private final ConsumerRecord<byte[], byte[]> consumerRecord; - public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) { + public RecordDeserializationException(TopicPartition partition, + ConsumerRecord<byte[], byte[]> record, + String message, + Throwable cause) { super(message, cause); this.partition = partition; - this.offset = offset; + this.consumerRecord = record; } public TopicPartition topicPartition() { return partition; } public long offset() { - return offset; + return consumerRecord.offset(); + } + + public ConsumerRecord<byte[], byte[]> getConsumerRecord() { Review Comment: ```suggestion public ConsumerRecord<byte[], byte[]> consumerRecord() { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org