This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.5 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.5 by this push:
     new 3f7c0c83d6f KAFKA-15238: Move DLQ reporter setup from the DistributedHerder's tick thread to the sink task thread (#14079)
3f7c0c83d6f is described below

commit 3f7c0c83d6f761a49d12d182beec9c416ac96012
Author: Yash Mayya <yash.ma...@gmail.com>
AuthorDate: Tue Jul 25 14:03:29 2023 +0100

    KAFKA-15238: Move DLQ reporter setup from the DistributedHerder's tick thread to the sink task thread (#14079)

    Reviewers: Chris Egerton <chr...@aiven.io>
---
 .../connect/runtime/AbstractWorkerSourceTask.java | 8 ++-
 .../runtime/ExactlyOnceWorkerSourceTask.java | 8 ++-
 .../org/apache/kafka/connect/runtime/Worker.java | 11 ++--
 .../kafka/connect/runtime/WorkerSinkTask.java | 12 +++-
 .../kafka/connect/runtime/WorkerSourceTask.java | 8 ++-
 .../runtime/AbstractWorkerSourceTaskTest.java | 38 +++++++++++--
 .../connect/runtime/ErrorHandlingTaskTest.java | 58 +++++++++----------
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java | 2 +-
 .../kafka/connect/runtime/WorkerSinkTaskTest.java | 66 +++++++++++++++++++---
 .../runtime/WorkerSinkTaskThreadedTest.java | 3 +-
 .../connect/runtime/WorkerSourceTaskTest.java | 2 +-
 11 files changed, 155 insertions(+), 61 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java index 6d5446d9c21..4f9e0936ee0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java @@ -37,6 +37,7 @@ import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; +import org.apache.kafka.connect.runtime.errors.ErrorReporter; import 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.Stage; import org.apache.kafka.connect.runtime.errors.ToleranceType; @@ -65,6 +66,7 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG; @@ -195,6 +197,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask { private final boolean topicTrackingEnabled; private final TopicCreation topicCreation; private final Executor closeExecutor; + private final Supplier<List<ErrorReporter>> errorReportersSupplier; // Visible for testing List<SourceRecord> toSend; @@ -224,7 +227,8 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask { Time time, RetryWithToleranceOperator retryWithToleranceOperator, StatusBackingStore statusBackingStore, - Executor closeExecutor) { + Executor closeExecutor, + Supplier<List<ErrorReporter>> errorReportersSupplier) { super(id, statusListener, initialState, loader, connectMetrics, errorMetrics, retryWithToleranceOperator, time, statusBackingStore); @@ -242,6 +246,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask { this.offsetStore = Objects.requireNonNull(offsetStore, "offset store cannot be null for source tasks"); this.closeExecutor = closeExecutor; this.sourceTaskContext = sourceTaskContext; + this.errorReportersSupplier = errorReportersSupplier; this.stopRequestedLatch = new CountDownLatch(1); this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics); @@ -261,6 +266,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask { @Override protected void initializeAndStart() { + retryWithToleranceOperator.reporters(errorReportersSupplier.get()); prepareToInitializeTask(); offsetStore.start(); // If we try to start the task at all by 
invoking initialize, then count this as diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java index 30dafaac81d..da58b4fafc9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; +import org.apache.kafka.connect.runtime.errors.ErrorReporter; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; @@ -47,11 +48,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; /** @@ -94,11 +97,12 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask { SourceConnectorConfig sourceConfig, Executor closeExecutor, Runnable preProducerCheck, - Runnable postProducerCheck) { + Runnable postProducerCheck, + Supplier<List<ErrorReporter>> errorReportersSupplier) { super(id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain, new WorkerSourceTaskContext(offsetReader, id, configState, buildTransactionContext(sourceConfig)), producer, admin, topicGroups, offsetReader, offsetWriter, offsetStore, workerConfig, connectMetrics, errorMetrics, - loader, time, retryWithToleranceOperator, statusBackingStore, 
closeExecutor); + loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier); this.transactionOpen = false; this.commitableRecords = new LinkedHashMap<>(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index b43d9a7dc58..ee68f2731b1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -1370,7 +1370,6 @@ public class Worker { TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformationStages(), retryWithToleranceOperator); log.info("Initializing: {}", transformationChain); SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings()); - retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass)); WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator, keyConverter, valueConverter, headerConverter); @@ -1381,7 +1380,8 @@ public class Worker { return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter, valueConverter, errorHandlingMetrics, headerConverter, transformationChain, consumer, classLoader, time, - retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore()); + retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore(), + () -> sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass)); } } @@ -1410,7 +1410,6 @@ public class Worker { SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig.originalsStrings(), config.topicCreationEnable()); - retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics)); TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformationStages(), retryWithToleranceOperator); log.info("Initializing: {}", transformationChain); @@ -1442,7 +1441,7 @@ public class Worker { return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, errorHandlingMetrics, headerConverter, transformationChain, producer, topicAdmin, topicCreationGroups, offsetReader, offsetWriter, offsetStore, config, configState, metrics, classLoader, time, - retryWithToleranceOperator, herder.statusBackingStore(), executor); + retryWithToleranceOperator, herder.statusBackingStore(), executor, () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); } } @@ -1478,7 +1477,6 @@ public class Worker { SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig.originalsStrings(), config.topicCreationEnable()); - retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformationStages(), retryWithToleranceOperator); log.info("Initializing: {}", transformationChain); @@ -1508,7 +1506,8 @@ public class Worker { return new ExactlyOnceWorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain, producer, topicAdmin, topicCreationGroups, offsetReader, offsetWriter, offsetStore, config, configState, metrics, errorHandlingMetrics, classLoader, time, retryWithToleranceOperator, - herder.statusBackingStore(), sourceConfig, executor, preProducerCheck, postProducerCheck); + herder.statusBackingStore(), sourceConfig, executor, preProducerCheck, postProducerCheck, + () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); } } diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 85454a60670..3314cc8db18 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -40,13 +40,14 @@ import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; -import org.apache.kafka.connect.storage.ClusterConfigState; -import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; +import org.apache.kafka.connect.runtime.errors.ErrorReporter; +import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.Stage; import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; +import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.StatusBackingStore; @@ -61,6 +62,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -98,6 +100,7 @@ class WorkerSinkTask extends WorkerTask { private boolean committing; private boolean taskStopped; private final WorkerErrantRecordReporter workerErrantRecordReporter; + private final Supplier<List<ErrorReporter>> errorReportersSupplier; public WorkerSinkTask(ConnectorTaskId id, SinkTask task, @@ -116,7 +119,8 @@ class 
WorkerSinkTask extends WorkerTask { Time time, RetryWithToleranceOperator retryWithToleranceOperator, WorkerErrantRecordReporter workerErrantRecordReporter, - StatusBackingStore statusBackingStore) { + StatusBackingStore statusBackingStore, + Supplier<List<ErrorReporter>> errorReportersSupplier) { super(id, statusListener, initialState, loader, connectMetrics, errorMetrics, retryWithToleranceOperator, time, statusBackingStore); @@ -145,6 +149,7 @@ class WorkerSinkTask extends WorkerTask { this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG); this.taskStopped = false; this.workerErrantRecordReporter = workerErrantRecordReporter; + this.errorReportersSupplier = errorReportersSupplier; } @Override @@ -299,6 +304,7 @@ class WorkerSinkTask extends WorkerTask { @Override protected void initializeAndStart() { SinkConnectorConfig.validate(taskConfig); + retryWithToleranceOperator.reporters(errorReportersSupplier.get()); if (SinkConnectorConfig.hasTopicsConfig(taskConfig)) { List<String> topics = SinkConnectorConfig.parseTopicsList(taskConfig); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index a6767675a3a..7b09f501460 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.errors.ErrorReporter; import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; @@ -40,6 
+41,7 @@ import org.apache.kafka.connect.util.TopicCreationGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -48,6 +50,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import static org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets; @@ -83,12 +86,13 @@ class WorkerSourceTask extends AbstractWorkerSourceTask { Time time, RetryWithToleranceOperator retryWithToleranceOperator, StatusBackingStore statusBackingStore, - Executor closeExecutor) { + Executor closeExecutor, + Supplier<List<ErrorReporter>> errorReportersSupplier) { super(id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain, new WorkerSourceTaskContext(offsetReader, id, configState, null), producer, admin, topicGroups, offsetReader, offsetWriter, offsetStore, workerConfig, connectMetrics, errorMetrics, loader, - time, retryWithToleranceOperator, statusBackingStore, closeExecutor); + time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier); this.committableOffsets = CommittableOffsets.EMPTY; this.submittedRecords = new SubmittedRecords(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java index 9ac7d5cca28..26ee2fb4e0c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java @@ -37,6 +37,8 @@ import org.apache.kafka.connect.errors.RetriableException; import 
org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.integration.MonitorableSourceConnector; import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; +import org.apache.kafka.connect.runtime.errors.ErrorReporter; +import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; @@ -72,6 +74,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG; @@ -95,6 +98,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -348,7 +352,8 @@ public class AbstractWorkerSourceTaskTest { StringConverter stringConverter = new StringConverter(); SampleConverterWithHeaders testConverter = new SampleConverterWithHeaders(); - createWorkerTask(stringConverter, testConverter, stringConverter); + createWorkerTask(stringConverter, testConverter, stringConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR, + Collections::emptyList); expectSendRecord(null); expectTopicCreation(TOPIC); @@ -689,6 +694,28 @@ public class AbstractWorkerSourceTaskTest { verify(transformationChain, times(2)).apply(eq(record3)); } + @Test + public void testErrorReportersConfigured() { + RetryWithToleranceOperator retryWithToleranceOperator = mock(RetryWithToleranceOperator.class); + List<ErrorReporter> errorReporters = 
Collections.singletonList(mock(ErrorReporter.class)); + createWorkerTask(keyConverter, valueConverter, headerConverter, retryWithToleranceOperator, () -> errorReporters); + workerTask.initializeAndStart(); + + ArgumentCaptor<List<ErrorReporter>> errorReportersCapture = ArgumentCaptor.forClass(List.class); + verify(retryWithToleranceOperator).reporters(errorReportersCapture.capture()); + assertEquals(errorReporters, errorReportersCapture.getValue()); + } + + @Test + public void testErrorReporterConfigurationExceptionPropagation() { + createWorkerTask(keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR, + () -> { + throw new ConnectException("Failed to create error reporters"); + } + ); + assertThrows(ConnectException.class, () -> workerTask.initializeAndStart()); + } + private void expectSendRecord(Headers headers) { if (headers != null) expectConvertHeadersAndKeyValue(headers, TOPIC); @@ -799,15 +826,16 @@ public class AbstractWorkerSourceTaskTest { } private void createWorkerTask() { - createWorkerTask(keyConverter, valueConverter, headerConverter); + createWorkerTask(keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR, Collections::emptyList); } - private void createWorkerTask(Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) { + private void createWorkerTask(Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter, + RetryWithToleranceOperator retryWithToleranceOperator, Supplier<List<ErrorReporter>> errorReportersSupplier) { workerTask = new AbstractWorkerSourceTask( taskId, sourceTask, statusListener, TargetState.STARTED, keyConverter, valueConverter, headerConverter, transformationChain, sourceTaskContext, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore, - config, metrics, errorHandlingMetrics, plugins.delegatingLoader(), Time.SYSTEM, 
RetryWithToleranceOperatorTest.NOOP_OPERATOR, - statusBackingStore, Runnable::run) { + config, metrics, errorHandlingMetrics, plugins.delegatingLoader(), Time.SYSTEM, retryWithToleranceOperator, + statusBackingStore, Runnable::run, errorReportersSupplier) { @Override protected void prepareToInitializeTask() { } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index 1946b01ee5d..3bd2cf64e84 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -33,7 +33,6 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.integration.MonitorableSourceConnector; import org.apache.kafka.connect.json.JsonConverter; -import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; import org.apache.kafka.connect.runtime.errors.ErrorReporter; import org.apache.kafka.connect.runtime.errors.LogReporter; @@ -48,6 +47,7 @@ import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; +import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; @@ -75,12 +75,13 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Arrays; import 
java.util.Set; -import java.util.Collections; -import java.util.Collection; import java.util.concurrent.Executor; import static java.util.Collections.emptyMap; @@ -98,16 +99,15 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CO import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG; import static org.junit.Assert.assertEquals; - import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; -import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @RunWith(Parameterized.class) @@ -234,9 +234,8 @@ public class ErrorHandlingTaskTest { ErrorReporter reporter = mock(ErrorReporter.class); RetryWithToleranceOperator retryWithToleranceOperator = operator(); - retryWithToleranceOperator.reporters(singletonList(reporter)); - createSinkTask(initialState, retryWithToleranceOperator); + createSinkTask(initialState, retryWithToleranceOperator, singletonList(reporter)); workerSinkTask.initialize(TASK_CONFIG); workerSinkTask.initializeAndStart(); workerSinkTask.close(); @@ -253,11 +252,11 @@ public class ErrorHandlingTaskTest { ErrorReporter reporter = mock(ErrorReporter.class); RetryWithToleranceOperator retryWithToleranceOperator = operator(); - retryWithToleranceOperator.reporters(singletonList(reporter)); - createSourceTask(initialState, retryWithToleranceOperator); + createSourceTask(initialState, retryWithToleranceOperator, singletonList(reporter)); 
workerSourceTask.initialize(TASK_CONFIG); + workerSourceTask.initializeAndStart(); workerSourceTask.close(); verifyCloseSource(); verify(reporter).close(); @@ -269,15 +268,15 @@ public class ErrorHandlingTaskTest { ErrorReporter reporterB = mock(ErrorReporter.class); RetryWithToleranceOperator retryWithToleranceOperator = operator(); - retryWithToleranceOperator.reporters(Arrays.asList(reporterA, reporterB)); - createSourceTask(initialState, retryWithToleranceOperator); + createSourceTask(initialState, retryWithToleranceOperator, Arrays.asList(reporterA, reporterB)); // Even though the reporters throw exceptions, they should both still be closed. doThrow(new RuntimeException()).when(reporterA).close(); doThrow(new RuntimeException()).when(reporterB).close(); workerSourceTask.initialize(TASK_CONFIG); + workerSourceTask.initializeAndStart(); workerSourceTask.close(); verify(reporterA).close(); @@ -293,9 +292,7 @@ public class ErrorHandlingTaskTest { LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics); RetryWithToleranceOperator retryWithToleranceOperator = operator(); - retryWithToleranceOperator.reporters(singletonList(reporter)); - createSinkTask(initialState, retryWithToleranceOperator); - + createSinkTask(initialState, retryWithToleranceOperator, singletonList(reporter)); // valid json ConsumerRecord<byte[], byte[]> record1 = new ConsumerRecord<>( @@ -345,8 +342,7 @@ public class ErrorHandlingTaskTest { LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics); RetryWithToleranceOperator retryWithToleranceOperator = operator(); - retryWithToleranceOperator.reporters(singletonList(reporter)); - createSourceTask(initialState, retryWithToleranceOperator); + createSourceTask(initialState, retryWithToleranceOperator, singletonList(reporter)); // valid json Schema valSchema = SchemaBuilder.struct().field("val", Schema.INT32_SCHEMA).build(); @@ -406,8 +402,7 @@ public class 
ErrorHandlingTaskTest { LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics); RetryWithToleranceOperator retryWithToleranceOperator = operator(); - retryWithToleranceOperator.reporters(singletonList(reporter)); - createSourceTask(initialState, retryWithToleranceOperator, badConverter()); + createSourceTask(initialState, retryWithToleranceOperator, singletonList(reporter), badConverter()); // valid json Schema valSchema = SchemaBuilder.struct().field("val", Schema.INT32_SCHEMA).build(); @@ -494,7 +489,8 @@ public class ErrorHandlingTaskTest { } } - private void createSinkTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) { + private void createSinkTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator, + List<ErrorReporter> errorReporters) { JsonConverter converter = new JsonConverter(); Map<String, Object> oo = workerConfig.originalsWithPrefix("value.converter."); oo.put("converter.type", "value"); @@ -509,17 +505,17 @@ public class ErrorHandlingTaskTest { ClusterConfigState.EMPTY, metrics, converter, converter, errorHandlingMetrics, headerConverter, sinkTransforms, consumer, pluginLoader, time, retryWithToleranceOperator, workerErrantRecordReporter, - statusBackingStore); + statusBackingStore, () -> errorReporters); } - private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator) { + private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator, List<ErrorReporter> errorReporters) { JsonConverter converter = new JsonConverter(); Map<String, Object> oo = workerConfig.originalsWithPrefix("value.converter."); oo.put("converter.type", "value"); oo.put("schemas.enable", "false"); converter.configure(oo); - createSourceTask(initialState, retryWithToleranceOperator, converter); + createSourceTask(initialState, retryWithToleranceOperator, errorReporters, converter); } 
private Converter badConverter() { @@ -531,8 +527,10 @@ public class ErrorHandlingTaskTest { return converter; } - private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator, Converter converter) { - TransformationChain<SourceRecord> sourceTransforms = new TransformationChain<>(singletonList(new TransformationStage<>(new FaultyPassthrough<SourceRecord>())), retryWithToleranceOperator); + private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator, + List<ErrorReporter> errorReporters, Converter converter) { + TransformationChain<SourceRecord> sourceTransforms = new TransformationChain<>(singletonList( + new TransformationStage<>(new FaultyPassthrough<SourceRecord>())), retryWithToleranceOperator); workerSourceTask = spy(new WorkerSourceTask( taskId, sourceTask, statusListener, initialState, converter, @@ -542,7 +540,7 @@ public class ErrorHandlingTaskTest { offsetReader, offsetWriter, offsetStore, workerConfig, ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator, - statusBackingStore, (Executor) Runnable::run)); + statusBackingStore, (Executor) Runnable::run, () -> errorReporters)); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java index 9939becc9c1..6aa5844dd2c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java @@ -291,7 +291,7 @@ public class ExactlyOnceWorkerSourceTaskTest { workerTask = new ExactlyOnceWorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), 
offsetReader, offsetWriter, offsetStore, config, clusterConfigState, metrics, errorHandlingMetrics, plugins.delegatingLoader(), time, RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore, - sourceConfig, Runnable::run, preProducerCheck, postProducerCheck); + sourceConfig, Runnable::run, preProducerCheck, postProducerCheck, Collections::emptyList); } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 044b4a4e3a2..6cc7a38d2a3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.connect.runtime; -import java.util.Arrays; -import java.util.Iterator; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -39,15 +37,17 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; -import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup; -import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; +import org.apache.kafka.connect.runtime.errors.ErrorReporter; +import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; +import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import 
org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; +import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.StatusBackingStore; @@ -70,10 +70,12 @@ import org.powermock.reflect.Whitebox; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -85,16 +87,19 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; import static java.util.Arrays.asList; import static java.util.Collections.singleton; -import static org.junit.Assert.assertSame; +import static org.easymock.EasyMock.createMock; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -183,11 +188,16 @@ public class WorkerSinkTaskTest { } private void createTask(TargetState initialState, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) { + createTask(initialState, keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR, Collections::emptyList); + } + + private void createTask(TargetState initialState, Converter keyConverter, Converter valueConverter, 
HeaderConverter headerConverter, + RetryWithToleranceOperator retryWithToleranceOperator, Supplier<List<ErrorReporter>> errorReportersSupplier) { workerTask = new WorkerSinkTask( - taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, - keyConverter, valueConverter, errorHandlingMetrics, headerConverter, - transformationChain, consumer, pluginLoader, time, - RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, statusBackingStore); + taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, + keyConverter, valueConverter, errorHandlingMetrics, headerConverter, + transformationChain, consumer, pluginLoader, time, + retryWithToleranceOperator, null, statusBackingStore, errorReportersSupplier); } @After @@ -1918,6 +1928,44 @@ public class WorkerSinkTaskTest { PowerMock.verifyAll(); } + @Test + public void testErrorReportersConfigured() { + RetryWithToleranceOperator retryWithToleranceOperator = createMock(RetryWithToleranceOperator.class); + List<ErrorReporter> errorReporters = Collections.singletonList(createMock(ErrorReporter.class)); + createTask(initialState, keyConverter, valueConverter, headerConverter, retryWithToleranceOperator, + () -> errorReporters); + + expectInitializeTask(); + Capture<List<ErrorReporter>> errorReportersCapture = EasyMock.newCapture(); + retryWithToleranceOperator.reporters(EasyMock.capture(errorReportersCapture)); + PowerMock.expectLastCall(); + + PowerMock.replayAll(retryWithToleranceOperator); + + workerTask.initialize(TASK_CONFIG); + workerTask.initializeAndStart(); + + assertEquals(errorReporters, errorReportersCapture.getValue()); + + PowerMock.verifyAll(); + } + + @Test + public void testErrorReporterConfigurationExceptionPropagation() { + createTask(initialState, keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR, + () -> { + throw new ConnectException("Failed to create error reporters"); + } + ); + + 
PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + assertThrows(ConnectException.class, () -> workerTask.initializeAndStart()); + + PowerMock.verifyAll(); + } + private void expectInitializeTask() { consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener)); PowerMock.expectLastCall(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index 096ced35d0e..15d32d961c1 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -145,7 +145,8 @@ public class WorkerSinkTaskThreadedTest { taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter, valueConverter, errorHandlingMetrics, headerConverter, new TransformationChain<>(Collections.emptyList(), RetryWithToleranceOperatorTest.NOOP_OPERATOR), - consumer, pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, statusBackingStore); + consumer, pluginLoader, time, RetryWithToleranceOperatorTest.NOOP_OPERATOR, null, statusBackingStore, + Collections::emptyList); recordsReturned = 0; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 928f5d89430..c54c373645f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -235,7 +235,7 @@ public class WorkerSourceTaskTest { workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, errorHandlingMetrics, headerConverter, 
transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM, - retryWithToleranceOperator, statusBackingStore, Runnable::run); + retryWithToleranceOperator, statusBackingStore, Runnable::run, Collections::emptyList); } @Test