This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2dcf306ef83 KAFKA-14132: Replace EasyMock and PowerMock with Mockito in connect/runtime/ErrorHandlingTaskTest (#12735)
2dcf306ef83 is described below

commit 2dcf306ef839ada65ba021e0a9c0a046b754adf3
Author: Shekhar Rajak <[email protected]>
AuthorDate: Wed Dec 21 22:11:03 2022 +0530

    KAFKA-14132: Replace EasyMock and PowerMock with Mockito in connect/runtime/ErrorHandlingTaskTest (#12735)
    
    Reviewers: Divij Vaidya <[email protected]>, Chris Egerton <[email protected]>
---
 build.gradle                                       |   2 +-
 .../connect/runtime/ErrorHandlingTaskTest.java     | 313 ++++++++-------------
 2 files changed, 125 insertions(+), 190 deletions(-)

diff --git a/build.gradle b/build.gradle
index 9bf33767f09..81ab8e8c211 100644
--- a/build.gradle
+++ b/build.gradle
@@ -412,7 +412,7 @@ subprojects {
       // connect tests
       "**/ConnectorPluginsResourceTest.*",
       "**/DistributedHerderTest.*", "**/FileOffsetBakingStoreTest.*",
-      "**/ErrorHandlingTaskTest.*", "**/KafkaConfigBackingStoreTest.*",
+      "**/KafkaConfigBackingStoreTest.*",
       "**/KafkaBasedLogTest.*", "**/OffsetStorageWriterTest.*", "**/StandaloneHerderTest.*",
       "**/SourceTaskOffsetCommitterTest.*",
       "**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*",
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 780ca4e790e..4de13128dbc 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -58,33 +58,29 @@ import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;
 import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.apache.kafka.connect.util.ParameterizedTest;
 import org.apache.kafka.connect.util.TopicAdmin;
 import org.apache.kafka.connect.util.TopicCreationGroup;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IExpectationSetters;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.junit.runners.Parameterized;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.quality.Strictness;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Arrays;
 import java.util.Set;
+import java.util.Collections;
+import java.util.Collection;
 import java.util.concurrent.Executor;
 
 import static java.util.Collections.emptyMap;
@@ -103,18 +99,29 @@ import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_F
 import static 
org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
 import static org.junit.Assert.assertEquals;
 
-@RunWith(PowerMockRunner.class)
-@PowerMockRunnerDelegate(ParameterizedTest.class)
-@PrepareForTest({WorkerSinkTask.class, WorkerSourceTask.class})
-@PowerMockIgnore("javax.management.*")
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.doReturn;
+
+
+@RunWith(Parameterized.class)
 public class ErrorHandlingTaskTest {
+    @Rule
+    public MockitoRule rule = 
MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
 
     private static final String TOPIC = "test";
     private static final int PARTITION1 = 12;
     private static final int PARTITION2 = 13;
     private static final long FIRST_OFFSET = 45;
 
-    @Mock Plugins plugins;
+    @Mock
+    Plugins plugins;
 
     private static final Map<String, String> TASK_PROPS = new HashMap<>();
 
@@ -139,7 +146,6 @@ public class ErrorHandlingTaskTest {
     @SuppressWarnings("unused")
     @Mock
     private SourceTask sourceTask;
-    private Capture<WorkerSinkTaskContext> sinkTaskContext = 
EasyMock.newCapture();
     private WorkerConfig workerConfig;
     private SourceConnectorConfig sourceConfig;
     @Mock
@@ -164,8 +170,6 @@ public class ErrorHandlingTaskTest {
     OffsetStorageWriter offsetWriter;
     @Mock
     private ConnectorOffsetBackingStore offsetStore;
-
-    private Capture<ConsumerRebalanceListener> rebalanceListener = 
EasyMock.newCapture();
     @SuppressWarnings("unused")
     @Mock
     private TaskStatus.Listener statusListener;
@@ -179,7 +183,7 @@ public class ErrorHandlingTaskTest {
 
     private boolean enableTopicCreation;
 
-    @ParameterizedTest.Parameters
+    @Parameterized.Parameters
     public static Collection<Boolean> parameters() {
         return Arrays.asList(false, true);
     }
@@ -197,7 +201,6 @@ public class ErrorHandlingTaskTest {
         workerProps.put("value.converter", 
"org.apache.kafka.connect.json.JsonConverter");
         workerProps.put("offset.storage.file.filename", 
"/tmp/connect.offsets");
         workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, 
String.valueOf(enableTopicCreation));
-        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
         workerConfig = new StandaloneConfig(workerProps);
         sourceConfig = new SourceConnectorConfig(plugins, 
sourceConnectorProps(TOPIC), true);
         errorHandlingMetrics = new ErrorHandlingMetrics(taskId, metrics);
@@ -228,81 +231,58 @@ public class ErrorHandlingTaskTest {
 
     @Test
     public void testSinkTasksCloseErrorReporters() throws Exception {
-        ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
+        ErrorReporter reporter = mock(ErrorReporter.class);
 
         RetryWithToleranceOperator retryWithToleranceOperator = operator();
         retryWithToleranceOperator.reporters(singletonList(reporter));
 
         createSinkTask(initialState, retryWithToleranceOperator);
-
-        expectInitializeTask();
-        reporter.close();
-        EasyMock.expectLastCall();
-        sinkTask.stop();
-        EasyMock.expectLastCall();
-
-        consumer.close();
-        EasyMock.expectLastCall();
-
-        headerConverter.close();
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
-
         workerSinkTask.initialize(TASK_CONFIG);
         workerSinkTask.initializeAndStart();
         workerSinkTask.close();
-
-        PowerMock.verifyAll();
+        // verify if invocation happened exactly 1 time
+        verifyInitializeSink();
+        verify(reporter).close();
+        verify(sinkTask).stop();
+        verify(consumer).close();
+        verify(headerConverter).close();
     }
 
     @Test
-    public void testSourceTasksCloseErrorReporters() {
-        ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
+    public void testSourceTasksCloseErrorReporters() throws IOException {
+        ErrorReporter reporter = mock(ErrorReporter.class);
 
         RetryWithToleranceOperator retryWithToleranceOperator = operator();
         retryWithToleranceOperator.reporters(singletonList(reporter));
 
         createSourceTask(initialState, retryWithToleranceOperator);
 
-        expectClose();
-
-        reporter.close();
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
-
         workerSourceTask.initialize(TASK_CONFIG);
         workerSourceTask.close();
-
-        PowerMock.verifyAll();
+        verifyCloseSource();
+        verify(reporter).close();
     }
 
     @Test
-    public void testCloseErrorReportersExceptionPropagation() {
-        ErrorReporter reporterA = EasyMock.mock(ErrorReporter.class);
-        ErrorReporter reporterB = EasyMock.mock(ErrorReporter.class);
+    public void testCloseErrorReportersExceptionPropagation() throws 
IOException {
+        ErrorReporter reporterA = mock(ErrorReporter.class);
+        ErrorReporter reporterB = mock(ErrorReporter.class);
 
         RetryWithToleranceOperator retryWithToleranceOperator = operator();
         retryWithToleranceOperator.reporters(Arrays.asList(reporterA, 
reporterB));
 
         createSourceTask(initialState, retryWithToleranceOperator);
 
-        expectClose();
-
         // Even though the reporters throw exceptions, they should both still 
be closed.
-        reporterA.close();
-        EasyMock.expectLastCall().andThrow(new RuntimeException());
-
-        reporterB.close();
-        EasyMock.expectLastCall().andThrow(new RuntimeException());
-
-        PowerMock.replayAll();
+        doThrow(new RuntimeException()).when(reporterA).close();
+        doThrow(new RuntimeException()).when(reporterB).close();
 
         workerSourceTask.initialize(TASK_CONFIG);
         workerSourceTask.close();
 
-        PowerMock.verifyAll();
+        verify(reporterA).close();
+        verify(reporterB).close();
+        verifyCloseSource();
     }
 
     @Test
@@ -316,21 +296,19 @@ public class ErrorHandlingTaskTest {
         retryWithToleranceOperator.reporters(singletonList(reporter));
         createSinkTask(initialState, retryWithToleranceOperator);
 
-        expectInitializeTask();
-        expectTaskGetTopic(true);
 
         // valid json
-        ConsumerRecord<byte[], byte[]> record1 = new ConsumerRecord<>(TOPIC, 
PARTITION1, FIRST_OFFSET, null, "{\"a\": 10}".getBytes());
+        ConsumerRecord<byte[], byte[]> record1 = new ConsumerRecord<>(
+                TOPIC, PARTITION1, FIRST_OFFSET,
+                null, "{\"a\": 10}".getBytes());
         // bad json
-        ConsumerRecord<byte[], byte[]> record2 = new ConsumerRecord<>(TOPIC, 
PARTITION2, FIRST_OFFSET, null, "{\"a\" 10}".getBytes());
+        ConsumerRecord<byte[], byte[]> record2 = new ConsumerRecord<>(
+                TOPIC, PARTITION2, FIRST_OFFSET,
+                null, "{\"a\" 10}".getBytes());
 
-        
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record1));
-        
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record2));
-
-        sinkTask.put(EasyMock.anyObject());
-        EasyMock.expectLastCall().times(2);
-
-        PowerMock.replayAll();
+        when(consumer.poll(any()))
+                .thenReturn(records(record1))
+                .thenReturn(records(record2));
 
         workerSinkTask.initialize(TASK_CONFIG);
         workerSinkTask.initializeAndStart();
@@ -338,6 +316,9 @@ public class ErrorHandlingTaskTest {
 
         workerSinkTask.iteration();
 
+        verifyInitializeSink();
+        verify(sinkTask, times(2)).put(any());
+
         // two records were consumed from Kafka
         assertSinkMetricValue("sink-record-read-total", 2.0);
         // only one was written to the task
@@ -348,12 +329,12 @@ public class ErrorHandlingTaskTest {
         assertErrorHandlingMetricValue("total-record-failures", 3.0);
         // one record completely failed (converter issues), and thus was 
skipped
         assertErrorHandlingMetricValue("total-records-skipped", 1.0);
-
-        PowerMock.verifyAll();
     }
 
     private RetryWithToleranceOperator operator() {
-        return new RetryWithToleranceOperator(OPERATOR_RETRY_TIMEOUT_MILLIS, 
OPERATOR_RETRY_MAX_DELAY_MILLIS, OPERATOR_TOLERANCE_TYPE, SYSTEM, 
errorHandlingMetrics);
+        return new RetryWithToleranceOperator(OPERATOR_RETRY_TIMEOUT_MILLIS,
+                OPERATOR_RETRY_MAX_DELAY_MILLIS, OPERATOR_TOLERANCE_TYPE,
+                SYSTEM, errorHandlingMetrics);
     }
 
     @Test
@@ -374,30 +355,29 @@ public class ErrorHandlingTaskTest {
         Struct struct2 = new Struct(valSchema).put("val", 6789);
         SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, 
PARTITION1, valSchema, struct2);
 
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
+        when(workerSourceTask.isStopping())
+                .thenReturn(false)
+                .thenReturn(false)
+                .thenReturn(true);
 
-        EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
+        doReturn(true).when(workerSourceTask).commitOffsets();
 
-        offsetStore.start();
-        EasyMock.expectLastCall();
-        sourceTask.initialize(EasyMock.anyObject());
-        EasyMock.expectLastCall();
-        sourceTask.start(EasyMock.anyObject());
-        EasyMock.expectLastCall();
+        when(sourceTask.poll())
+                .thenReturn(singletonList(record1))
+                .thenReturn(singletonList(record2));
 
-        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
-        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
         expectTopicCreation(TOPIC);
-        EasyMock.expect(producer.send(EasyMock.anyObject(), 
EasyMock.anyObject())).andReturn(null).times(2);
-
-        PowerMock.replayAll();
 
         workerSourceTask.initialize(TASK_CONFIG);
         workerSourceTask.initializeAndStart();
         workerSourceTask.execute();
-
+        verify(workerSourceTask, times(3)).isStopping();
+        verify(workerSourceTask).commitOffsets();
+        verify(offsetStore).start();
+        verify(sourceTask).initialize(any());
+        verify(sourceTask).start(any());
+        verify(sourceTask, times(2)).poll();
+        verify(producer, times(2)).send(any(), any());
         // two records were consumed from Kafka
         assertSourceMetricValue("source-record-poll-total", 2.0);
         // only one was written to the task
@@ -408,8 +388,6 @@ public class ErrorHandlingTaskTest {
         assertErrorHandlingMetricValue("total-record-failures", 4.0);
         // one record completely failed (converter issues), and thus was 
skipped
         assertErrorHandlingMetricValue("total-records-skipped", 0.0);
-
-        PowerMock.verifyAll();
     }
 
     private ConnectorConfig connConfig(Map<String, String> connProps) {
@@ -438,30 +416,20 @@ public class ErrorHandlingTaskTest {
         Struct struct2 = new Struct(valSchema).put("val", 6789);
         SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, 
PARTITION1, valSchema, struct2);
 
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-        EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
+        when(workerSourceTask.isStopping())
+                .thenReturn(false)
+                .thenReturn(false)
+                .thenReturn(true);
 
-        EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
+        doReturn(true).when(workerSourceTask).commitOffsets();
 
-        offsetStore.start();
-        EasyMock.expectLastCall();
-        sourceTask.initialize(EasyMock.anyObject());
-        EasyMock.expectLastCall();
-        sourceTask.start(EasyMock.anyObject());
-        EasyMock.expectLastCall();
-
-        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
-        EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
+        when(sourceTask.poll())
+                .thenReturn(singletonList(record1))
+                .thenReturn(singletonList(record2));
         expectTopicCreation(TOPIC);
-        EasyMock.expect(producer.send(EasyMock.anyObject(), 
EasyMock.anyObject())).andReturn(null).times(2);
-
-        PowerMock.replayAll();
-
         workerSourceTask.initialize(TASK_CONFIG);
         workerSourceTask.initializeAndStart();
         workerSourceTask.execute();
-
         // two records were consumed from Kafka
         assertSourceMetricValue("source-record-poll-total", 2.0);
         // only one was written to the task
@@ -473,7 +441,13 @@ public class ErrorHandlingTaskTest {
         // one record completely failed (converter issues), and thus was 
skipped
         assertErrorHandlingMetricValue("total-records-skipped", 0.0);
 
-        PowerMock.verifyAll();
+        verify(workerSourceTask, times(3)).isStopping();
+        verify(workerSourceTask).commitOffsets();
+        verify(offsetStore).start();
+        verify(sourceTask).initialize(any());
+        verify(sourceTask).start(any());
+        verify(sourceTask, times(2)).poll();
+        verify(producer, times(2)).send(any(), any());
     }
 
     private void assertSinkMetricValue(String name, double expected) {
@@ -482,6 +456,13 @@ public class ErrorHandlingTaskTest {
         assertEquals(expected, measured, 0.001d);
     }
 
+    private void verifyInitializeSink() {
+        verify(sinkTask).start(TASK_PROPS);
+        verify(sinkTask).initialize(any(WorkerSinkTaskContext.class));
+        verify(consumer).subscribe(eq(singletonList(TOPIC)),
+                any(ConsumerRebalanceListener.class));
+    }
+
     private void assertSourceMetricValue(String name, double expected) {
         ConnectMetrics.MetricGroup sinkTaskGroup = 
workerSourceTask.sourceTaskMetricsGroup().metricGroup();
         double measured = metrics.currentMetricValueAsDouble(sinkTaskGroup, 
name);
@@ -494,73 +475,22 @@ public class ErrorHandlingTaskTest {
         assertEquals(expected, measured, 0.001d);
     }
 
-    private void expectInitializeTask() {
-        consumer.subscribe(EasyMock.eq(singletonList(TOPIC)), 
EasyMock.capture(rebalanceListener));
-        PowerMock.expectLastCall();
-
-        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
-        PowerMock.expectLastCall();
-        sinkTask.start(TASK_PROPS);
-        PowerMock.expectLastCall();
-    }
-
-    private void expectTaskGetTopic(boolean anyTimes) {
-        final Capture<String> connectorCapture = EasyMock.newCapture();
-        final Capture<String> topicCapture = EasyMock.newCapture();
-        IExpectationSetters<TopicStatus> expect = 
EasyMock.expect(statusBackingStore.getTopic(
-                EasyMock.capture(connectorCapture),
-                EasyMock.capture(topicCapture)));
-        if (anyTimes) {
-            expect.andStubAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        } else {
-            expect.andAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        }
-        if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-            assertEquals("job", connectorCapture.getValue());
-            assertEquals(TOPIC, topicCapture.getValue());
-        }
-    }
-
-    private void expectClose() {
-        producer.close(EasyMock.anyObject(Duration.class));
-        EasyMock.expectLastCall();
-
-        admin.close(EasyMock.anyObject(Duration.class));
-        EasyMock.expectLastCall();
-
-        offsetReader.close();
-        EasyMock.expectLastCall();
-
-        offsetStore.stop();
-        EasyMock.expectLastCall();
-
-        try {
-            headerConverter.close();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        EasyMock.expectLastCall();
+    private void verifyCloseSource() throws IOException {
+        verify(producer).close(any(Duration.class));
+        verify(admin).close(any(Duration.class));
+        verify(offsetReader).close();
+        verify(offsetStore).stop();
+        // headerConverter.close() can throw IOException
+        verify(headerConverter).close();
     }
 
     private void expectTopicCreation(String topic) {
-        if (workerConfig.topicCreationEnable()) {
-            
EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
-            Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-
-            if (enableTopicCreation) {
-                Set<String> created = Collections.singleton(topic);
-                Set<String> existing = Collections.emptySet();
-                TopicAdmin.TopicCreationResponse response = new 
TopicAdmin.TopicCreationResponse(created, existing);
-                
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(response);
-            } else {
-                
EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(true);
-            }
+        if (enableTopicCreation) {
+            
when(admin.describeTopics(topic)).thenReturn(Collections.emptyMap());
+            Set<String> created = Collections.singleton(topic);
+            Set<String> existing = Collections.emptySet();
+            TopicAdmin.TopicCreationResponse response = new 
TopicAdmin.TopicCreationResponse(created, existing);
+            
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(response);
         }
     }
 
@@ -571,13 +501,15 @@ public class ErrorHandlingTaskTest {
         oo.put("schemas.enable", "false");
         converter.configure(oo);
 
-        TransformationChain<SinkRecord> sinkTransforms = new 
TransformationChain<>(singletonList(new FaultyPassthrough<SinkRecord>()), 
retryWithToleranceOperator);
+        TransformationChain<SinkRecord> sinkTransforms =
+                new TransformationChain<>(singletonList(new 
FaultyPassthrough<SinkRecord>()), retryWithToleranceOperator);
 
         workerSinkTask = new WorkerSinkTask(
             taskId, sinkTask, statusListener, initialState, workerConfig,
             ClusterConfigState.EMPTY, metrics, converter, converter, 
errorHandlingMetrics,
             headerConverter, sinkTransforms, consumer, pluginLoader, time,
-            retryWithToleranceOperator, workerErrantRecordReporter, 
statusBackingStore);
+            retryWithToleranceOperator, workerErrantRecordReporter,
+                statusBackingStore);
     }
 
     private void createSourceTask(TargetState initialState, 
RetryWithToleranceOperator retryWithToleranceOperator) {
@@ -602,13 +534,16 @@ public class ErrorHandlingTaskTest {
     private void createSourceTask(TargetState initialState, 
RetryWithToleranceOperator retryWithToleranceOperator, Converter converter) {
         TransformationChain<SourceRecord> sourceTransforms = new 
TransformationChain<>(singletonList(new FaultyPassthrough<SourceRecord>()), 
retryWithToleranceOperator);
 
-        workerSourceTask = PowerMock.createPartialMock(
-            WorkerSourceTask.class, new String[]{"commitOffsets", 
"isStopping"},
-            taskId, sourceTask, statusListener, initialState, converter, 
converter, errorHandlingMetrics, headerConverter, sourceTransforms,
-            producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
-            offsetReader, offsetWriter, offsetStore, workerConfig,
-            ClusterConfigState.EMPTY, metrics, pluginLoader, time, 
retryWithToleranceOperator,
-            statusBackingStore, (Executor) Runnable::run);
+        workerSourceTask = spy(new WorkerSourceTask(
+            taskId, sourceTask, statusListener, initialState, converter,
+                converter, errorHandlingMetrics, headerConverter,
+                sourceTransforms, producer, admin,
+                TopicCreationGroup.configuredGroups(sourceConfig),
+                offsetReader, offsetWriter, offsetStore, workerConfig,
+                ClusterConfigState.EMPTY, metrics, pluginLoader, time,
+                retryWithToleranceOperator,
+                statusBackingStore, (Executor) Runnable::run));
+
     }
 
     private ConsumerRecords<byte[], byte[]> records(ConsumerRecord<byte[], 
byte[]> record) {

Reply via email to