AndrewJSchofield commented on code in PR #15691:
URL: https://github.com/apache/kafka/pull/15691#discussion_r1572845716


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -311,25 +312,33 @@ <K, V> ConsumerRecord<K, V> parseRecord(Deserializers<K, V> deserializers,
                                             Optional<Integer> leaderEpoch,
                                             TimestampType timestampType,
                                             Record record) {
+        long offset = record.offset();
+        long timestamp = record.timestamp();
+        ByteBuffer keyBytes = record.key();
+        ByteBuffer valueBytes = record.value();
+        Headers headers = new RecordHeaders(record.headers());
+        K key;
+        V value;
         try {
-            long offset = record.offset();
-            long timestamp = record.timestamp();
-            Headers headers = new RecordHeaders(record.headers());
-            ByteBuffer keyBytes = record.key();
-            K key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
-            ByteBuffer valueBytes = record.value();
-            V value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
-            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
-                    timestamp, timestampType,
-                    keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
-                    valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
-                    key, value, headers, leaderEpoch);
+            key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
         } catch (RuntimeException e) {
-            log.error("Deserializers with error: {}", deserializers);
-            throw new RecordDeserializationException(partition, record.offset(),
-                    "Error deserializing key/value for partition " + partition +
+            throw new KeyDeserializationException(partition, offset, keyBytes, valueBytes, headers, record.timestamp(),

Review Comment:
   I think the `RecordDeserializationException` constructor also needs a 
`TimestampType` parameter.



##########
clients/src/main/java/org/apache/kafka/common/errors/KeyDeserializationException.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
+
+import java.nio.ByteBuffer;
+
+public class KeyDeserializationException extends RecordDeserializationException {
+    private final static long serialVersionUID = 1L;
+    public KeyDeserializationException(TopicPartition partition, long offset, ByteBuffer key, ByteBuffer value, Headers headers, long timestamp, String message, Throwable cause) {

Review Comment:
   Please split this enormously long line :)
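
One possible layout, sketched here with one parameter per line in the same style this PR already uses for `RecordDeserializationException`. The `TopicPartition`, `Headers`, and reduced parent class below are stand-ins declared only so the sketch compiles without Kafka on the classpath; they are assumptions, not the real Kafka types.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-ins for org.apache.kafka.common.TopicPartition and
// org.apache.kafka.common.header.Headers, so the sketch compiles on its own.
final class TopicPartition {
    private final String topic;
    private final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

interface Headers { }

// Reduced stand-in for the parent class; it keeps only partition and offset.
class RecordDeserializationException extends RuntimeException {
    private static final long serialVersionUID = 2L;
    private final TopicPartition partition;
    private final long offset;

    protected RecordDeserializationException(TopicPartition partition,
                                             long offset,
                                             ByteBuffer key,
                                             ByteBuffer value,
                                             Headers headers,
                                             long timestamp,
                                             String message,
                                             Throwable cause) {
        super(message, cause);
        this.partition = partition;
        this.offset = offset;
    }

    public TopicPartition topicPartition() {
        return partition;
    }

    public long offset() {
        return offset;
    }
}

class KeyDeserializationException extends RecordDeserializationException {
    private static final long serialVersionUID = 1L;

    // Same parameter list as in the PR, wrapped to one parameter per line.
    public KeyDeserializationException(TopicPartition partition,
                                       long offset,
                                       ByteBuffer key,
                                       ByteBuffer value,
                                       Headers headers,
                                       long timestamp,
                                       String message,
                                       Throwable cause) {
        super(partition, offset, key, value, headers, timestamp, message, cause);
    }
}
```

Only the line wrapping differs from the constructor in the diff; the parameter order and the call to `super` are unchanged.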



##########
clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java:
##########
@@ -17,21 +17,55 @@
 package org.apache.kafka.common.errors;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
 
 /**
  *  This exception is raised for any error that occurs while deserializing records received by the consumer using
  *  the configured {@link org.apache.kafka.common.serialization.Deserializer}.
  */
 public class RecordDeserializationException extends SerializationException {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final TopicPartition partition;
     private final long offset;
+    private final long timestamp;
+    private final ByteBuffer key;
+    private final ByteBuffer value;
+    private final Headers headers;
 
-    public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
+    @Deprecated
+    public RecordDeserializationException(TopicPartition partition,
+                                          long offset,
+                                          String message,
+                                          Throwable cause) {
         super(message, cause);
         this.partition = partition;
         this.offset = offset;
+        this.timestamp = 0;
+        this.key = null;
+        this.value = null;
+        this.headers = null;
+    }
+
+    // New constructor
+    protected RecordDeserializationException(TopicPartition partition,
+                                          long offset,
+                                          ByteBuffer key,
+                                          ByteBuffer value,
+                                          Headers headers,
+                                          long timestamp,
+                                          String message,

Review Comment:
   This constructor should take a `TimestampType` too, so that it is clear how 
to interpret the timestamp.
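
A minimal sketch of what that could look like, assuming the type is stored next to the timestamp and exposed through an accessor. The parameter position, the `timestampType()` accessor name, and the stand-in `TimestampType`, `TopicPartition`, and `Headers` declarations are all assumptions made so the example compiles on its own; the enum constants mirror Kafka's `org.apache.kafka.common.record.TimestampType`.

```java
import java.nio.ByteBuffer;

// Stand-ins for the Kafka types, declared here only so the sketch is self-contained.
enum TimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

final class TopicPartition {
    private final String topic;
    private final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

interface Headers { }

class RecordDeserializationException extends RuntimeException {
    private static final long serialVersionUID = 2L;
    private final TopicPartition partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final ByteBuffer key;
    private final ByteBuffer value;
    private final Headers headers;

    // Hypothetical signature: the timestamp type travels with the timestamp.
    protected RecordDeserializationException(TopicPartition partition,
                                             long offset,
                                             long timestamp,
                                             TimestampType timestampType,
                                             ByteBuffer key,
                                             ByteBuffer value,
                                             Headers headers,
                                             String message,
                                             Throwable cause) {
        super(message, cause);
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        this.timestampType = timestampType;
        this.key = key;
        this.value = value;
        this.headers = headers;
    }

    public long timestamp() {
        return timestamp;
    }

    // Tells the caller whether timestamp() is a create time or a log append time.
    public TimestampType timestampType() {
        return timestampType;
    }
}
```

With the type available, a handler that catches the exception can tell whether `timestamp()` was set by the producer or by the broker, which the bare `long` cannot convey.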


