This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f137da04fa7 KAFKA-14132: Replace Easymock & Powermock with Mockito in 
KafkaBasedLogTest (#14153)
f137da04fa7 is described below

commit f137da04fa71734d176e19f5800622f4b4dfdb66
Author: bachmanity1 <[email protected]>
AuthorDate: Fri Aug 11 17:50:37 2023 +0900

    KAFKA-14132: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest 
(#14153)
    
    Reviewers: Yash Mayya <[email protected]>, Divij Vaidya 
<[email protected]>
---
 build.gradle                                       |   2 +-
 .../apache/kafka/connect/util/KafkaBasedLog.java   |   4 +-
 .../kafka/connect/util/KafkaBasedLogTest.java      | 233 +++++++--------------
 3 files changed, 75 insertions(+), 164 deletions(-)

diff --git a/build.gradle b/build.gradle
index 7ac8f982d75..c4287726aea 100644
--- a/build.gradle
+++ b/build.gradle
@@ -416,7 +416,7 @@ subprojects {
     testsToExclude.addAll([
       // connect tests
       "**/KafkaConfigBackingStoreTest.*",
-      "**/KafkaBasedLogTest.*", "**/StandaloneHerderTest.*",
+      "**/StandaloneHerderTest.*",
       "**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*"
     ])
   }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 046d84923ae..3a9014eda65 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -102,8 +102,8 @@ public class KafkaBasedLog<K, V> {
     private Consumer<K, V> consumer;
     private Optional<Producer<K, V>> producer;
     private TopicAdmin admin;
-
-    private Thread thread;
+    // Visible for testing
+    Thread thread;
     private boolean stopRequested;
     private final Queue<Callback<Void>> readLogEndOffsetCallbacks;
     private final java.util.function.Consumer<TopicAdmin> initializer;
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index e1c7e6dd5db..40ad5ba5c62 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -36,21 +36,15 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.MockTime;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
+import org.apache.kafka.common.utils.Time;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-
-import java.nio.ByteBuffer;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -64,6 +58,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
@@ -72,10 +67,14 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaBasedLog.class)
-@PowerMockIgnore("javax.management.*")
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class KafkaBasedLogTest {
 
     private static final String TOPIC = "connect-log";
@@ -114,29 +113,37 @@ public class KafkaBasedLogTest {
     private static final String TP0_VALUE_NEW = "VAL0_NEW";
     private static final String TP1_VALUE_NEW = "VAL1_NEW";
 
-    private Time time = new MockTime();
+    private final Time time = new MockTime();
     private KafkaBasedLog<String, String> store;
 
     @Mock
-    private Runnable initializer;
+    private Consumer<TopicAdmin> initializer;
     @Mock
     private KafkaProducer<String, String> producer;
-    private MockConsumer<String, String> consumer;
-    @Mock
     private TopicAdmin admin;
+    private final Supplier<TopicAdmin> topicAdminSupplier = () -> admin;
+    private MockConsumer<String, String> consumer;
 
-    private Map<TopicPartition, List<ConsumerRecord<String, String>>> 
consumedRecords = new HashMap<>();
-    private Callback<ConsumerRecord<String, String>> consumedCallback = 
(error, record) -> {
+    private final Map<TopicPartition, List<ConsumerRecord<String, String>>> 
consumedRecords = new HashMap<>();
+    private final Callback<ConsumerRecord<String, String>> consumedCallback = 
(error, record) -> {
         TopicPartition partition = new TopicPartition(record.topic(), 
record.partition());
         List<ConsumerRecord<String, String>> records = 
consumedRecords.computeIfAbsent(partition, k -> new ArrayList<>());
         records.add(record);
     };
 
-    @SuppressWarnings("unchecked")
     @Before
     public void setUp() {
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new 
String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, 
initializer);
+        store = new KafkaBasedLog<String, String>(TOPIC, PRODUCER_PROPS, 
CONSUMER_PROPS, topicAdminSupplier, consumedCallback, time, initializer) {
+            @Override
+            protected KafkaProducer<String, String> createProducer() {
+                return producer;
+            }
+
+            @Override
+            protected MockConsumer<String, String> createConsumer() {
+                return consumer;
+            }
+        };
         consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
         consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1));
         Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
@@ -146,12 +153,7 @@ public class KafkaBasedLogTest {
     }
 
     @Test
-    public void testStartStop() throws Exception {
-        expectStart();
-        expectStop();
-
-        PowerMock.replayAll();
-
+    public void testStartStop() {
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
@@ -160,19 +162,11 @@ public class KafkaBasedLogTest {
         assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
 
         store.stop();
-
-        assertFalse(Whitebox.<Thread>getInternalState(store, 
"thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
+        verifyStartAndStop();
     }
 
     @Test
     public void testReloadOnStart() throws Exception {
-        expectStart();
-        expectStop();
-
-        PowerMock.replayAll();
-
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 1L);
         endOffsets.put(TP1, 1L);
@@ -206,19 +200,11 @@ public class KafkaBasedLogTest {
         assertEquals(TP1_VALUE, consumedRecords.get(TP1).get(0).value());
 
         store.stop();
-
-        assertFalse(Whitebox.<Thread>getInternalState(store, 
"thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
+        verifyStartAndStop();
     }
 
     @Test
-    public void testReloadOnStartWithNoNewRecordsPresent() throws Exception {
-        expectStart();
-        expectStop();
-
-        PowerMock.replayAll();
-
+    public void testReloadOnStartWithNoNewRecordsPresent() {
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 7L);
         endOffsets.put(TP1, 7L);
@@ -241,30 +227,19 @@ public class KafkaBasedLogTest {
 
         store.stop();
 
-        assertFalse(Whitebox.<Thread>getInternalState(store, 
"thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
+        verifyStartAndStop();
     }
 
     @Test
     public void testSendAndReadToEnd() throws Exception {
-        expectStart();
         TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
         ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, 
TP0_KEY, TP0_VALUE);
-        Capture<org.apache.kafka.clients.producer.Callback> callback0 = 
EasyMock.newCapture();
-        EasyMock.expect(producer.send(EasyMock.eq(tp0Record), 
EasyMock.capture(callback0))).andReturn(tp0Future);
+        ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback0 = 
ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
+        when(producer.send(eq(tp0Record), 
callback0.capture())).thenReturn(tp0Future);
         TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
         ProducerRecord<String, String> tp1Record = new ProducerRecord<>(TOPIC, 
TP1_KEY, TP1_VALUE);
-        Capture<org.apache.kafka.clients.producer.Callback> callback1 = 
EasyMock.newCapture();
-        EasyMock.expect(producer.send(EasyMock.eq(tp1Record), 
EasyMock.capture(callback1))).andReturn(tp1Future);
-
-        // Producer flushes when read to log end is called
-        producer.flush();
-        PowerMock.expectLastCall();
-
-        expectStop();
-
-        PowerMock.replayAll();
+        ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback1 = 
ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
+        when(producer.send(eq(tp1Record), 
callback1.capture())).thenReturn(tp1Future);
 
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
@@ -335,18 +310,13 @@ public class KafkaBasedLogTest {
         // Cleanup
         store.stop();
 
-        assertFalse(Whitebox.<Thread>getInternalState(store, 
"thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
+        // Producer flushes when read to log end is called
+        verify(producer).flush();
+        verifyStartAndStop();
     }
 
     @Test
     public void testPollConsumerError() throws Exception {
-        expectStart();
-        expectStop();
-
-        PowerMock.replayAll();
-
         final CountDownLatch finishedLatch = new CountDownLatch(1);
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 1L);
@@ -376,22 +346,11 @@ public class KafkaBasedLogTest {
 
         store.stop();
 
-        assertFalse(Whitebox.<Thread>getInternalState(store, 
"thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
+        verifyStartAndStop();
     }
 
     @Test
     public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception {
-        expectStart();
-
-        // Producer flushes when read to log end is called
-        producer.flush();
-        PowerMock.expectLastCall();
-
-        expectStop();
-
-        PowerMock.replayAll();
         final CountDownLatch finishedLatch = new CountDownLatch(1);
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
@@ -433,22 +392,17 @@ public class KafkaBasedLogTest {
 
         store.stop();
 
-        assertFalse(Whitebox.<Thread>getInternalState(store, 
"thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
+        // Producer flushes when read to log end is called
+        verify(producer).flush();
+        verifyStartAndStop();
     }
 
     @Test
-    public void testProducerError() throws Exception {
-        expectStart();
+    public void testProducerError() {
         TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
         ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, 
TP0_KEY, TP0_VALUE);
-        Capture<org.apache.kafka.clients.producer.Callback> callback0 = 
EasyMock.newCapture();
-        EasyMock.expect(producer.send(EasyMock.eq(tp0Record), 
EasyMock.capture(callback0))).andReturn(tp0Future);
-
-        expectStop();
-
-        PowerMock.replayAll();
+        ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback0 = 
ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
+        when(producer.send(eq(tp0Record), 
callback0.capture())).thenReturn(tp0Future);
 
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
@@ -471,42 +425,31 @@ public class KafkaBasedLogTest {
 
         store.stop();
 
-        assertFalse(Whitebox.<Thread>getInternalState(store, 
"thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
+        verifyStartAndStop();
     }
 
     @Test
-    public void testReadEndOffsetsUsingAdmin() throws Exception {
-        // Create a log that uses the admin supplier
-        setupWithAdmin();
-        expectProducerAndConsumerCreate();
-
+    public void testReadEndOffsetsUsingAdmin() {
         Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
-        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), 
EasyMock.anyLong());
-        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
-        admin.endOffsets(EasyMock.eq(tps));
-        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
-
-        PowerMock.replayAll();
+        admin = mock(TopicAdmin.class);
+        when(admin.retryEndOffsets(eq(tps), any(), 
anyLong())).thenReturn(endOffsets);
+        when(admin.endOffsets(eq(tps))).thenReturn(endOffsets);
 
         store.start();
         assertEquals(endOffsets, store.readEndOffsets(tps, false));
+        verify(admin).retryEndOffsets(eq(tps), any(), anyLong());
+        verify(admin).endOffsets(eq(tps));
     }
 
     @Test
-    public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() throws 
Exception {
-        // Create a log that uses the admin supplier
-        setupWithAdmin();
-        expectProducerAndConsumerCreate();
-
+    public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() {
         Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
+        admin = mock(TopicAdmin.class);
         // Getting end offsets using the admin client should fail with 
unsupported version
-        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), 
EasyMock.anyLong());
-        PowerMock.expectLastCall().andThrow(new 
UnsupportedVersionException("too old"));
+        when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenThrow(new 
UnsupportedVersionException("too old"));
 
         // Falls back to the consumer
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
@@ -514,65 +457,33 @@ public class KafkaBasedLogTest {
         endOffsets.put(TP1, 0L);
         consumer.updateEndOffsets(endOffsets);
 
-        PowerMock.replayAll();
-
         store.start();
         assertEquals(endOffsets, store.readEndOffsets(tps, false));
+        verify(admin).retryEndOffsets(eq(tps), any(), anyLong());
     }
 
     @Test
-    public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws 
Exception {
-        // Create a log that uses the admin supplier
-        setupWithAdmin();
-        expectProducerAndConsumerCreate();
-
+    public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() {
         Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
+        admin = mock(TopicAdmin.class);
         // Getting end offsets upon startup should work fine
-        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), 
EasyMock.anyLong());
-        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
+        when(admin.retryEndOffsets(eq(tps), any(), 
anyLong())).thenReturn(endOffsets);
         // Getting end offsets using the admin client should fail with leader 
not available
-        admin.endOffsets(EasyMock.eq(tps));
-        PowerMock.expectLastCall().andThrow(new 
LeaderNotAvailableException("retry"));
-
-        PowerMock.replayAll();
+        when(admin.endOffsets(eq(tps))).thenThrow(new 
LeaderNotAvailableException("retry"));
 
         store.start();
         assertThrows(LeaderNotAvailableException.class, () -> 
store.readEndOffsets(tps, false));
+        verify(admin).retryEndOffsets(eq(tps), any(), anyLong());
+        verify(admin).endOffsets(eq(tps));
     }
 
-    @SuppressWarnings("unchecked")
-    private void setupWithAdmin() {
-        Supplier<TopicAdmin> adminSupplier = () -> admin;
-        java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new 
String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, 
consumedCallback, time, initializer);
-    }
-
-    private void expectProducerAndConsumerCreate() throws Exception {
-        PowerMock.expectPrivate(store, "createProducer")
-                 .andReturn(producer);
-        PowerMock.expectPrivate(store, "createConsumer")
-                 .andReturn(consumer);
-    }
-
-    private void expectStart() throws Exception {
-        initializer.run();
-        EasyMock.expectLastCall().times(1);
-
-        expectProducerAndConsumerCreate();
-    }
-
-    private void expectStop() {
-        producer.close();
-        PowerMock.expectLastCall();
-        // MockConsumer close is checked after test.
-    }
-
-    private static ByteBuffer buffer(String v) {
-        return ByteBuffer.wrap(v.getBytes());
+    private void verifyStartAndStop() {
+        verify(initializer).accept(admin);
+        verify(producer).close();
+        assertTrue(consumer.closed());
+        assertFalse(store.thread.isAlive());
     }
-
 }

Reply via email to