gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449326220


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -80,59 +83,57 @@ public class RetryWithToleranceOperator implements 
AutoCloseable {
     private final ErrorHandlingMetrics errorHandlingMetrics;
     private final CountDownLatch stopRequestedLatch;
     private volatile boolean stopping;   // indicates whether the operator has 
been asked to stop retrying
-
-    protected final ProcessingContext context;
+    private List<ErrorReporter> reporters;
 
     public RetryWithToleranceOperator(long errorRetryTimeout, long 
errorMaxDelayInMillis,
                                       ToleranceType toleranceType, Time time, 
ErrorHandlingMetrics errorHandlingMetrics) {
-        this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, 
errorHandlingMetrics, new ProcessingContext(), new CountDownLatch(1));
+        this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, 
errorHandlingMetrics, new CountDownLatch(1));
     }
 
     RetryWithToleranceOperator(long errorRetryTimeout, long 
errorMaxDelayInMillis,
                                ToleranceType toleranceType, Time time, 
ErrorHandlingMetrics errorHandlingMetrics,
-                               ProcessingContext context, CountDownLatch 
stopRequestedLatch) {
+                               CountDownLatch stopRequestedLatch) {
         this.errorRetryTimeout = errorRetryTimeout;
         this.errorMaxDelayInMillis = errorMaxDelayInMillis;
         this.errorToleranceType = toleranceType;
         this.time = time;
         this.errorHandlingMetrics = errorHandlingMetrics;
-        this.context = context;
         this.stopRequestedLatch = stopRequestedLatch;
         this.stopping = false;
+        this.reporters = Collections.emptyList();
     }
 
-    public synchronized Future<Void> executeFailed(Stage stage, Class<?> 
executingClass,
-                                      ConsumerRecord<byte[], byte[]> 
consumerRecord,
-                                      Throwable error) {
-
+    public Future<Void> executeFailed(ProcessingContext<?> context, Stage 
stage, Class<?> executingClass, Throwable error) {
         markAsFailed();
-        context.consumerRecord(consumerRecord);
         context.currentContext(stage, executingClass);
         context.error(error);
         errorHandlingMetrics.recordFailure();
-        Future<Void> errantRecordFuture = context.report();
+        Future<Void> errantRecordFuture = report(context);
         if (!withinToleranceLimits()) {
             errorHandlingMetrics.recordError();
             throw new ConnectException("Tolerance exceeded in error handler", 
error);
         }
         return errantRecordFuture;
     }
 
-    public synchronized Future<Void> executeFailed(Stage stage, Class<?> 
executingClass,
-                                                   SourceRecord sourceRecord,
-                                                   Throwable error) {

Review Comment:
   Yep, there were two nearly-identical implementations whose main difference 
was the type of record they accepted.
   
   They also differed in the ConnectException message, and when merging them I 
just kept the more generic message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to