kirktrue commented on code in PR #15691:
URL: https://github.com/apache/kafka/pull/15691#discussion_r1560342179


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -326,7 +327,17 @@ <K, V> ConsumerRecord<K, V> parseRecord(Deserializers<K, V> deserializers,
                     key, value, headers, leaderEpoch);
         } catch (RuntimeException e) {
             log.error("Deserializers with error: {}", deserializers);
-            throw new RecordDeserializationException(partition, record.offset(),
+            ByteBuffer keyBytes = record.key();
+            byte[] key = org.apache.kafka.common.utils.Utils.toNullableArray(keyBytes);
+            ByteBuffer valueBytes = record.value();
+            byte[] value = Utils.toNullableArray(valueBytes);
+            Headers headers = new RecordHeaders(record.headers());
+            ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(partition.topic(), partition.partition(), record.offset(),
+                    record.timestamp(), timestampType,
+                    keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
+                    valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
+                    key, value, headers, Optional.empty());

Review Comment:
   Any reason to omit the epoch value? Can we do the same as the happy path?
   
   ```suggestion
            ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(partition.topic(), partition.partition(), record.offset(),
                    record.timestamp(), timestampType,
                    keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
                    valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
                    key, value, headers, leaderEpoch);
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -326,7 +327,17 @@ <K, V> ConsumerRecord<K, V> parseRecord(Deserializers<K, V> deserializers,
                     key, value, headers, leaderEpoch);
         } catch (RuntimeException e) {
             log.error("Deserializers with error: {}", deserializers);
-            throw new RecordDeserializationException(partition, record.offset(),
+            ByteBuffer keyBytes = record.key();
+            byte[] key = org.apache.kafka.common.utils.Utils.toNullableArray(keyBytes);

Review Comment:
   Minor nit: can we use the unqualified version of `Utils` here, as you do a couple lines down? That is:
   
   ```suggestion
               byte[] key = Utils.toNullableArray(keyBytes);
   ```



##########
clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java:
##########
@@ -16,29 +16,38 @@
  */
 package org.apache.kafka.common.errors;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
+
 /**
 *  This exception is raised for any error that occurs while deserializing records received by the consumer using
  *  the configured {@link org.apache.kafka.common.serialization.Deserializer}.
  */
 public class RecordDeserializationException extends SerializationException {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final TopicPartition partition;
-    private final long offset;
+    private final ConsumerRecord<byte[], byte[]> consumerRecord;
 
-    public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
+    public RecordDeserializationException(TopicPartition partition,
+                                          ConsumerRecord<byte[], byte[]> record,
+                                          String message,
+                                          Throwable cause) {
+                                          String message,
+                                          Throwable cause) {

Review Comment:
   IIUC, @mjsax mentioned on the mailing list that we need to keep the existing constructor signature as is and add an overloaded version that accepts the `ConsumerRecord`. Although I'm not sure why... 🤔
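   For what it's worth, the compatibility concern can be met by leaving the old `(partition, offset, ...)` constructor untouched and having the new overload delegate to it. A minimal sketch of that pattern, using simplified stand-in types rather than the real `RecordDeserializationException`/`ConsumerRecord` classes:
   
   ```java
   // Hedged sketch (not the actual Kafka patch): keep the original constructor
   // signature intact and add an overload that accepts the raw record.
   // All names below are simplified stand-ins for illustration only.
   class DeserializationError extends RuntimeException {
       private final String topic;
       private final long offset;
   
       // Existing signature, preserved so already-compiled callers keep working.
       DeserializationError(String topic, long offset, String message, Throwable cause) {
           super(message, cause);
           this.topic = topic;
           this.offset = offset;
       }
   
       // New overload: derives the offset from the captured raw record.
       DeserializationError(String topic, RawRecord record, String message, Throwable cause) {
           this(topic, record.offset, message, cause);
       }
   
       String topicName() { return topic; }
       long offset() { return offset; }
   
       // Minimal stand-in for ConsumerRecord<byte[], byte[]>.
       static final class RawRecord {
           final long offset;
           RawRecord(long offset) { this.offset = offset; }
       }
   }
   ```
   
   Delegation keeps the two paths consistent: both constructors end up populating the same fields, so `offset()` behaves identically regardless of which one the caller used.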



##########
clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java:
##########
@@ -16,29 +16,38 @@
  */
 package org.apache.kafka.common.errors;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
+

Review Comment:
   In general, we try to avoid unnecessary whitespace changes.
   
   ```suggestion
   ```



##########
clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java:
##########
@@ -16,29 +16,38 @@
  */
 package org.apache.kafka.common.errors;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
+
 /**
 *  This exception is raised for any error that occurs while deserializing records received by the consumer using
  *  the configured {@link org.apache.kafka.common.serialization.Deserializer}.
  */
 public class RecordDeserializationException extends SerializationException {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final TopicPartition partition;
-    private final long offset;
+    private final ConsumerRecord<byte[], byte[]> consumerRecord;
 
-    public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
+    public RecordDeserializationException(TopicPartition partition,
+                                          ConsumerRecord<byte[], byte[]> record,
+                                          String message,
+                                          Throwable cause) {
+                                          String message,
+                                          Throwable cause) {
         super(message, cause);
         this.partition = partition;
-        this.offset = offset;
+        this.consumerRecord = record;
     }
 
     public TopicPartition topicPartition() {
         return partition;
     }
 
     public long offset() {
-        return offset;
+        return consumerRecord.offset();
+    }
+
+    public ConsumerRecord<byte[], byte[]> getConsumerRecord() {

Review Comment:
   Minor: Kafka accessors typically drop the `get` prefix.
   
   ```suggestion
       public ConsumerRecord<byte[], byte[]> consumerRecord() {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
