fvaleri commented on code in PR #15691: URL: https://github.com/apache/kafka/pull/15691#discussion_r1589379241
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java: ########## @@ -311,25 +311,35 @@ <K, V> ConsumerRecord<K, V> parseRecord(Deserializers<K, V> deserializers, Optional<Integer> leaderEpoch, TimestampType timestampType, Record record) { + long offset = record.offset(); + long timestamp = record.timestamp(); + ByteBuffer keyBytes = record.key(); + ByteBuffer valueBytes = record.value(); + Headers headers = new RecordHeaders(record.headers()); + K key; + V value; try { - long offset = record.offset(); - long timestamp = record.timestamp(); - Headers headers = new RecordHeaders(record.headers()); - ByteBuffer keyBytes = record.key(); - K key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes); - ByteBuffer valueBytes = record.value(); - V value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes); - return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, - timestamp, timestampType, - keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(), - valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(), - key, value, headers, leaderEpoch); + key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes); } catch (RuntimeException e) { - log.error("Deserializers with error: {}", deserializers); - throw new RecordDeserializationException(partition, record.offset(), - "Error deserializing key/value for partition " + partition + + throw new RecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.KEY, Review Comment: What about using a helper method like this to throw the exception? 
```java
private void throwRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin origin,
                                                 TopicPartition partition,
                                                 TimestampType timestampType,
                                                 Record record,
                                                 RuntimeException e) {
    throw new RecordDeserializationException(origin, partition, record.offset(), record.timestamp(), timestampType,
        record.key(), record.value(), new RecordHeaders(record.headers()),
        "Error deserializing " + origin.toString() + " for partition " + partition + " at offset " + record.offset()
            + ". If needed, please seek past the record to continue consumption.", e);
}
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org