gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449513344


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##########
@@ -168,11 +102,13 @@ public String toString(boolean includeMessage) {
         builder.append("' with class '");
         builder.append(executingClass() == null ? "null" : 
executingClass().getName());
         builder.append('\'');
-        if (includeMessage && sourceRecord() != null) {
+        T original = original();
+        if (includeMessage && original instanceof SourceRecord) {

Review Comment:
   Ill expand on this, since it also provides context for the strange casting 
in DeadLetterQueueReporter.
   
   I think the strange "if source do <x>, if sink do <y>" in 
`ProcessingContext#toString(boolean)` and `DeadLetterQueueReporter#report()` 
are both symptoms of the inadequate ErrorReporter signature. Since there's only 
one ErrorReporter signature used by both sources and sinks, there isn't 
anything in the type definitions to force the DeadLetterQueueReporter to only 
work with sinks, so it has to include a runtime check in the implementation.
   
   Similarly, there are two different "kinds" of LogReporter, the one for 
sources that hits the first branch in the toString, and one that hits the 
second. Each LogReporter instance only ever takes one of the branches, but that 
isn't clear from the implementation. If we make ErrorReporter generic, we can 
have the implementations give functionality for the specific record types when 
compiling or instantiating, instead of in the report() function.
   
   But pulling on this thread unravelled a bit more: To make ErrorReporter 
generic, we have to make RetryWithToleranceOperator generic, make LogReporter 
generic, change some RetryWithToleranceOperator instance reuse, and move the 
RetryWithToleranceOperator out of WorkerTask. All of which are just lateral 
refactors, and I think would be easier to address in a follow-up.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to