This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.5 by this push:
     new 3f7c0c83d6f KAFKA-15238: Move DLQ reporter setup from the DistributedHerder's tick thread to the sink task thread (#14079)
3f7c0c83d6f is described below

commit 3f7c0c83d6f761a49d12d182beec9c416ac96012
Author: Yash Mayya <yash.ma...@gmail.com>
AuthorDate: Tue Jul 25 14:03:29 2023 +0100

    KAFKA-15238: Move DLQ reporter setup from the DistributedHerder's tick thread to the sink task thread (#14079)
    
    Reviewers: Chris Egerton <chr...@aiven.io>
---
 .../connect/runtime/AbstractWorkerSourceTask.java  |  8 ++-
 .../runtime/ExactlyOnceWorkerSourceTask.java       |  8 ++-
 .../org/apache/kafka/connect/runtime/Worker.java   | 11 ++--
 .../kafka/connect/runtime/WorkerSinkTask.java      | 12 +++-
 .../kafka/connect/runtime/WorkerSourceTask.java    |  8 ++-
 .../runtime/AbstractWorkerSourceTaskTest.java      | 38 +++++++++++--
 .../connect/runtime/ErrorHandlingTaskTest.java     | 58 +++++++++----------
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java   |  2 +-
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  | 66 +++++++++++++++++++---
 .../runtime/WorkerSinkTaskThreadedTest.java        |  3 +-
 .../connect/runtime/WorkerSourceTaskTest.java      |  2 +-
 11 files changed, 155 insertions(+), 61 deletions(-)
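
For readers skimming the patch: the fix is a straightforward deferred-initialization pattern. Rather than registering the error reporters (including the DLQ reporter, whose setup can involve blocking work) while the DistributedHerder's tick thread constructs the task, the Worker now hands each task a Supplier<List<ErrorReporter>>, and the task invokes it from its own thread inside initializeAndStart(). The minimal, self-contained Java sketch below illustrates that pattern; its class and method names are stand-ins, not the actual Connect runtime API:

    import java.util.Collections;
    import java.util.List;
    import java.util.function.Supplier;

    // Illustrative sketch only: stand-ins for the Connect runtime types
    // touched by this commit, not the real API.
    public class DeferredReporterSetupSketch {

        interface ErrorReporter {
            void report(String message);
        }

        static class ToleranceOperator {
            private List<ErrorReporter> reporters = Collections.emptyList();

            // Mirrors the role of RetryWithToleranceOperator.reporters(...).
            void reporters(List<ErrorReporter> reporters) {
                this.reporters = reporters;
            }

            void report(String message) {
                reporters.forEach(r -> r.report(message));
            }
        }

        static class Task {
            private final ToleranceOperator operator;
            private final Supplier<List<ErrorReporter>> errorReportersSupplier;

            // Runs on the herder's tick thread: the supplier is stored,
            // not invoked, so task construction stays cheap.
            Task(ToleranceOperator operator,
                 Supplier<List<ErrorReporter>> errorReportersSupplier) {
                this.operator = operator;
                this.errorReportersSupplier = errorReportersSupplier;
            }

            // Runs on the task's own thread, like initializeAndStart() in
            // the diff: expensive or failing reporter setup happens here.
            void initializeAndStart() {
                operator.reporters(errorReportersSupplier.get());
            }
        }

        public static void main(String[] args) {
            ToleranceOperator operator = new ToleranceOperator();
            Task task = new Task(operator, () ->
                    Collections.singletonList(msg -> System.out.println("dlq: " + msg)));
            new Thread(task::initializeAndStart).start();
        }
    }

As the new tests below show, an exception thrown by the supplier now propagates from initializeAndStart() and fails the task, rather than surfacing on the herder thread.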

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index 6d5446d9c21..4f9e0936ee0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -37,6 +37,7 @@ import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.runtime.errors.ToleranceType;
@@ -65,6 +66,7 @@ import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
 
@@ -195,6 +197,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
     private final boolean topicTrackingEnabled;
     private final TopicCreation topicCreation;
     private final Executor closeExecutor;
+    private final Supplier<List<ErrorReporter>> errorReportersSupplier;
 
     // Visible for testing
     List<SourceRecord> toSend;
@@ -224,7 +227,8 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
                                        Time time,
                                        RetryWithToleranceOperator retryWithToleranceOperator,
                                        StatusBackingStore statusBackingStore,
-                                       Executor closeExecutor) {
+                                       Executor closeExecutor,
+                                       Supplier<List<ErrorReporter>> errorReportersSupplier) {
 
         super(id, statusListener, initialState, loader, connectMetrics, errorMetrics,
                 retryWithToleranceOperator, time, statusBackingStore);
@@ -242,6 +246,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
         this.offsetStore = Objects.requireNonNull(offsetStore, "offset store cannot be null for source tasks");
         this.closeExecutor = closeExecutor;
         this.sourceTaskContext = sourceTaskContext;
+        this.errorReportersSupplier = errorReportersSupplier;
 
         this.stopRequestedLatch = new CountDownLatch(1);
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
@@ -261,6 +266,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
 
     @Override
     protected void initializeAndStart() {
+        retryWithToleranceOperator.reporters(errorReportersSupplier.get());
         prepareToInitializeTask();
         offsetStore.start();
         // If we try to start the task at all by invoking initialize, then count this as
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 30dafaac81d..da58b4fafc9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -47,11 +48,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 
 /**
@@ -94,11 +97,12 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
                                        SourceConnectorConfig sourceConfig,
                                        Executor closeExecutor,
                                        Runnable preProducerCheck,
-                                       Runnable postProducerCheck) {
+                                       Runnable postProducerCheck,
+                                       Supplier<List<ErrorReporter>> errorReportersSupplier) {
         super(id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain,
                 new WorkerSourceTaskContext(offsetReader, id, configState, buildTransactionContext(sourceConfig)),
                 producer, admin, topicGroups, offsetReader, offsetWriter, offsetStore, workerConfig, connectMetrics, errorMetrics,
-                loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor);
+                loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier);
 
         this.transactionOpen = false;
         this.commitableRecords = new LinkedHashMap<>();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index b43d9a7dc58..ee68f2731b1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -1370,7 +1370,6 @@ public class Worker {
             TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformationStages(), retryWithToleranceOperator);
             log.info("Initializing: {}", transformationChain);
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings());
-            retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
             WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
                     keyConverter, valueConverter, headerConverter);
 
@@ -1381,7 +1380,8 @@ public class Worker {
 
             return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
                     valueConverter, errorHandlingMetrics, headerConverter, transformationChain, consumer, classLoader, time,
-                    retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore());
+                    retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore(),
+                    () -> sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
         }
     }
 
@@ -1410,7 +1410,6 @@ public class Worker {
 
             SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
                     connectorConfig.originalsStrings(), config.topicCreationEnable());
-            retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
             TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformationStages(), retryWithToleranceOperator);
             log.info("Initializing: {}", transformationChain);
 
@@ -1442,7 +1441,7 @@ public class Worker {
             return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, errorHandlingMetrics,
                     headerConverter, transformationChain, producer, topicAdmin, topicCreationGroups,
                     offsetReader, offsetWriter, offsetStore, config, configState, metrics, classLoader, time,
-                    retryWithToleranceOperator, herder.statusBackingStore(), executor);
+                    retryWithToleranceOperator, herder.statusBackingStore(), executor, () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
         }
     }
 
@@ -1478,7 +1477,6 @@ public class Worker {
 
             SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
                     connectorConfig.originalsStrings(), config.topicCreationEnable());
-            retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
             TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformationStages(), retryWithToleranceOperator);
             log.info("Initializing: {}", transformationChain);
 
@@ -1508,7 +1506,8 @@ public class Worker {
             return new ExactlyOnceWorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
                     headerConverter, transformationChain, producer, topicAdmin, topicCreationGroups,
                     offsetReader, offsetWriter, offsetStore, config, configState, metrics, errorHandlingMetrics, classLoader, time, retryWithToleranceOperator,
-                    herder.statusBackingStore(), sourceConfig, executor, preProducerCheck, postProducerCheck);
+                    herder.statusBackingStore(), sourceConfig, executor, preProducerCheck, postProducerCheck,
+                    () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
         }
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 85454a60670..3314cc8db18 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -40,13 +40,14 @@ import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.header.ConnectHeaders;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
-import org.apache.kafka.connect.storage.ClusterConfigState;
-import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.storage.StatusBackingStore;
@@ -61,6 +62,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -98,6 +100,7 @@ class WorkerSinkTask extends WorkerTask {
     private boolean committing;
     private boolean taskStopped;
     private final WorkerErrantRecordReporter workerErrantRecordReporter;
+    private final Supplier<List<ErrorReporter>> errorReportersSupplier;
 
     public WorkerSinkTask(ConnectorTaskId id,
                           SinkTask task,
@@ -116,7 +119,8 @@ class WorkerSinkTask extends WorkerTask {
                           Time time,
                           RetryWithToleranceOperator retryWithToleranceOperator,
                           WorkerErrantRecordReporter workerErrantRecordReporter,
-                          StatusBackingStore statusBackingStore) {
+                          StatusBackingStore statusBackingStore,
+                          Supplier<List<ErrorReporter>> errorReportersSupplier) {
         super(id, statusListener, initialState, loader, connectMetrics, errorMetrics,
                 retryWithToleranceOperator, time, statusBackingStore);
 
@@ -145,6 +149,7 @@ class WorkerSinkTask extends WorkerTask {
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
         this.taskStopped = false;
         this.workerErrantRecordReporter = workerErrantRecordReporter;
+        this.errorReportersSupplier = errorReportersSupplier;
     }
 
     @Override
@@ -299,6 +304,7 @@ class WorkerSinkTask extends WorkerTask {
     @Override
     protected void initializeAndStart() {
         SinkConnectorConfig.validate(taskConfig);
+        retryWithToleranceOperator.reporters(errorReportersSupplier.get());
 
         if (SinkConnectorConfig.hasTopicsConfig(taskConfig)) {
             List<String> topics = SinkConnectorConfig.parseTopicsList(taskConfig);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index a6767675a3a..7b09f501460 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
 import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
@@ -40,6 +41,7 @@ import org.apache.kafka.connect.util.TopicCreationGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -48,6 +50,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import static org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets;
 
@@ -83,12 +86,13 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
                             Time time,
                             RetryWithToleranceOperator retryWithToleranceOperator,
                             StatusBackingStore statusBackingStore,
-                            Executor closeExecutor) {
+                            Executor closeExecutor,
+                            Supplier<List<ErrorReporter>> errorReportersSupplier) {
 
         super(id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain,
                 new WorkerSourceTaskContext(offsetReader, id, configState, null), producer,
                 admin, topicGroups, offsetReader, offsetWriter, offsetStore, workerConfig, connectMetrics, errorMetrics, loader,
-                time, retryWithToleranceOperator, statusBackingStore, closeExecutor);
+                time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier);
 
         this.committableOffsets = CommittableOffsets.EMPTY;
         this.submittedRecords = new SubmittedRecords();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
index 9ac7d5cca28..26ee2fb4e0c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
@@ -37,6 +37,8 @@ import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.header.ConnectHeaders;
 import org.apache.kafka.connect.integration.MonitorableSourceConnector;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@@ -72,6 +74,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
@@ -95,6 +98,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -348,7 +352,8 @@ public class AbstractWorkerSourceTaskTest {
         StringConverter stringConverter = new StringConverter();
         SampleConverterWithHeaders testConverter = new SampleConverterWithHeaders();
 
-        createWorkerTask(stringConverter, testConverter, stringConverter);
+        createWorkerTask(stringConverter, testConverter, stringConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR,
+                Collections::emptyList);
 
         expectSendRecord(null);
         expectTopicCreation(TOPIC);
@@ -689,6 +694,28 @@ public class AbstractWorkerSourceTaskTest {
         verify(transformationChain, times(2)).apply(eq(record3));
     }
 
+    @Test
+    public void testErrorReportersConfigured() {
+        RetryWithToleranceOperator retryWithToleranceOperator = mock(RetryWithToleranceOperator.class);
+        List<ErrorReporter> errorReporters = Collections.singletonList(mock(ErrorReporter.class));
+        createWorkerTask(keyConverter, valueConverter, headerConverter, retryWithToleranceOperator, () -> errorReporters);
+        workerTask.initializeAndStart();
+
+        ArgumentCaptor<List<ErrorReporter>> errorReportersCapture = ArgumentCaptor.forClass(List.class);
+        verify(retryWithToleranceOperator).reporters(errorReportersCapture.capture());
+        assertEquals(errorReporters, errorReportersCapture.getValue());
+    }
+
+    @Test
+    public void testErrorReporterConfigurationExceptionPropagation() {
+        createWorkerTask(keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR,
+                () -> {
+                    throw new ConnectException("Failed to create error reporters");
+                }
+        );
+        assertThrows(ConnectException.class, () -> workerTask.initializeAndStart());
+    }
+
     private void expectSendRecord(Headers headers) {
         if (headers != null)
             expectConvertHeadersAndKeyValue(headers, TOPIC);
@@ -799,15 +826,16 @@ public class AbstractWorkerSourceTaskTest {
     }
 
     private void createWorkerTask() {
-        createWorkerTask(keyConverter, valueConverter, headerConverter);
+        createWorkerTask(keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR, Collections::emptyList);
     }
 
-    private void createWorkerTask(Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
+    private void createWorkerTask(Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter,
+                                  RetryWithToleranceOperator retryWithToleranceOperator, Supplier<List<ErrorReporter>> errorReportersSupplier) {
         workerTask = new AbstractWorkerSourceTask(
                 taskId, sourceTask, statusListener, TargetState.STARTED, keyConverter, valueConverter, headerConverter, transformationChain,
                 sourceTaskContext, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore,
-                config, metrics, errorHandlingMetrics,  plugins.delegatingLoader(), Time.SYSTEM, RetryWithToleranceOperatorTest.NOOP_OPERATOR,
-                statusBackingStore, Runnable::run) {
+                config, metrics, errorHandlingMetrics,  plugins.delegatingLoader(), Time.SYSTEM, retryWithToleranceOperator,
+                statusBackingStore, Runnable::run, errorReportersSupplier) {
             @Override
             protected void prepareToInitializeTask() {
             }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 1946b01ee5d..3bd2cf64e84 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -33,7 +33,6 @@ import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.integration.MonitorableSourceConnector;
 import org.apache.kafka.connect.json.JsonConverter;
-import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
 import org.apache.kafka.connect.runtime.errors.ErrorReporter;
 import org.apache.kafka.connect.runtime.errors.LogReporter;
@@ -48,6 +47,7 @@ import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
@@ -75,12 +75,13 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Arrays;
 import java.util.Set;
-import java.util.Collections;
-import java.util.Collection;
 import java.util.concurrent.Executor;
 
 import static java.util.Collections.emptyMap;
@@ -98,16 +99,15 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CO
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
 import static org.junit.Assert.assertEquals;
-
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 @RunWith(Parameterized.class)
@@ -234,9 +234,8 @@ public class ErrorHandlingTaskTest {
         ErrorReporter reporter = mock(ErrorReporter.class);
 
         RetryWithToleranceOperator retryWithToleranceOperator = operator();
-        retryWithToleranceOperator.reporters(singletonList(reporter));
 
-        createSinkTask(initialState, retryWithToleranceOperator);
+        createSinkTask(initialState, retryWithToleranceOperator, singletonList(reporter));
         workerSinkTask.initialize(TASK_CONFIG);
         workerSinkTask.initializeAndStart();
         workerSinkTask.close();
@@ -253,11 +252,11 @@ public class ErrorHandlingTaskTest {
         ErrorReporter reporter = mock(ErrorReporter.class);
 
         RetryWithToleranceOperator retryWithToleranceOperator = operator();
-        retryWithToleranceOperator.reporters(singletonList(reporter));
 
-        createSourceTask(initialState, retryWithToleranceOperator);
+        createSourceTask(initialState, retryWithToleranceOperator, singletonList(reporter));
 
         workerSourceTask.initialize(TASK_CONFIG);
+        workerSourceTask.initializeAndStart();
         workerSourceTask.close();
         verifyCloseSource();
         verify(reporter).close();
@@ -269,15 +268,15 @@ public class ErrorHandlingTaskTest {
         ErrorReporter reporterB = mock(ErrorReporter.class);
 
         RetryWithToleranceOperator retryWithToleranceOperator = operator();
-        retryWithToleranceOperator.reporters(Arrays.asList(reporterA, reporterB));

-        createSourceTask(initialState, retryWithToleranceOperator);
+        createSourceTask(initialState, retryWithToleranceOperator, Arrays.asList(reporterA, reporterB));
 
         // Even though the reporters throw exceptions, they should both still be closed.
         doThrow(new RuntimeException()).when(reporterA).close();
         doThrow(new RuntimeException()).when(reporterB).close();
 
         workerSourceTask.initialize(TASK_CONFIG);
+        workerSourceTask.initializeAndStart();
         workerSourceTask.close();
 
         verify(reporterA).close();
@@ -293,9 +292,7 @@ public class ErrorHandlingTaskTest {
         LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);
 
         RetryWithToleranceOperator retryWithToleranceOperator = operator();
-        retryWithToleranceOperator.reporters(singletonList(reporter));
-        createSinkTask(initialState, retryWithToleranceOperator);
-
+        createSinkTask(initialState, retryWithToleranceOperator, singletonList(reporter));
 
         // valid json
         ConsumerRecord<byte[], byte[]> record1 = new ConsumerRecord<>(
@@ -345,8 +342,7 @@ public class ErrorHandlingTaskTest {
         LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);
 
         RetryWithToleranceOperator retryWithToleranceOperator = operator();
-        retryWithToleranceOperator.reporters(singletonList(reporter));
-        createSourceTask(initialState, retryWithToleranceOperator);
+        createSourceTask(initialState, retryWithToleranceOperator, singletonList(reporter));
 
         // valid json
         Schema valSchema = SchemaBuilder.struct().field("val", Schema.INT32_SCHEMA).build();
@@ -406,8 +402,7 @@ public class ErrorHandlingTaskTest {
         LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);
 
         RetryWithToleranceOperator retryWithToleranceOperator = operator();
-        retryWithToleranceOperator.reporters(singletonList(reporter));
-        createSourceTask(initialState, retryWithToleranceOperator, badConverter());
+        createSourceTask(initialState, retryWithToleranceOperator, singletonList(reporter), badConverter());
 
         // valid json
         Schema valSchema = SchemaBuilder.struct().field("val", Schema.INT32_SCHEMA).build();
@@ -494,7 +489,8 @@ public class ErrorHandlingTaskTest {
         }
     }
 
-    private void createSinkTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
+    private void createSinkTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator,
+                                List<ErrorReporter> errorReporters) {
         JsonConverter converter = new JsonConverter();
         Map<String, Object> oo = workerConfig.originalsWithPrefix("value.converter.");
         oo.put("converter.type", "value");
@@ -509,17 +505,17 @@ public class ErrorHandlingTaskTest {
             ClusterConfigState.EMPTY, metrics, converter, converter, errorHandlingMetrics,
             headerConverter, sinkTransforms, consumer, pluginLoader, time,
             retryWithToleranceOperator, workerErrantRecordReporter,
-                statusBackingStore);
+                statusBackingStore, () -> errorReporters);
     }
 
-    private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) {
+    private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator, List<ErrorReporter> errorReporters) {
         JsonConverter converter = new JsonConverter();
         Map<String, Object> oo = workerConfig.originalsWithPrefix("value.converter.");
         oo.put("converter.type", "value");
         oo.put("schemas.enable", "false");
         converter.configure(oo);
 
-        createSourceTask(initialState, retryWithToleranceOperator, converter);
+        createSourceTask(initialState, retryWithToleranceOperator, errorReporters, converter);
     }
 
     private Converter badConverter() {
@@ -531,8 +527,10 @@ public class ErrorHandlingTaskTest {
         return converter;
     }
 
-    private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator, Converter converter) {
-        TransformationChain<SourceRecord> sourceTransforms = new TransformationChain<>(singletonList(new TransformationStage<>(new FaultyPassthrough<SourceRecord>())), retryWithToleranceOperator);
+    private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator,
+                                  List<ErrorReporter> errorReporters, Converter converter) {
+        TransformationChain<SourceRecord> sourceTransforms = new TransformationChain<>(singletonList(
+                new TransformationStage<>(new FaultyPassthrough<SourceRecord>())), retryWithToleranceOperator);
 
         workerSourceTask = spy(new WorkerSourceTask(
             taskId, sourceTask, statusListener, initialState, converter,
@@ -542,7 +540,7 @@ public class ErrorHandlingTaskTest {
                 offsetReader, offsetWriter, offsetStore, workerConfig,
                 ClusterConfigState.EMPTY, metrics, pluginLoader, time,
                 retryWithToleranceOperator,
-                statusBackingStore, (Executor) Runnable::run));
+                statusBackingStore, (Executor) Runnable::run, () -> errorReporters));
 
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index 9939becc9c1..6aa5844dd2c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -291,7 +291,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
         workerTask = new ExactlyOnceWorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
                 transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore,
                 config, clusterConfigState, metrics, errorHandlingMetrics, plugins.delegatingLoader(), time, RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore,
-                sourceConfig, Runnable::run, preProducerCheck, postProducerCheck);
+                sourceConfig, Runnable::run, preProducerCheck, postProducerCheck, Collections::emptyList);
     }
 
     @Test
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 044b4a4e3a2..6cc7a38d2a3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import java.util.Arrays;
-import java.util.Iterator;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -39,15 +37,17 @@ import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
-import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
-import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.storage.StatusBackingStore;
@@ -70,10 +70,12 @@ import org.powermock.reflect.Whitebox;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -85,16 +87,19 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
-import static org.junit.Assert.assertSame;
+import static org.easymock.EasyMock.createMock;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -183,11 +188,16 @@ public class WorkerSinkTaskTest {
     }
 
     private void createTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
+        createTask(initialState, keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR, Collections::emptyList);
+    }
+
+    private void createTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter,
+                            RetryWithToleranceOperator retryWithToleranceOperator, Supplier<List<ErrorReporter>> errorReportersSupplier) {
         workerTask = new WorkerSinkTask(
-            taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics,
-            keyConverter, valueConverter, errorHandlingMetrics, headerConverter,
-            transformationChain, consumer, pluginLoader, time,
-            RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, statusBackingStore);
+                taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics,
+                keyConverter, valueConverter, errorHandlingMetrics, headerConverter,
+                transformationChain, consumer, pluginLoader, time,
+                retryWithToleranceOperator, null, statusBackingStore, errorReportersSupplier);
     }
 
     @After
@@ -1918,6 +1928,44 @@ public class WorkerSinkTaskTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testErrorReportersConfigured() {
+        RetryWithToleranceOperator retryWithToleranceOperator = createMock(RetryWithToleranceOperator.class);
+        List<ErrorReporter> errorReporters = Collections.singletonList(createMock(ErrorReporter.class));
+        createTask(initialState, keyConverter, valueConverter, headerConverter, retryWithToleranceOperator,
+                () -> errorReporters);
+
+        expectInitializeTask();
+        Capture<List<ErrorReporter>> errorReportersCapture = EasyMock.newCapture();
+        retryWithToleranceOperator.reporters(EasyMock.capture(errorReportersCapture));
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll(retryWithToleranceOperator);
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+
+        assertEquals(errorReporters, errorReportersCapture.getValue());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testErrorReporterConfigurationExceptionPropagation() {
+        createTask(initialState, keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR,
+                () -> {
+                    throw new ConnectException("Failed to create error reporters");
+                }
+        );
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        assertThrows(ConnectException.class, () -> workerTask.initializeAndStart());
+
+        PowerMock.verifyAll();
+    }
+
     private void expectInitializeTask() {
         consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener));
         PowerMock.expectLastCall();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 096ced35d0e..15d32d961c1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -145,7 +145,8 @@ public class WorkerSinkTaskThreadedTest {
                 taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter,
                 valueConverter, errorHandlingMetrics, headerConverter,
                 new TransformationChain<>(Collections.emptyList(), RetryWithToleranceOperatorTest.NOOP_OPERATOR),
-                consumer, pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, statusBackingStore);
+                consumer, pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, statusBackingStore,
+                Collections::emptyList);
 
         recordsReturned = 0;
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 928f5d89430..c54c373645f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -235,7 +235,7 @@ public class WorkerSourceTaskTest {
         workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, errorHandlingMetrics, headerConverter,
                 transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
                 offsetReader, offsetWriter, offsetStore, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
-                retryWithToleranceOperator, statusBackingStore, Runnable::run);
+                retryWithToleranceOperator, statusBackingStore, Runnable::run, Collections::emptyList);
     }
 
     @Test

