gharris1727 commented on code in PR #15154: URL: https://github.com/apache/kafka/pull/15154#discussion_r1449513344
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##########
@@ -168,11 +102,13 @@ public String toString(boolean includeMessage) {
         builder.append("' with class '");
         builder.append(executingClass() == null ? "null" : executingClass().getName());
         builder.append('\'');
-        if (includeMessage && sourceRecord() != null) {
+        T original = original();
+        if (includeMessage && original instanceof SourceRecord) {

Review Comment:
   I'll expand on this, since it also provides context for the strange casting in DeadLetterQueueReporter.

   I think the strange "if source do <x>, if sink do <y>" branches in `ProcessingContext#toString(boolean)` and `DeadLetterQueueReporter#report()` are both symptoms of an inadequate ErrorReporter signature. Since there is only one ErrorReporter signature, used by both sources and sinks, nothing in the type definitions forces the DeadLetterQueueReporter to work only with sinks, so it has to include a runtime check in its implementation.

   Similarly, there are two different "kinds" of LogReporter: one for sources, which hits the first branch in the toString, and one for sinks, which hits the second. Each LogReporter instance only ever takes one of the branches, but that isn't clear from the implementation.

   If we make ErrorReporter generic, the implementations can provide the functionality for their specific record type at compile time or at instantiation, instead of inside the report() function.

   But pulling on this thread unravelled a bit more. To make ErrorReporter generic, we have to make RetryWithToleranceOperator generic, make LogReporter generic, change some RetryWithToleranceOperator instance reuse, and move the RetryWithToleranceOperator out of WorkerTask. All of these are just lateral refactors, and I think they would be easier to address in a follow-up.
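To make the generic ErrorReporter idea more concrete, here is a rough, self-contained sketch. Everything in it is a simplified stand-in invented for illustration: `SourceRec`, `SinkRec`, `Context`, the `forSource()`/`forSink()` factories, and the `void report(...)` signature are assumptions, not the actual Connect runtime types or the shape a follow-up would necessarily take. It only shows how the record type could move into the type parameter, so that the source/sink choice is made when a reporter is instantiated and the DLQ reporter is sink-only by declaration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch only: every type here is a simplified, hypothetical stand-in,
// not the real Kafka Connect class of the same or a similar name.
final class GenericReporterSketch {

    // Stand-ins for the source-side and sink-side record types.
    record SourceRec(String topic, String value) {}
    record SinkRec(String topic, int partition, long offset) {}

    // Stand-in for ProcessingContext, parameterized by the original record type.
    record Context<T>(T original, String stage) {}

    // The record type is part of the reporter's signature.
    interface ErrorReporter<T> {
        void report(Context<T> context);
    }

    // One LogReporter class; the record-specific formatting is chosen once,
    // at construction, so report() has no source/sink branch.
    static final class LogReporter<T> implements ErrorReporter<T> {
        private final Function<T, String> describe;
        final List<String> lines = new ArrayList<>();

        private LogReporter(Function<T, String> describe) {
            this.describe = describe;
        }

        static LogReporter<SourceRec> forSource() {
            return new LogReporter<>(r -> "source record to topic '" + r.topic() + "'");
        }

        static LogReporter<SinkRec> forSink() {
            return new LogReporter<>(
                r -> "sink record from " + r.topic() + "-" + r.partition() + "@" + r.offset());
        }

        @Override
        public void report(Context<T> context) {
            lines.add("Error in stage '" + context.stage() + "' for "
                + describe.apply(context.original()));
        }
    }

    // Sink-only by type: no instanceof check or cast is needed in report().
    static final class DeadLetterQueueReporter implements ErrorReporter<SinkRec> {
        final List<SinkRec> deadLetters = new ArrayList<>();

        @Override
        public void report(Context<SinkRec> context) {
            deadLetters.add(context.original());
        }
    }

    // Runs each reporter once and returns what was recorded.
    static List<String> demo() {
        LogReporter<SourceRec> sourceLog = LogReporter.forSource();
        LogReporter<SinkRec> sinkLog = LogReporter.forSink();
        DeadLetterQueueReporter dlq = new DeadLetterQueueReporter();

        Context<SinkRec> sinkCtx = new Context<>(new SinkRec("orders", 0, 42L), "VALUE_CONVERTER");
        sourceLog.report(new Context<>(new SourceRec("events", "payload"), "TRANSFORMATION"));
        sinkLog.report(sinkCtx);
        dlq.report(sinkCtx);
        // Passing a Context<SourceRec> to dlq.report(...) would be rejected by the compiler.

        List<String> out = new ArrayList<>(sourceLog.lines);
        out.addAll(sinkLog.lines);
        out.add("dlq size " + dlq.deadLetters.size());
        return out;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this sketch the runtime check in the DLQ reporter disappears because a source context simply cannot be passed to it, and each LogReporter instance carries exactly one formatting path. Whether the real refactor should use factories, subclasses, or something else is left open.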