C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450843020


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##########
@@ -17,82 +17,36 @@
 package org.apache.kafka.connect.runtime.errors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter.ErrantRecordFuture;
 import org.apache.kafka.connect.source.SourceRecord;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-
 /**
- * Contains all the metadata related to the currently evaluating operation. Only one instance of this class is meant
- * to exist per task in a JVM.
+ * Contains all the metadata related to the currently evaluating operation, and associated with a particular
+ * sink or source record from the consumer or task, respectively. This class is not thread safe, and so once an
+ * instance is passed to a new thread, it should no longer be accessed by the previous thread.

Review Comment:
   Hmmm--I'm convinced we don't have to block on this, but I think there might 
still be value in making the mutable fields of the `ProcessingContext` class 
volatile to help reduce the possibility of footguns in the future.
   
   If a class has a volatile `ProcessingContext` field, that guarantees that 
reads of that field will always see the latest value for that field. It doesn't 
provide the same guarantees for the fields of the `ProcessingContext` instance, 
though--so while we'll always see an up-to-date reference to `volatile 
ProcessingContext ctx`, we may still get an out-of-date value when invoking 
`ctx.stage()`.
   
   A stale read like this becomes possible if the same `ProcessingContext` 
instance is passed back and forth between threads, even if there is only ever 
one thread reading from or writing to the instance at any given time.
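   
   To make the suggestion concrete, here is a minimal sketch of what volatile 
mutable fields could look like. `SketchContext` and its use of a plain `String` 
for the stage are assumptions made for illustration; this is not the actual 
`ProcessingContext` code from the PR.

```java
/**
 * Hypothetical stand-in for ProcessingContext (not the real Kafka class).
 * The wrapped record is final; every mutable field is volatile, so a write
 * by one thread is visible to any later read from another thread.
 */
final class SketchContext<T> {
    private final T original;          // set once in the constructor
    private volatile String stage;     // assumption: a String instead of the real Stage enum
    private volatile int attempt;
    private volatile Throwable error;

    SketchContext(T original) {
        this.original = original;
    }

    T original() { return original; }

    String stage() { return stage; }
    void stage(String stage) { this.stage = stage; }

    int attempt() { return attempt; }
    void attempt(int attempt) { this.attempt = attempt; }

    Throwable error() { return error; }
    void error(Throwable error) { this.error = error; }

    // True once an error has been recorded for the current operation.
    boolean failed() { return error != null; }
}
```

   Note that `volatile` only addresses visibility. It does not make compound 
updates such as `attempt++` atomic, so the single-owner rule in the Javadoc 
would still apply.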



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##########
@@ -168,11 +102,13 @@ public String toString(boolean includeMessage) {
         builder.append("' with class '");
         builder.append(executingClass() == null ? "null" : executingClass().getName());
         builder.append('\'');
-        if (includeMessage && sourceRecord() != null) {
+        T original = original();
+        if (includeMessage && original instanceof SourceRecord) {

Review Comment:
   Thank you for the exhaustive analysis 🙏
   
   I wholeheartedly agree that this would be better suited for a follow-up, and 
if/when that happens, the comments you've left have created an excellent place 
to start from.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -303,48 +303,14 @@ public String toString() {
      * @param reporters the error reporters (should not be null).
      */
     public synchronized void reporters(List<ErrorReporter> reporters) {
-        this.context.reporters(reporters);
-    }
-
-    /**
-     * Set the source record being processed in the connect pipeline.
-     *
-     * @param preTransformRecord the source record
-     */
-    public synchronized void sourceRecord(SourceRecord preTransformRecord) {
-        this.context.sourceRecord(preTransformRecord);
-    }
-
-    /**
-     * Set the record consumed from Kafka in a sink connector.
-     *
-     * @param consumedMessage the record
-     */
-    public synchronized void consumerRecord(ConsumerRecord<byte[], byte[]> consumedMessage) {
-        this.context.consumerRecord(consumedMessage);
-    }
-
-    /**
-     * @return true, if the last operation encountered an error; false otherwise
-     */
-    public synchronized boolean failed() {
-        return this.context.failed();
-    }
-
-    /**
-     * Returns the error encountered when processing the current stage.
-     *
-     * @return the error encountered when processing the current stage
-     */
-    public synchronized Throwable error() {
-        return this.context.error();
+        this.reporters = Objects.requireNonNull(reporters, "reporters");

Review Comment:
   Hmmm... I don't see the same confusion over ownership with a 
`CachedSupplier` since it's basically a drop-in for a `final` field that can't 
be instantiated before/during the constructor. But if this isn't how everyone 
reacts to a pattern like that, I agree it's best to try to avoid that confusion.
   
   I think the `AutoCloseableSupplier` interface is acceptable, if a little 
clunky. We also don't need to support 
`AutoCloseableSupplier<List<AutoCloseable>>` if it's also viable to use a 
`List<AutoCloseableSupplier>` instead.
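   
   For reference, this is roughly the shape I have in mind when I say a cached 
supplier is a drop-in for a `final` field. It is a sketch only: 
`CachedSupplierSketch` is a hypothetical name and the body is an assumption, 
not the class from the PR.

```java
import java.util.Objects;
import java.util.function.Supplier;

/**
 * Hypothetical memoizing supplier (an assumption, not the PR's class).
 * The delegate runs at most once, on the first call to get(), so a
 * potentially blocking initialization can be deferred past the constructor.
 */
final class CachedSupplierSketch<T> implements Supplier<T> {
    private final Supplier<T> delegate;
    private T value;
    private boolean initialized;

    CachedSupplierSketch(Supplier<T> delegate) {
        this.delegate = Objects.requireNonNull(delegate, "delegate");
    }

    @Override
    public synchronized T get() {
        if (!initialized) {
            // First call: compute and remember the value.
            value = delegate.get();
            initialized = true;
        }
        return value;
    }
}
```

   The holder can keep the supplier in a `final` field, and whichever thread 
calls `get()` first pays the initialization cost.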
   
   Some other possible alternatives:
   
   - Add a `start` hook to the `ErrorReporter` interface, during which all 
potentially-blocking initialization calls take place. Then we can restore the 
original logic for `WorkerTask` instances, where `reporters` is a final field, 
and we invoke `start` on each of them on the task work thread (i.e., the one 
that processes data for the task).
   - Move the `Supplier` into the `RetryWithToleranceOperator` class, and make 
it a final field, so that we can at least encapsulate this logic into the one 
class that actually uses the `ErrorReporter` interface at the moment.
   
   @yashmayya IIRC you introduced the `Supplier` pattern for the `reporters` 
field; would be interested in your thoughts if you have time.
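   
   A rough sketch of the first alternative (the `start` hook), in case it 
helps the discussion. `StartableReporter`, `SketchTask`, and the 
`report(String)` signature are all hypothetical; the real `ErrorReporter` 
interface has no `start` method today.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical variant of ErrorReporter with a start hook. */
interface StartableReporter extends AutoCloseable {
    /** Potentially blocking initialization goes here, not in the constructor. */
    default void start() { }

    /** Hypothetical signature, for illustration only. */
    void report(String failure);

    @Override
    default void close() { }
}

/** Hypothetical task: reporters is a final field, and start() runs on the work thread. */
final class SketchTask implements Runnable {
    private final List<StartableReporter> reporters;

    SketchTask(List<StartableReporter> reporters) {
        this.reporters = Collections.unmodifiableList(new ArrayList<>(reporters));
    }

    @Override
    public void run() {
        // Runs on the task's work thread, so blocking calls stay off the thread
        // that constructed the task.
        reporters.forEach(StartableReporter::start);
        try {
            reporters.forEach(r -> r.report("example failure"));
        } finally {
            reporters.forEach(StartableReporter::close);
        }
    }
}
```

   The trade-off is an extra lifecycle method on every reporter, in exchange 
for dropping the `Supplier` indirection entirely.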



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

