C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449180005


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -80,59 +83,57 @@ public class RetryWithToleranceOperator implements AutoCloseable {
     private final ErrorHandlingMetrics errorHandlingMetrics;
     private final CountDownLatch stopRequestedLatch;
     private volatile boolean stopping;   // indicates whether the operator has been asked to stop retrying
-
-    protected final ProcessingContext context;
+    private List<ErrorReporter> reporters;
 
     public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
                                       ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics) {
-        this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new ProcessingContext(), new CountDownLatch(1));
+        this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new CountDownLatch(1));
     }
 
     RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
                                ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics,
-                               ProcessingContext context, CountDownLatch stopRequestedLatch) {
+                               CountDownLatch stopRequestedLatch) {
         this.errorRetryTimeout = errorRetryTimeout;
         this.errorMaxDelayInMillis = errorMaxDelayInMillis;
         this.errorToleranceType = toleranceType;
         this.time = time;
         this.errorHandlingMetrics = errorHandlingMetrics;
-        this.context = context;
         this.stopRequestedLatch = stopRequestedLatch;
         this.stopping = false;
+        this.reporters = Collections.emptyList();
     }
 
-    public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
-                                      ConsumerRecord<byte[], byte[]> consumerRecord,
-                                      Throwable error) {
-
+    public Future<Void> executeFailed(ProcessingContext<?> context, Stage stage, Class<?> executingClass, Throwable error) {
         markAsFailed();
-        context.consumerRecord(consumerRecord);
         context.currentContext(stage, executingClass);
         context.error(error);
         errorHandlingMetrics.recordFailure();
-        Future<Void> errantRecordFuture = context.report();
+        Future<Void> errantRecordFuture = report(context);
         if (!withinToleranceLimits()) {
             errorHandlingMetrics.recordError();
             throw new ConnectException("Tolerance exceeded in error handler", error);
         }
         return errantRecordFuture;
     }
 
-    public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
-                                                   SourceRecord sourceRecord,
-                                                   Throwable error) {
-
-        markAsFailed();
-        context.sourceRecord(sourceRecord);
-        context.currentContext(stage, executingClass);
-        context.error(error);
-        errorHandlingMetrics.recordFailure();
-        Future<Void> errantRecordFuture = context.report();
-        if (!withinToleranceLimits()) {
-            errorHandlingMetrics.recordError();
-            throw new ConnectException("Tolerance exceeded in Source Worker error handler", error);
+    /**
+     * Report errors. Should be called only if an error was encountered while executing the operation.
+     *
+     * @return a errant record future that potentially aggregates the producer futures

Review Comment:
   Nit (I know this is just moved as-is from the `ProcessingContext` class but we might as well fix it up while we're in the neighborhood):
   ```suggestion
        * @return an errant record future that potentially aggregates the producer futures
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##########
@@ -17,82 +17,36 @@
 package org.apache.kafka.connect.runtime.errors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter.ErrantRecordFuture;
 import org.apache.kafka.connect.source.SourceRecord;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-
 /**
- * Contains all the metadata related to the currently evaluating operation. Only one instance of this class is meant
- * to exist per task in a JVM.
+ * Contains all the metadata related to the currently evaluating operation, and associated with a particular
+ * sink or source record from the consumer or task, respectively. This class is not thread safe, and so once an
+ * instance is passed to a new thread, it should no longer be accessed by the previous thread.

Review Comment:
   Two small questions:
   
   1. Is it technically safe to pass this back and forth between two threads, as long as it's not accessed concurrently by them?
   2. I don't see any synchronization to ensure that reads of fields like `position`, `klass`, etc. are always up-to-date across different threads. Should they be marked `volatile`?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##########
@@ -168,11 +102,13 @@ public String toString(boolean includeMessage) {
         builder.append("' with class '");
         builder.append(executingClass() == null ? "null" : executingClass().getName());
         builder.append('\'');
-        if (includeMessage && sourceRecord() != null) {
+        T original = original();
+        if (includeMessage && original instanceof SourceRecord) {

Review Comment:
   Not a blocker, just a thought: if we want to avoid these `instanceof` checks (and the potential issues that could come with branches like this if an unexpected type is used), we could tweak the `ProcessingContext` class:
   
   - Make the constructor private
   - Add an abstract `recordMessage()` method that can be used in `toString(boolean includeMessage)` when `includeMessage` is true, to replace the logic inside these `instanceof` checks
   - Add static factory methods like `public static ProcessingContext<ConsumerRecord<byte[], byte[]>> forConsumerRecord(ConsumerRecord<byte[], byte[]>)` that return private subclasses which implement the `toString(boolean includeMessage)` method in the appropriate fashion for their particular types (could also do public subclasses but that'd clutter up the API a bit and isn't necessary IMO); see the sketch below
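   
   A minimal sketch of that shape (the record descriptions and method bodies are just illustrative, not taken from the patch):
   ```java
   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.connect.source.SourceRecord;
   
   public abstract class ProcessingContext<T> {
   
       private final T original;
   
       // Private constructor: instances can only be created via the factory methods below
       private ProcessingContext(T original) {
           this.original = original;
       }
   
       public T original() {
           return original;
       }
   
       // Each subclass renders its own record description; toString(true) would call
       // this instead of branching on instanceof
       protected abstract String recordMessage();
   
       public static ProcessingContext<ConsumerRecord<byte[], byte[]>> forConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
           // Anonymous subclasses declared inside this class can still use the private constructor
           return new ProcessingContext<ConsumerRecord<byte[], byte[]>>(record) {
               @Override
               protected String recordMessage() {
                   return ", where consumed record is {topic='" + record.topic()
                           + "', partition=" + record.partition() + ", offset=" + record.offset() + "}";
               }
           };
       }
   
       public static ProcessingContext<SourceRecord> forSourceRecord(SourceRecord record) {
           return new ProcessingContext<SourceRecord>(record) {
               @Override
               protected String recordMessage() {
                   return ", where source record is " + record;
               }
           };
       }
   }
   ```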



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -303,48 +303,14 @@ public String toString() {
      * @param reporters the error reporters (should not be null).
      */
     public synchronized void reporters(List<ErrorReporter> reporters) {
-        this.context.reporters(reporters);
-    }
-
-    /**
-     * Set the source record being processed in the connect pipeline.
-     *
-     * @param preTransformRecord the source record
-     */
-    public synchronized void sourceRecord(SourceRecord preTransformRecord) {
-        this.context.sourceRecord(preTransformRecord);
-    }
-
-    /**
-     * Set the record consumed from Kafka in a sink connector.
-     *
-     * @param consumedMessage the record
-     */
-    public synchronized void consumerRecord(ConsumerRecord<byte[], byte[]> consumedMessage) {
-        this.context.consumerRecord(consumedMessage);
-    }
-
-    /**
-     * @return true, if the last operation encountered an error; false otherwise
-     */
-    public synchronized boolean failed() {
-        return this.context.failed();
-    }
-
-    /**
-     * Returns the error encountered when processing the current stage.
-     *
-     * @return the error encountered when processing the current stage
-     */
-    public synchronized Throwable error() {
-        return this.context.error();
+        this.reporters = Objects.requireNonNull(reporters, "reporters");

Review Comment:
   Not a blocker, and probably best left for follow-up: we only ever invoke this method once across the lifetime of this class. We might consider tweaking this class to:
   
   - Remove this method
   - Change the constructor to accept a `Supplier<List<ErrorReporter>> reporters`
   - Wrap that in a `CachedSupplier` (sketched out below)
   
   ```java
   import java.util.function.Supplier;
   
   public class CachedSupplier<T> implements Supplier<T> {
   
       private final Supplier<T> supplier;
       private volatile T cached;
   
       public CachedSupplier(Supplier<T> supplier) {
           this.supplier = supplier;
           this.cached = null;
       }
   
       @Override
       public T get() {
           if (cached != null) {
               return cached;
           } else {
               // Only lock if we may end up instantiating our cached field
               synchronized (this) {
                   if (cached == null)
                       cached = supplier.get();
                   return cached;
               }
           }
       }
   }
   ```
   
   That'd eliminate the need for a lot of the synchronization around the `reporters` field and could simplify the API for the `RetryWithToleranceOperator` class. If this seems appealing, let me know and I can throw something together.
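   
   For illustration, the wiring could then look roughly like this (hypothetical shape, not part of this PR):
   ```java
   // createReporters() is a stand-in for however the worker actually builds the reporter list
   Supplier<List<ErrorReporter>> reporters = new CachedSupplier<>(() -> createReporters());
   // The operator would hold this supplier and call reporters.get() lazily, so the list is
   // resolved at most once and without synchronizing on every access
   ```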



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##########
@@ -17,30 +17,18 @@
 package org.apache.kafka.connect.runtime.errors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter.ErrantRecordFuture;
 import org.apache.kafka.connect.source.SourceRecord;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-
 /**
- * Contains all the metadata related to the currently evaluating operation. Only one instance of this class is meant
- * to exist per task in a JVM.
+ * Contains all the metadata related to the currently evaluating operation, and associated with a particular
+ * sink or source record from the consumer or task, respectively. This class is not thread safe, and so once an
+ * instance is passed to a new thread, it should no longer be accessed by the previous thread.
  */
-class ProcessingContext implements AutoCloseable {
-
-    private Collection<ErrorReporter> reporters = Collections.emptyList();
+public class ProcessingContext {
 
-    private ConsumerRecord<byte[], byte[]> consumedMessage;
-    private SourceRecord sourceRecord;
+    private final ConsumerRecord<byte[], byte[]> consumedMessage;
+    private final SourceRecord sourceRecord;
 
     /**
      * The following fields need to be reset every time a new record is seen.

Review Comment:
   Is this comment still relevant?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java:
##########
@@ -121,20 +121,22 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr
     /**
      * Write the raw records into a Kafka topic and return the producer future.
      *
-     * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
+     * @param context processing context containing the raw record at {@link ProcessingContext#original()}.
      * @return the future associated with the writing of this record; never null
      */
-    public Future<RecordMetadata> report(ProcessingContext context) {
+    @SuppressWarnings("unchecked")
+    public Future<RecordMetadata> report(ProcessingContext<?> context) {
         if (dlqTopicName.isEmpty()) {
             return CompletableFuture.completedFuture(null);
         }
         errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
 
-        ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord();
-        if (originalMessage == null) {
+        if (!(context.original() instanceof ConsumerRecord)) {

Review Comment:
   I wish we could retain full type safety here but if it's too much work to be worth it then we can live with this. Do you think we can at least log a warning inside this branch if `context.original()` is non-null, though?
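   
   Something along these lines, perhaps (sketch only; assumes the class's existing `log` field, and the message wording is just illustrative):
   ```java
   if (!(context.original() instanceof ConsumerRecord)) {
       if (context.original() != null) {
           // Illustrative: surface unexpected record types instead of silently dropping them
           log.warn("Dead letter queue reporting is only supported for sink records; ignoring record of unexpected type {}", context.original().getClass());
       }
       errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
       return CompletableFuture.completedFuture(null);
   }
   ```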



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -236,7 +237,7 @@ protected <V> V execAndHandleError(Operation<V> operation, Class<? extends Excep
     }
 
     // Visible for testing
-    void markAsFailed() {
+    synchronized void markAsFailed() {
         errorHandlingMetrics.recordErrorTimestamp();
         totalFailures++;
     }

Review Comment:
   (Commenting here because GitHub won't allow me to comment any lower)
   
   Is it possible that `withinToleranceLimits` is still not quite safe enough? Imagine this sequence of events:
   
   1. A `SinkTask` invokes `WorkerErrantRecordReporter::report` from a separate thread (i.e., not the one on which `SinkTask::put` is invoked), and as a result, this [invokes RetryWithToleranceOperator::executeFailed](https://github.com/gharris1727/kafka/blob/12a61af6f04025cc20897f544920f3b046effed1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L113), which in turn [records a failure](https://github.com/gharris1727/kafka/blob/12a61af6f04025cc20897f544920f3b046effed1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L107) such that `RetryWithToleranceOperator::withinToleranceLimits` now returns false.
   2. After this, on the main work thread for the sink task, `RetryWithToleranceOperator::execute` is invoked for [key/value/header conversion](https://github.com/gharris1727/kafka/blob/12a61af6f04025cc20897f544920f3b046effed1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L533C39-L539) or [record transformation](https://github.com/gharris1727/kafka/blob/12a61af6f04025cc20897f544920f3b046effed1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java#L55), and a tolerable exception is thrown while attempting the operation. A [check on withinToleranceLimits()](https://github.com/gharris1727/kafka/blob/12a61af6f04025cc20897f544920f3b046effed1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L230-L232) is made, which returns false, and causes an exception to be thrown that claims that the tolerable exception failed the task, when in reality, it was the exception reported via `WorkerErrantRecordReporter::report` that caused it to fail instead.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -143,7 +144,7 @@ public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingCl
      * @param <V> return type of the result of the operation.
      * @return result of the operation
      */
-    public synchronized <V> V execute(Operation<V> operation, Stage stage, Class<?> executingClass) {
+    public <V> V execute(ProcessingContext<?> context, Operation<V> operation, Stage stage, Class<?> executingClass) {

Review Comment:
   I know this isn't your fault but if you have time, could we add a `throws` clause to the Javadocs stating that an exception will be thrown if a non-retriable error is encountered? I always get tripped up reading interactions with this class and a big part of it is trying to understand the conditions where exceptions are thrown, the context is marked as failed, or `null` is returned.
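   
   For example, roughly (the wording is only a suggestion):
   ```java
        * @param <V> return type of the result of the operation.
        * @return result of the operation, or null if the operation failed and the error was tolerated
        * @throws ConnectException if a non-retriable error is encountered, or if the error tolerance is exceeded
        */
   ```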



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java:
##########
@@ -121,20 +121,22 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr
     /**
      * Write the raw records into a Kafka topic and return the producer future.
      *
-     * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
+     * @param context processing context containing the raw record at {@link ProcessingContext#original()}.
      * @return the future associated with the writing of this record; never null
      */
-    public Future<RecordMetadata> report(ProcessingContext context) {
+    @SuppressWarnings("unchecked")
+    public Future<RecordMetadata> report(ProcessingContext<?> context) {
         if (dlqTopicName.isEmpty()) {
             return CompletableFuture.completedFuture(null);
         }
         errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
 
-        ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord();
-        if (originalMessage == null) {
+        if (!(context.original() instanceof ConsumerRecord)) {
             errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
             return CompletableFuture.completedFuture(null);
         }
+        ProcessingContext<ConsumerRecord<byte[], byte[]>> sinkContext = (ProcessingContext<ConsumerRecord<byte[], byte[]>>) context;

Review Comment:
   Just to be safe, should this be `ProcessingContext<? extends ConsumerRecord<byte[], byte[]>>`?
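   
   i.e., roughly:
   ```java
   ProcessingContext<? extends ConsumerRecord<byte[], byte[]>> sinkContext =
           (ProcessingContext<? extends ConsumerRecord<byte[], byte[]>>) context;
   ```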



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -80,59 +83,57 @@ public class RetryWithToleranceOperator implements AutoCloseable {
     private final ErrorHandlingMetrics errorHandlingMetrics;
     private final CountDownLatch stopRequestedLatch;
    private volatile boolean stopping;   // indicates whether the operator has been asked to stop retrying
-
-    protected final ProcessingContext context;
+    private List<ErrorReporter> reporters;
 
     public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
                                       ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics) {
-        this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new ProcessingContext(), new CountDownLatch(1));
+        this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new CountDownLatch(1));
     }
 
     RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
                                ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics,
-                               ProcessingContext context, CountDownLatch stopRequestedLatch) {
+                               CountDownLatch stopRequestedLatch) {
         this.errorRetryTimeout = errorRetryTimeout;
         this.errorMaxDelayInMillis = errorMaxDelayInMillis;
         this.errorToleranceType = toleranceType;
         this.time = time;
         this.errorHandlingMetrics = errorHandlingMetrics;
-        this.context = context;
         this.stopRequestedLatch = stopRequestedLatch;
         this.stopping = false;
+        this.reporters = Collections.emptyList();
     }
 
-    public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
-                                      ConsumerRecord<byte[], byte[]> consumerRecord,
-                                      Throwable error) {
-
+    public Future<Void> executeFailed(ProcessingContext<?> context, Stage stage, Class<?> executingClass, Throwable error) {
         markAsFailed();
-        context.consumerRecord(consumerRecord);
         context.currentContext(stage, executingClass);
         context.error(error);
         errorHandlingMetrics.recordFailure();
-        Future<Void> errantRecordFuture = context.report();
+        Future<Void> errantRecordFuture = report(context);
         if (!withinToleranceLimits()) {
             errorHandlingMetrics.recordError();
             throw new ConnectException("Tolerance exceeded in error handler", error);
         }
         return errantRecordFuture;
     }
 
-    public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
-                                                   SourceRecord sourceRecord,
-                                                   Throwable error) {

Review Comment:
   Just to make sure I'm following along: this was removed because it was made redundant by making `ProcessingContext` generic, and all previous call sites for either variant of `executeFailed` now call the single remaining variant, correct?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
