fvaleri commented on code in PR #15691:
URL: https://github.com/apache/kafka/pull/15691#discussion_r1589379241


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -311,25 +311,35 @@ <K, V> ConsumerRecord<K, V> parseRecord(Deserializers<K, 
V> deserializers,
                                             Optional<Integer> leaderEpoch,
                                             TimestampType timestampType,
                                             Record record) {
+        long offset = record.offset();
+        long timestamp = record.timestamp();
+        ByteBuffer keyBytes = record.key();
+        ByteBuffer valueBytes = record.value();
+        Headers headers = new RecordHeaders(record.headers());
+        K key;
+        V value;
         try {
-            long offset = record.offset();
-            long timestamp = record.timestamp();
-            Headers headers = new RecordHeaders(record.headers());
-            ByteBuffer keyBytes = record.key();
-            K key = keyBytes == null ? null : 
deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
-            ByteBuffer valueBytes = record.value();
-            V value = valueBytes == null ? null : 
deserializers.valueDeserializer.deserialize(partition.topic(), headers, 
valueBytes);
-            return new ConsumerRecord<>(partition.topic(), 
partition.partition(), offset,
-                    timestamp, timestampType,
-                    keyBytes == null ? ConsumerRecord.NULL_SIZE : 
keyBytes.remaining(),
-                    valueBytes == null ? ConsumerRecord.NULL_SIZE : 
valueBytes.remaining(),
-                    key, value, headers, leaderEpoch);
+            key = keyBytes == null ? null : 
deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
         } catch (RuntimeException e) {
-            log.error("Deserializers with error: {}", deserializers);
-            throw new RecordDeserializationException(partition, 
record.offset(),
-                    "Error deserializing key/value for partition " + partition 
+
+            throw new 
RecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.KEY,

Review Comment:
   What about using a helper method like this to throw the exception?
   
   ```sh
       private void 
throwRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin
 origin,
                                                        TopicPartition 
partition,
                                                        TimestampType 
timestampType,
                                                        Record record,
                                                        RuntimeException e) {
           throw new RecordDeserializationException(origin, partition, 
record.offset(), record.timestamp(), timestampType, record.key(), 
               record.value(), new RecordHeaders(record.headers()), "Error 
deserializing " + origin.toString() + " for partition " 
                   + partition + " at offset " + record.offset() + ". If 
needed, please seek past the record to continue consumption.", e);
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to