C0urante commented on code in PR #15154: URL: https://github.com/apache/kafka/pull/15154#discussion_r1450812678
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java: ########## @@ -80,59 +83,57 @@ public class RetryWithToleranceOperator implements AutoCloseable { private final ErrorHandlingMetrics errorHandlingMetrics; private final CountDownLatch stopRequestedLatch; private volatile boolean stopping; // indicates whether the operator has been asked to stop retrying - - protected final ProcessingContext context; + private List<ErrorReporter> reporters; public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis, ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics) { - this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new ProcessingContext(), new CountDownLatch(1)); + this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new CountDownLatch(1)); } RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis, ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics, - ProcessingContext context, CountDownLatch stopRequestedLatch) { + CountDownLatch stopRequestedLatch) { this.errorRetryTimeout = errorRetryTimeout; this.errorMaxDelayInMillis = errorMaxDelayInMillis; this.errorToleranceType = toleranceType; this.time = time; this.errorHandlingMetrics = errorHandlingMetrics; - this.context = context; this.stopRequestedLatch = stopRequestedLatch; this.stopping = false; + this.reporters = Collections.emptyList(); } - public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass, - ConsumerRecord<byte[], byte[]> consumerRecord, - Throwable error) { - + public Future<Void> executeFailed(ProcessingContext<?> context, Stage stage, Class<?> executingClass, Throwable error) { markAsFailed(); - context.consumerRecord(consumerRecord); context.currentContext(stage, executingClass); context.error(error); errorHandlingMetrics.recordFailure(); - Future<Void> errantRecordFuture = context.report(); + Future<Void> errantRecordFuture = report(context); if (!withinToleranceLimits()) { errorHandlingMetrics.recordError(); throw new ConnectException("Tolerance exceeded in error handler", error); } return errantRecordFuture; } - public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass, - SourceRecord sourceRecord, - Throwable error) { Review Comment: Excellent, thanks 🙏 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org