This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2dcf306ef83 KAFKA-14132: Replace EasyMock and PowerMock with Mockito
in connect/runtime/ErrorHandlingTaskTest (#12735)
2dcf306ef83 is described below
commit 2dcf306ef839ada65ba021e0a9c0a046b754adf3
Author: Shekhar Rajak <[email protected]>
AuthorDate: Wed Dec 21 22:11:03 2022 +0530
KAFKA-14132: Replace EasyMock and PowerMock with Mockito in
connect/runtime/ErrorHandlingTaskTest (#12735)
Reviewers: Divij Vaidya <[email protected]>, Chris Egerton
<[email protected]>
---
build.gradle | 2 +-
.../connect/runtime/ErrorHandlingTaskTest.java | 313 ++++++++-------------
2 files changed, 125 insertions(+), 190 deletions(-)
diff --git a/build.gradle b/build.gradle
index 9bf33767f09..81ab8e8c211 100644
--- a/build.gradle
+++ b/build.gradle
@@ -412,7 +412,7 @@ subprojects {
// connect tests
"**/ConnectorPluginsResourceTest.*",
"**/DistributedHerderTest.*", "**/FileOffsetBakingStoreTest.*",
- "**/ErrorHandlingTaskTest.*", "**/KafkaConfigBackingStoreTest.*",
+ "**/KafkaConfigBackingStoreTest.*",
"**/KafkaBasedLogTest.*", "**/OffsetStorageWriterTest.*",
"**/StandaloneHerderTest.*",
"**/SourceTaskOffsetCommitterTest.*",
"**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*",
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 780ca4e790e..4de13128dbc 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -58,33 +58,29 @@ import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.apache.kafka.connect.util.ParameterizedTest;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IExpectationSetters;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.junit.runners.Parameterized;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.quality.Strictness;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Arrays;
import java.util.Set;
+import java.util.Collections;
+import java.util.Collection;
import java.util.concurrent.Executor;
import static java.util.Collections.emptyMap;
@@ -103,18 +99,29 @@ import static
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_F
import static
org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
import static org.junit.Assert.assertEquals;
-@RunWith(PowerMockRunner.class)
-@PowerMockRunnerDelegate(ParameterizedTest.class)
-@PrepareForTest({WorkerSinkTask.class, WorkerSourceTask.class})
-@PowerMockIgnore("javax.management.*")
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.doReturn;
+
+
+@RunWith(Parameterized.class)
public class ErrorHandlingTaskTest {
+ @Rule
+ public MockitoRule rule =
MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
private static final String TOPIC = "test";
private static final int PARTITION1 = 12;
private static final int PARTITION2 = 13;
private static final long FIRST_OFFSET = 45;
- @Mock Plugins plugins;
+ @Mock
+ Plugins plugins;
private static final Map<String, String> TASK_PROPS = new HashMap<>();
@@ -139,7 +146,6 @@ public class ErrorHandlingTaskTest {
@SuppressWarnings("unused")
@Mock
private SourceTask sourceTask;
- private Capture<WorkerSinkTaskContext> sinkTaskContext =
EasyMock.newCapture();
private WorkerConfig workerConfig;
private SourceConnectorConfig sourceConfig;
@Mock
@@ -164,8 +170,6 @@ public class ErrorHandlingTaskTest {
OffsetStorageWriter offsetWriter;
@Mock
private ConnectorOffsetBackingStore offsetStore;
-
- private Capture<ConsumerRebalanceListener> rebalanceListener =
EasyMock.newCapture();
@SuppressWarnings("unused")
@Mock
private TaskStatus.Listener statusListener;
@@ -179,7 +183,7 @@ public class ErrorHandlingTaskTest {
private boolean enableTopicCreation;
- @ParameterizedTest.Parameters
+ @Parameterized.Parameters
public static Collection<Boolean> parameters() {
return Arrays.asList(false, true);
}
@@ -197,7 +201,6 @@ public class ErrorHandlingTaskTest {
workerProps.put("value.converter",
"org.apache.kafka.connect.json.JsonConverter");
workerProps.put("offset.storage.file.filename",
"/tmp/connect.offsets");
workerProps.put(TOPIC_CREATION_ENABLE_CONFIG,
String.valueOf(enableTopicCreation));
- pluginLoader = PowerMock.createMock(PluginClassLoader.class);
workerConfig = new StandaloneConfig(workerProps);
sourceConfig = new SourceConnectorConfig(plugins,
sourceConnectorProps(TOPIC), true);
errorHandlingMetrics = new ErrorHandlingMetrics(taskId, metrics);
@@ -228,81 +231,58 @@ public class ErrorHandlingTaskTest {
@Test
public void testSinkTasksCloseErrorReporters() throws Exception {
- ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
+ ErrorReporter reporter = mock(ErrorReporter.class);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.reporters(singletonList(reporter));
createSinkTask(initialState, retryWithToleranceOperator);
-
- expectInitializeTask();
- reporter.close();
- EasyMock.expectLastCall();
- sinkTask.stop();
- EasyMock.expectLastCall();
-
- consumer.close();
- EasyMock.expectLastCall();
-
- headerConverter.close();
- EasyMock.expectLastCall();
-
- PowerMock.replayAll();
-
workerSinkTask.initialize(TASK_CONFIG);
workerSinkTask.initializeAndStart();
workerSinkTask.close();
-
- PowerMock.verifyAll();
+ // verify if invocation happened exactly 1 time
+ verifyInitializeSink();
+ verify(reporter).close();
+ verify(sinkTask).stop();
+ verify(consumer).close();
+ verify(headerConverter).close();
}
@Test
- public void testSourceTasksCloseErrorReporters() {
- ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
+ public void testSourceTasksCloseErrorReporters() throws IOException {
+ ErrorReporter reporter = mock(ErrorReporter.class);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.reporters(singletonList(reporter));
createSourceTask(initialState, retryWithToleranceOperator);
- expectClose();
-
- reporter.close();
- EasyMock.expectLastCall();
-
- PowerMock.replayAll();
-
workerSourceTask.initialize(TASK_CONFIG);
workerSourceTask.close();
-
- PowerMock.verifyAll();
+ verifyCloseSource();
+ verify(reporter).close();
}
@Test
- public void testCloseErrorReportersExceptionPropagation() {
- ErrorReporter reporterA = EasyMock.mock(ErrorReporter.class);
- ErrorReporter reporterB = EasyMock.mock(ErrorReporter.class);
+ public void testCloseErrorReportersExceptionPropagation() throws
IOException {
+ ErrorReporter reporterA = mock(ErrorReporter.class);
+ ErrorReporter reporterB = mock(ErrorReporter.class);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
retryWithToleranceOperator.reporters(Arrays.asList(reporterA,
reporterB));
createSourceTask(initialState, retryWithToleranceOperator);
- expectClose();
-
// Even though the reporters throw exceptions, they should both still
be closed.
- reporterA.close();
- EasyMock.expectLastCall().andThrow(new RuntimeException());
-
- reporterB.close();
- EasyMock.expectLastCall().andThrow(new RuntimeException());
-
- PowerMock.replayAll();
+ doThrow(new RuntimeException()).when(reporterA).close();
+ doThrow(new RuntimeException()).when(reporterB).close();
workerSourceTask.initialize(TASK_CONFIG);
workerSourceTask.close();
- PowerMock.verifyAll();
+ verify(reporterA).close();
+ verify(reporterB).close();
+ verifyCloseSource();
}
@Test
@@ -316,21 +296,19 @@ public class ErrorHandlingTaskTest {
retryWithToleranceOperator.reporters(singletonList(reporter));
createSinkTask(initialState, retryWithToleranceOperator);
- expectInitializeTask();
- expectTaskGetTopic(true);
// valid json
- ConsumerRecord<byte[], byte[]> record1 = new ConsumerRecord<>(TOPIC,
PARTITION1, FIRST_OFFSET, null, "{\"a\": 10}".getBytes());
+ ConsumerRecord<byte[], byte[]> record1 = new ConsumerRecord<>(
+ TOPIC, PARTITION1, FIRST_OFFSET,
+ null, "{\"a\": 10}".getBytes());
// bad json
- ConsumerRecord<byte[], byte[]> record2 = new ConsumerRecord<>(TOPIC,
PARTITION2, FIRST_OFFSET, null, "{\"a\" 10}".getBytes());
+ ConsumerRecord<byte[], byte[]> record2 = new ConsumerRecord<>(
+ TOPIC, PARTITION2, FIRST_OFFSET,
+ null, "{\"a\" 10}".getBytes());
-
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record1));
-
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record2));
-
- sinkTask.put(EasyMock.anyObject());
- EasyMock.expectLastCall().times(2);
-
- PowerMock.replayAll();
+ when(consumer.poll(any()))
+ .thenReturn(records(record1))
+ .thenReturn(records(record2));
workerSinkTask.initialize(TASK_CONFIG);
workerSinkTask.initializeAndStart();
@@ -338,6 +316,9 @@ public class ErrorHandlingTaskTest {
workerSinkTask.iteration();
+ verifyInitializeSink();
+ verify(sinkTask, times(2)).put(any());
+
// two records were consumed from Kafka
assertSinkMetricValue("sink-record-read-total", 2.0);
// only one was written to the task
@@ -348,12 +329,12 @@ public class ErrorHandlingTaskTest {
assertErrorHandlingMetricValue("total-record-failures", 3.0);
// one record completely failed (converter issues), and thus was
skipped
assertErrorHandlingMetricValue("total-records-skipped", 1.0);
-
- PowerMock.verifyAll();
}
private RetryWithToleranceOperator operator() {
- return new RetryWithToleranceOperator(OPERATOR_RETRY_TIMEOUT_MILLIS,
OPERATOR_RETRY_MAX_DELAY_MILLIS, OPERATOR_TOLERANCE_TYPE, SYSTEM,
errorHandlingMetrics);
+ return new RetryWithToleranceOperator(OPERATOR_RETRY_TIMEOUT_MILLIS,
+ OPERATOR_RETRY_MAX_DELAY_MILLIS, OPERATOR_TOLERANCE_TYPE,
+ SYSTEM, errorHandlingMetrics);
}
@Test
@@ -374,30 +355,29 @@ public class ErrorHandlingTaskTest {
Struct struct2 = new Struct(valSchema).put("val", 6789);
SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC,
PARTITION1, valSchema, struct2);
- EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
- EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
- EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
+ when(workerSourceTask.isStopping())
+ .thenReturn(false)
+ .thenReturn(false)
+ .thenReturn(true);
- EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
+ doReturn(true).when(workerSourceTask).commitOffsets();
- offsetStore.start();
- EasyMock.expectLastCall();
- sourceTask.initialize(EasyMock.anyObject());
- EasyMock.expectLastCall();
- sourceTask.start(EasyMock.anyObject());
- EasyMock.expectLastCall();
+ when(sourceTask.poll())
+ .thenReturn(singletonList(record1))
+ .thenReturn(singletonList(record2));
- EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
- EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
expectTopicCreation(TOPIC);
- EasyMock.expect(producer.send(EasyMock.anyObject(),
EasyMock.anyObject())).andReturn(null).times(2);
-
- PowerMock.replayAll();
workerSourceTask.initialize(TASK_CONFIG);
workerSourceTask.initializeAndStart();
workerSourceTask.execute();
-
+ verify(workerSourceTask, times(3)).isStopping();
+ verify(workerSourceTask).commitOffsets();
+ verify(offsetStore).start();
+ verify(sourceTask).initialize(any());
+ verify(sourceTask).start(any());
+ verify(sourceTask, times(2)).poll();
+ verify(producer, times(2)).send(any(), any());
// two records were consumed from Kafka
assertSourceMetricValue("source-record-poll-total", 2.0);
// only one was written to the task
@@ -408,8 +388,6 @@ public class ErrorHandlingTaskTest {
assertErrorHandlingMetricValue("total-record-failures", 4.0);
// one record completely failed (converter issues), and thus was
skipped
assertErrorHandlingMetricValue("total-records-skipped", 0.0);
-
- PowerMock.verifyAll();
}
private ConnectorConfig connConfig(Map<String, String> connProps) {
@@ -438,30 +416,20 @@ public class ErrorHandlingTaskTest {
Struct struct2 = new Struct(valSchema).put("val", 6789);
SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC,
PARTITION1, valSchema, struct2);
- EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
- EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
- EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
+ when(workerSourceTask.isStopping())
+ .thenReturn(false)
+ .thenReturn(false)
+ .thenReturn(true);
- EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
+ doReturn(true).when(workerSourceTask).commitOffsets();
- offsetStore.start();
- EasyMock.expectLastCall();
- sourceTask.initialize(EasyMock.anyObject());
- EasyMock.expectLastCall();
- sourceTask.start(EasyMock.anyObject());
- EasyMock.expectLastCall();
-
- EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
- EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
+ when(sourceTask.poll())
+ .thenReturn(singletonList(record1))
+ .thenReturn(singletonList(record2));
expectTopicCreation(TOPIC);
- EasyMock.expect(producer.send(EasyMock.anyObject(),
EasyMock.anyObject())).andReturn(null).times(2);
-
- PowerMock.replayAll();
-
workerSourceTask.initialize(TASK_CONFIG);
workerSourceTask.initializeAndStart();
workerSourceTask.execute();
-
// two records were consumed from Kafka
assertSourceMetricValue("source-record-poll-total", 2.0);
// only one was written to the task
@@ -473,7 +441,13 @@ public class ErrorHandlingTaskTest {
// one record completely failed (converter issues), and thus was
skipped
assertErrorHandlingMetricValue("total-records-skipped", 0.0);
- PowerMock.verifyAll();
+ verify(workerSourceTask, times(3)).isStopping();
+ verify(workerSourceTask).commitOffsets();
+ verify(offsetStore).start();
+ verify(sourceTask).initialize(any());
+ verify(sourceTask).start(any());
+ verify(sourceTask, times(2)).poll();
+ verify(producer, times(2)).send(any(), any());
}
private void assertSinkMetricValue(String name, double expected) {
@@ -482,6 +456,13 @@ public class ErrorHandlingTaskTest {
assertEquals(expected, measured, 0.001d);
}
+ private void verifyInitializeSink() {
+ verify(sinkTask).start(TASK_PROPS);
+ verify(sinkTask).initialize(any(WorkerSinkTaskContext.class));
+ verify(consumer).subscribe(eq(singletonList(TOPIC)),
+ any(ConsumerRebalanceListener.class));
+ }
+
private void assertSourceMetricValue(String name, double expected) {
ConnectMetrics.MetricGroup sinkTaskGroup =
workerSourceTask.sourceTaskMetricsGroup().metricGroup();
double measured = metrics.currentMetricValueAsDouble(sinkTaskGroup,
name);
@@ -494,73 +475,22 @@ public class ErrorHandlingTaskTest {
assertEquals(expected, measured, 0.001d);
}
- private void expectInitializeTask() {
- consumer.subscribe(EasyMock.eq(singletonList(TOPIC)),
EasyMock.capture(rebalanceListener));
- PowerMock.expectLastCall();
-
- sinkTask.initialize(EasyMock.capture(sinkTaskContext));
- PowerMock.expectLastCall();
- sinkTask.start(TASK_PROPS);
- PowerMock.expectLastCall();
- }
-
- private void expectTaskGetTopic(boolean anyTimes) {
- final Capture<String> connectorCapture = EasyMock.newCapture();
- final Capture<String> topicCapture = EasyMock.newCapture();
- IExpectationSetters<TopicStatus> expect =
EasyMock.expect(statusBackingStore.getTopic(
- EasyMock.capture(connectorCapture),
- EasyMock.capture(topicCapture)));
- if (anyTimes) {
- expect.andStubAnswer(() -> new TopicStatus(
- topicCapture.getValue(),
- new ConnectorTaskId(connectorCapture.getValue(), 0),
- Time.SYSTEM.milliseconds()));
- } else {
- expect.andAnswer(() -> new TopicStatus(
- topicCapture.getValue(),
- new ConnectorTaskId(connectorCapture.getValue(), 0),
- Time.SYSTEM.milliseconds()));
- }
- if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
- assertEquals("job", connectorCapture.getValue());
- assertEquals(TOPIC, topicCapture.getValue());
- }
- }
-
- private void expectClose() {
- producer.close(EasyMock.anyObject(Duration.class));
- EasyMock.expectLastCall();
-
- admin.close(EasyMock.anyObject(Duration.class));
- EasyMock.expectLastCall();
-
- offsetReader.close();
- EasyMock.expectLastCall();
-
- offsetStore.stop();
- EasyMock.expectLastCall();
-
- try {
- headerConverter.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- EasyMock.expectLastCall();
+ private void verifyCloseSource() throws IOException {
+ verify(producer).close(any(Duration.class));
+ verify(admin).close(any(Duration.class));
+ verify(offsetReader).close();
+ verify(offsetStore).stop();
+ // headerConverter.close() can throw IOException
+ verify(headerConverter).close();
}
private void expectTopicCreation(String topic) {
- if (workerConfig.topicCreationEnable()) {
-
EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
- Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-
- if (enableTopicCreation) {
- Set<String> created = Collections.singleton(topic);
- Set<String> existing = Collections.emptySet();
- TopicAdmin.TopicCreationResponse response = new
TopicAdmin.TopicCreationResponse(created, existing);
-
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(response);
- } else {
-
EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(true);
- }
+ if (enableTopicCreation) {
+
when(admin.describeTopics(topic)).thenReturn(Collections.emptyMap());
+ Set<String> created = Collections.singleton(topic);
+ Set<String> existing = Collections.emptySet();
+ TopicAdmin.TopicCreationResponse response = new
TopicAdmin.TopicCreationResponse(created, existing);
+
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(response);
}
}
@@ -571,13 +501,15 @@ public class ErrorHandlingTaskTest {
oo.put("schemas.enable", "false");
converter.configure(oo);
- TransformationChain<SinkRecord> sinkTransforms = new
TransformationChain<>(singletonList(new FaultyPassthrough<SinkRecord>()),
retryWithToleranceOperator);
+ TransformationChain<SinkRecord> sinkTransforms =
+ new TransformationChain<>(singletonList(new
FaultyPassthrough<SinkRecord>()), retryWithToleranceOperator);
workerSinkTask = new WorkerSinkTask(
taskId, sinkTask, statusListener, initialState, workerConfig,
ClusterConfigState.EMPTY, metrics, converter, converter,
errorHandlingMetrics,
headerConverter, sinkTransforms, consumer, pluginLoader, time,
- retryWithToleranceOperator, workerErrantRecordReporter,
statusBackingStore);
+ retryWithToleranceOperator, workerErrantRecordReporter,
+ statusBackingStore);
}
private void createSourceTask(TargetState initialState,
RetryWithToleranceOperator retryWithToleranceOperator) {
@@ -602,13 +534,16 @@ public class ErrorHandlingTaskTest {
private void createSourceTask(TargetState initialState,
RetryWithToleranceOperator retryWithToleranceOperator, Converter converter) {
TransformationChain<SourceRecord> sourceTransforms = new
TransformationChain<>(singletonList(new FaultyPassthrough<SourceRecord>()),
retryWithToleranceOperator);
- workerSourceTask = PowerMock.createPartialMock(
- WorkerSourceTask.class, new String[]{"commitOffsets",
"isStopping"},
- taskId, sourceTask, statusListener, initialState, converter,
converter, errorHandlingMetrics, headerConverter, sourceTransforms,
- producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
- offsetReader, offsetWriter, offsetStore, workerConfig,
- ClusterConfigState.EMPTY, metrics, pluginLoader, time,
retryWithToleranceOperator,
- statusBackingStore, (Executor) Runnable::run);
+ workerSourceTask = spy(new WorkerSourceTask(
+ taskId, sourceTask, statusListener, initialState, converter,
+ converter, errorHandlingMetrics, headerConverter,
+ sourceTransforms, producer, admin,
+ TopicCreationGroup.configuredGroups(sourceConfig),
+ offsetReader, offsetWriter, offsetStore, workerConfig,
+ ClusterConfigState.EMPTY, metrics, pluginLoader, time,
+ retryWithToleranceOperator,
+ statusBackingStore, (Executor) Runnable::run));
+
}
private ConsumerRecords<byte[], byte[]> records(ConsumerRecord<byte[],
byte[]> record) {