C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449180005
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -80,59 +83,57 @@ public class RetryWithToleranceOperator implements AutoCloseable {
     private final ErrorHandlingMetrics errorHandlingMetrics;
     private final CountDownLatch stopRequestedLatch;
     private volatile boolean stopping;   // indicates whether the operator has been asked to stop retrying
-
-    protected final ProcessingContext context;
+    private List<ErrorReporter> reporters;

     public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
                                       ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics) {
-        this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new ProcessingContext(), new CountDownLatch(1));
+        this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new CountDownLatch(1));
     }

     RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis, ToleranceType toleranceType,
                                Time time, ErrorHandlingMetrics errorHandlingMetrics,
-                               ProcessingContext context, CountDownLatch stopRequestedLatch) {
+                               CountDownLatch stopRequestedLatch) {
         this.errorRetryTimeout = errorRetryTimeout;
         this.errorMaxDelayInMillis = errorMaxDelayInMillis;
         this.errorToleranceType = toleranceType;
         this.time = time;
         this.errorHandlingMetrics = errorHandlingMetrics;
-        this.context = context;
         this.stopRequestedLatch = stopRequestedLatch;
         this.stopping = false;
+        this.reporters = Collections.emptyList();
     }

-    public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
-                                                   ConsumerRecord<byte[], byte[]> consumerRecord,
-                                                   Throwable error) {
-
+    public Future<Void> executeFailed(ProcessingContext<?> context, Stage stage, Class<?> executingClass, Throwable error) {
         markAsFailed();
-        context.consumerRecord(consumerRecord);
         context.currentContext(stage, executingClass);
         context.error(error);
         errorHandlingMetrics.recordFailure();
-        Future<Void> errantRecordFuture = context.report();
+        Future<Void> errantRecordFuture = report(context);
         if (!withinToleranceLimits()) {
             errorHandlingMetrics.recordError();
             throw new ConnectException("Tolerance exceeded in error handler", error);
         }
         return errantRecordFuture;
     }

-    public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
-                                                   SourceRecord sourceRecord,
-                                                   Throwable error) {
-
-        markAsFailed();
-        context.sourceRecord(sourceRecord);
-        context.currentContext(stage, executingClass);
-        context.error(error);
-        errorHandlingMetrics.recordFailure();
-        Future<Void> errantRecordFuture = context.report();
-        if (!withinToleranceLimits()) {
-            errorHandlingMetrics.recordError();
-            throw new ConnectException("Tolerance exceeded in Source Worker error handler", error);
+    /**
+     * Report errors. Should be called only if an error was encountered while executing the operation.
+     *
+     * @return a errant record future that potentially aggregates the producer futures

Review Comment:
   Nit (I know this is just moved as-is from the `ProcessingContext` class, but we might as well fix it up while we're in the neighborhood):
   ```suggestion
     * @return an errant record future that potentially aggregates the producer futures
   ```

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##########
@@ -17,82 +17,36 @@
 package org.apache.kafka.connect.runtime.errors;

 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter.ErrantRecordFuture;
 import org.apache.kafka.connect.source.SourceRecord;

-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-
 /**
- * Contains all the metadata related to the currently evaluating operation. Only one instance of this class is meant
- * to exist per task in a JVM.
+ * Contains all the metadata related to the currently evaluating operation, and associated with a particular
+ * sink or source record from the consumer or task, respectively. This class is not thread safe, and so once an
+ * instance is passed to a new thread, it should no longer be accessed by the previous thread.

Review Comment:
   Two small questions:
   1. Is it technically safe to pass this back and forth between two threads, as long as it's not accessed concurrently by them?
   2. I don't see any synchronization to ensure that reads of fields like `position`, `klass`, etc. are always up-to-date across different threads. Should they be marked `volatile`?

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##########
@@ -168,11 +102,13 @@ public String toString(boolean includeMessage) {
         builder.append("' with class '");
         builder.append(executingClass() == null ? "null" : executingClass().getName());
         builder.append('\'');
-        if (includeMessage && sourceRecord() != null) {
+        T original = original();
+        if (includeMessage && original instanceof SourceRecord) {

Review Comment:
   Not a blocker, just a thought: if we want to avoid these `instanceof` checks (and the potential issues that could come with branches like this if an unexpected type is used), we could tweak the `ProcessingContext` class:
   - Make the constructor private
   - Add an abstract `recordMessage()` method that can be used in `toString(boolean includeMessage)` when `includeMessage` is true, to replace the logic inside these `instanceof` checks
   - Add static factory methods like `public static ProcessingContext<ConsumerRecord<byte[], byte[]>> forConsumerRecord(ConsumerRecord<byte[], byte[]>)` that return private subclasses which implement `recordMessage()` in the appropriate fashion for their particular types (we could also use public subclasses, but that'd clutter up the API a bit and isn't necessary IMO)
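   To make the shape of that concrete, here's a rough sketch of the factory-method approach. This is hypothetical code, not from the PR: only `recordMessage()` and `forConsumerRecord(...)` come from the bullets above; the stored fields, message formats, and the `forSourceRecord` counterpart are guesses.
   ```java
   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.connect.source.SourceRecord;

   public abstract class ProcessingContext<T> {

       private final T original;

       // Private so instances can only be obtained through the factory methods
       // below, which always pick a subclass matching the record type.
       private ProcessingContext(T original) {
           this.original = original;
       }

       public T original() {
           return original;
       }

       // Called from toString(boolean includeMessage) when includeMessage is
       // true, replacing the instanceof branches.
       protected abstract String recordMessage();

       public static ProcessingContext<ConsumerRecord<byte[], byte[]>> forConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
           return new ProcessingContext<ConsumerRecord<byte[], byte[]>>(record) {
               @Override
               protected String recordMessage() {
                   return String.format(", where consumed record is {topic='%s', partition=%d, offset=%d}",
                           record.topic(), record.partition(), record.offset());
               }
           };
       }

       public static ProcessingContext<SourceRecord> forSourceRecord(SourceRecord record) {
           return new ProcessingContext<SourceRecord>(record) {
               @Override
               protected String recordMessage() {
                   return ", where source record is: " + record;
               }
           };
       }
   }
   ```
   Since the anonymous subclasses live inside `ProcessingContext` itself, they can still reach the private constructor, so no public subclasses are needed.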
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -303,48 +303,14 @@ public String toString() {
      * @param reporters the error reporters (should not be null).
      */
     public synchronized void reporters(List<ErrorReporter> reporters) {
-        this.context.reporters(reporters);
-    }
-
-    /**
-     * Set the source record being processed in the connect pipeline.
-     *
-     * @param preTransformRecord the source record
-     */
-    public synchronized void sourceRecord(SourceRecord preTransformRecord) {
-        this.context.sourceRecord(preTransformRecord);
-    }
-
-    /**
-     * Set the record consumed from Kafka in a sink connector.
-     *
-     * @param consumedMessage the record
-     */
-    public synchronized void consumerRecord(ConsumerRecord<byte[], byte[]> consumedMessage) {
-        this.context.consumerRecord(consumedMessage);
-    }
-
-    /**
-     * @return true, if the last operation encountered an error; false otherwise
-     */
-    public synchronized boolean failed() {
-        return this.context.failed();
-    }
-
-    /**
-     * Returns the error encountered when processing the current stage.
-     *
-     * @return the error encountered when processing the current stage
-     */
-    public synchronized Throwable error() {
-        return this.context.error();
+        this.reporters = Objects.requireNonNull(reporters, "reporters");

Review Comment:
   Not a blocker, and probably best left for follow-up: we only ever invoke this method once across the lifetime of this class. We might consider tweaking this class to:
   - Remove this method
   - Change the constructor to accept a `Supplier<List<ErrorReporter>> reporters`
   - Wrap that in a `CachedSupplier` (sketched out below)
   
   ```java
   public class CachedSupplier<T> implements Supplier<T> {
   
       private final Supplier<T> supplier;
       private volatile T cached;
   
       public CachedSupplier(Supplier<T> supplier) {
           this.supplier = supplier;
           this.cached = null;
       }
   
       @Override
       public T get() {
           if (cached != null) {
               return cached;
           } else {
               // Only lock if we may end up instantiating our cached field
               synchronized (this) {
                   if (cached == null)
                       cached = supplier.get();
                   return cached;
               }
           }
       }
   }
   ```
   
   That'd eliminate the need for a lot of the synchronization around the `reporters` field and could simplify the API for the `RetryWithToleranceOperator` class. If this seems appealing, let me know and I can throw something together.
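   If that route sounds good, the wiring might look roughly like this. This is a sketch only: `buildReporters` and the extra constructor parameter are hypothetical and don't exist in the PR.
   ```java
   // Hypothetical wiring for the CachedSupplier sketch above: the reporters
   // are computed lazily on first use and then cached, instead of being
   // injected after construction through the synchronized reporters(...) setter.
   Supplier<List<ErrorReporter>> reporters = new CachedSupplier<>(this::buildReporters);
   RetryWithToleranceOperator operator = new RetryWithToleranceOperator(
           errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, reporters);
   ```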
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##########
@@ -17,30 +17,18 @@
 package org.apache.kafka.connect.runtime.errors;

 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter.ErrantRecordFuture;
 import org.apache.kafka.connect.source.SourceRecord;

-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-
 /**
- * Contains all the metadata related to the currently evaluating operation. Only one instance of this class is meant
- * to exist per task in a JVM.
+ * Contains all the metadata related to the currently evaluating operation, and associated with a particular
+ * sink or source record from the consumer or task, respectively. This class is not thread safe, and so once an
+ * instance is passed to a new thread, it should no longer be accessed by the previous thread.
  */
-class ProcessingContext implements AutoCloseable {
-
-    private Collection<ErrorReporter> reporters = Collections.emptyList();
+public class ProcessingContext {

-    private ConsumerRecord<byte[], byte[]> consumedMessage;
-    private SourceRecord sourceRecord;
+    private final ConsumerRecord<byte[], byte[]> consumedMessage;
+    private final SourceRecord sourceRecord;

     /**
      * The following fields need to be reset every time a new record is seen.

Review Comment:
   Is this comment still relevant?

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java:
##########
@@ -121,20 +121,22 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr
     /**
      * Write the raw records into a Kafka topic and return the producer future.
      *
-     * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
+     * @param context processing context containing the raw record at {@link ProcessingContext#original()}.
      * @return the future associated with the writing of this record; never null
      */
-    public Future<RecordMetadata> report(ProcessingContext context) {
+    @SuppressWarnings("unchecked")
+    public Future<RecordMetadata> report(ProcessingContext<?> context) {
         if (dlqTopicName.isEmpty()) {
             return CompletableFuture.completedFuture(null);
         }
         errorHandlingMetrics.recordDeadLetterQueueProduceRequest();

-        ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord();
-        if (originalMessage == null) {
+        if (!(context.original() instanceof ConsumerRecord)) {

Review Comment:
   I wish we could retain full type safety here, but if it's too much work to be worth it then we can live with this. Do you think we can at least log a warning inside this branch if `context.original()` is non-null, though?
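   Something along these lines is what that last question has in mind. A sketch only: it assumes the class keeps its usual SLF4J `log` field, and the message wording is made up.
   ```java
   if (!(context.original() instanceof ConsumerRecord)) {
       if (context.original() != null) {
           // An errant record of an unexpected type reached the DLQ reporter;
           // warn instead of silently dropping it.
           log.warn("Unable to write record of type {} to the dead letter queue; "
                   + "only sink records can be reported to a DLQ topic",
                   context.original().getClass().getName());
       }
       errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
       return CompletableFuture.completedFuture(null);
   }
   ```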
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -236,7 +237,7 @@ protected <V> V execAndHandleError(Operation<V> operation, Class<? extends Excep
     }

     // Visible for testing
-    void markAsFailed() {
+    synchronized void markAsFailed() {
         errorHandlingMetrics.recordErrorTimestamp();
         totalFailures++;
     }

Review Comment:
   (Commenting here because GitHub won't allow me to comment any lower)
   
   Is it possible that `withinToleranceLimits` is still not quite safe enough? Imagine this sequence of events:
   1. A `SinkTask` invokes `WorkerErrantRecordReporter::report` from a separate thread (i.e., not the one on which `SinkTask::put` is invoked), and as a result, this [invokes RetryWithToleranceOperator::executeFailed](https://github.com/gharris1727/kafka/blob/12a61af6f04025cc20897f544920f3b046effed1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L113), which in turn [records a failure](https://github.com/gharris1727/kafka/blob/12a61af6f04025cc20897f544920f3b046effed1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L107) such that `RetryWithToleranceOperator::withinToleranceLimits` now returns false.
   2. After this, on the main work thread for the sink task, `RetryWithToleranceOperator::execute` is invoked for [key/value/header conversion](https://github.com/gharris1727/kafka/blob/12a61af6f04025cc20897f544920f3b046effed1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L533C39-L539) or [record transformation](https://github.com/gharris1727/kafka/blob/12a61af6f04025cc20897f544920f3b046effed1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java#L55), and a tolerable exception is thrown while attempting the operation. A [check on withinToleranceLimits()](https://github.com/gharris1727/kafka/blob/12a61af6f04025cc20897f544920f3b046effed1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L230-L232) is made, which returns false and causes an exception to be thrown that claims that the tolerable exception failed the task, when in reality it was the exception reported via `WorkerErrantRecordReporter::report` that caused it to fail instead.

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -143,7 +144,7 @@ public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingCl
      * @param <V> return type of the result of the operation.
      * @return result of the operation
      */
-    public synchronized <V> V execute(Operation<V> operation, Stage stage, Class<?> executingClass) {
+    public <V> V execute(ProcessingContext<?> context, Operation<V> operation, Stage stage, Class<?> executingClass) {

Review Comment:
   I know this isn't your fault, but if you have time, could we add a `throws` clause to the Javadocs stating that an exception will be thrown if a non-retriable error is encountered? I always get tripped up reading interactions with this class, and a big part of it is trying to understand the conditions where exceptions are thrown, the context is marked as failed, or `null` is returned.
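   For example, the Javadoc could spell out the failure modes along these lines. Only a sketch; the exact wording and conditions should be verified against the implementation.
   ```java
   /**
    * Execute the given operation for the record in this processing context.
    *
    * @param <V> return type of the result of the operation.
    * @return result of the operation, or null if the operation failed tolerably
    * @throws ConnectException wrapping the original error if a non-retriable error is encountered,
    *         or if retriable errors persist beyond the configured error tolerance
    */
   public <V> V execute(ProcessingContext<?> context, Operation<V> operation, Stage stage, Class<?> executingClass) {
   ```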
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java:
##########
@@ -121,20 +121,22 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr
     /**
      * Write the raw records into a Kafka topic and return the producer future.
      *
-     * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
+     * @param context processing context containing the raw record at {@link ProcessingContext#original()}.
      * @return the future associated with the writing of this record; never null
      */
-    public Future<RecordMetadata> report(ProcessingContext context) {
+    @SuppressWarnings("unchecked")
+    public Future<RecordMetadata> report(ProcessingContext<?> context) {
         if (dlqTopicName.isEmpty()) {
             return CompletableFuture.completedFuture(null);
         }
         errorHandlingMetrics.recordDeadLetterQueueProduceRequest();

-        ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord();
-        if (originalMessage == null) {
+        if (!(context.original() instanceof ConsumerRecord)) {
             errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
             return CompletableFuture.completedFuture(null);
         }
+        ProcessingContext<ConsumerRecord<byte[], byte[]>> sinkContext = (ProcessingContext<ConsumerRecord<byte[], byte[]>>) context;

Review Comment:
   Just to be safe, should this be `ProcessingContext<? extends ConsumerRecord<byte[], byte[]>>`?

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -80,59 +83,57 @@ public class RetryWithToleranceOperator implements AutoCloseable {
     private final ErrorHandlingMetrics errorHandlingMetrics;
     private final CountDownLatch stopRequestedLatch;
     private volatile boolean stopping;   // indicates whether the operator has been asked to stop retrying
-
-    protected final ProcessingContext context;
+    private List<ErrorReporter> reporters;

     public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
                                       ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics) {
-        this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new ProcessingContext(), new CountDownLatch(1));
+        this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new CountDownLatch(1));
     }

     RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis, ToleranceType toleranceType,
                                Time time, ErrorHandlingMetrics errorHandlingMetrics,
-                               ProcessingContext context, CountDownLatch stopRequestedLatch) {
+                               CountDownLatch stopRequestedLatch) {
         this.errorRetryTimeout = errorRetryTimeout;
         this.errorMaxDelayInMillis = errorMaxDelayInMillis;
         this.errorToleranceType = toleranceType;
         this.time = time;
         this.errorHandlingMetrics = errorHandlingMetrics;
-        this.context = context;
         this.stopRequestedLatch = stopRequestedLatch;
         this.stopping = false;
+        this.reporters = Collections.emptyList();
     }

-    public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
-                                                   ConsumerRecord<byte[], byte[]> consumerRecord,
-                                                   Throwable error) {
-
+    public Future<Void> executeFailed(ProcessingContext<?> context, Stage stage, Class<?> executingClass, Throwable error) {
         markAsFailed();
-        context.consumerRecord(consumerRecord);
         context.currentContext(stage, executingClass);
         context.error(error);
         errorHandlingMetrics.recordFailure();
-        Future<Void> errantRecordFuture = context.report();
+        Future<Void> errantRecordFuture = report(context);
         if (!withinToleranceLimits()) {
             errorHandlingMetrics.recordError();
             throw new ConnectException("Tolerance exceeded in error handler", error);
         }
         return errantRecordFuture;
     }

-    public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
-                                                   SourceRecord sourceRecord,
-                                                   Throwable error) {

Review Comment:
   Just to make sure I'm following along: this was removed because it was made redundant by making `ProcessingContext` generic, but all previous call sites for either variant of `executeFailed` now call the single remaining variant, correct?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org