rhauch commented on a change in pull request #7496: URL: https://github.com/apache/kafka/pull/7496#discussion_r438497632
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ########## @@ -507,6 +508,18 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[] return transformationChain.apply(origRecord); } + private SchemaAndValue convertKeyValue(ConsumerRecord<byte[], byte[]> msg, boolean isKey) { + try { + byte[] value = isKey ? msg.key() : msg.value(); + Converter converter = isKey ? keyConverter : valueConverter; + return converter.toConnectData(msg.topic(), msg.headers(), value); + } catch (Exception e) { + log.error("Error converting message {} in topic '{}' partition {} at offset {}", + isKey ? ConverterType.KEY.getName() : ConverterType.VALUE.getName(), msg.topic(), msg.partition(), msg.offset()); + throw e; + } + } + Review comment: Since the calling code already knows whether it's a key or value, how about just having separate methods? Yeah, they'd be mostly the same, but we could avoid the superfluous logic and could simplify things a bit. Also, would it be better to wrap the exception rather than just log the error? Especially with the retry operator, it's possible that the error won't get logged near this log message, so we'd lose the correlation. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org