yashmayya commented on code in PR #14153:
URL: https://github.com/apache/kafka/pull/14153#discussion_r1286889309


##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -64,18 +58,23 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaBasedLog.class)
-@PowerMockIgnore("javax.management.*")
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)

Review Comment:
   We should also remove this test from the list in `build.gradle` so that it 
isn't skipped on Java 16 and newer: 
https://github.com/apache/kafka/blob/8dec3e66163420ee0c2c259eef6e0c0f3185ca17/build.gradle#L413-L423
 



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -114,29 +113,29 @@ public class KafkaBasedLogTest {
     private static final String TP0_VALUE_NEW = "VAL0_NEW";
     private static final String TP1_VALUE_NEW = "VAL1_NEW";
 
-    private Time time = new MockTime();
+    private final Time time = new MockTime();
     private KafkaBasedLog<String, String> store;
 
     @Mock
-    private Runnable initializer;
+    private Consumer<TopicAdmin> initializer;
     @Mock
     private KafkaProducer<String, String> producer;
     private MockConsumer<String, String> consumer;
     @Mock
     private TopicAdmin admin;
+    @Mock
+    private Supplier<TopicAdmin> topicAdminSupplier;
 
-    private Map<TopicPartition, List<ConsumerRecord<String, String>>> 
consumedRecords = new HashMap<>();
-    private Callback<ConsumerRecord<String, String>> consumedCallback = 
(error, record) -> {
+    private final Map<TopicPartition, List<ConsumerRecord<String, String>>> 
consumedRecords = new HashMap<>();
+    private final Callback<ConsumerRecord<String, String>> consumedCallback = 
(error, record) -> {
         TopicPartition partition = new TopicPartition(record.topic(), 
record.partition());
         List<ConsumerRecord<String, String>> records = 
consumedRecords.computeIfAbsent(partition, k -> new ArrayList<>());
         records.add(record);
     };
 
-    @SuppressWarnings("unchecked")
     @Before
     public void setUp() {
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new 
String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, 
initializer);
+        store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, 
topicAdminSupplier, consumedCallback, time, initializer));

Review Comment:
   It looks like the only reason we need a partial mock here is to stub in the 
mock producer / consumer. Since the `KafkaBasedLog::createProducer` and 
`KafkaBasedLog::createConsumer` methods are now `protected` (looks like they 
were `private` when this test was originally written), could we just override 
those methods to return the mock clients and use a regular instance instead of 
using a spy / partial mock? I'd prefer it if we could avoid the use of spies / 
partial mocks as far as possible, since they aren't very OOP-y.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -160,18 +156,12 @@ public void testStartStop() throws Exception {
         assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
 
         store.stop();
-
-        assertFalse(Whitebox.<Thread>getInternalState(store, 
"thread").isAlive());

Review Comment:
   Aren't we losing this test coverage? Can we bump up the visibility of the 
`thread` instance variable to `package-private` (with a comment stating that it 
is being made visible for testing) and retain this coverage?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -486,12 +440,8 @@ public void testReadEndOffsetsUsingAdmin() throws 
Exception {
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
-        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), 
EasyMock.anyLong());
-        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
-        admin.endOffsets(EasyMock.eq(tps));
-        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
-
-        PowerMock.replayAll();
+        when(admin.retryEndOffsets(eq(tps), any(), 
anyLong())).thenReturn(endOffsets);
+        when(admin.endOffsets(eq(tps))).thenReturn(endOffsets);

Review Comment:
   We're losing existing coverage here, since we're no longer ensuring that 
each of these methods is called exactly once. Can we add an explicit 
verification for this at the end of the test?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -531,48 +478,28 @@ public void 
testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
         // Getting end offsets upon startup should work fine
-        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), 
EasyMock.anyLong());
-        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
+        when(admin.retryEndOffsets(eq(tps), any(), 
anyLong())).thenReturn(endOffsets);
         // Getting end offsets using the admin client should fail with leader 
not available
-        admin.endOffsets(EasyMock.eq(tps));
-        PowerMock.expectLastCall().andThrow(new 
LeaderNotAvailableException("retry"));
-
-        PowerMock.replayAll();
+        when(admin.endOffsets(eq(tps))).thenThrow(new 
LeaderNotAvailableException("retry"));
 
         store.start();
         assertThrows(LeaderNotAvailableException.class, () -> 
store.readEndOffsets(tps, false));
     }
 
-    @SuppressWarnings("unchecked")
     private void setupWithAdmin() {
         Supplier<TopicAdmin> adminSupplier = () -> admin;
         java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new 
String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, 
consumedCallback, time, initializer);
-    }
-
-    private void expectProducerAndConsumerCreate() throws Exception {
-        PowerMock.expectPrivate(store, "createProducer")
-                 .andReturn(producer);
-        PowerMock.expectPrivate(store, "createConsumer")
-                 .andReturn(consumer);
-    }
-
-    private void expectStart() throws Exception {
-        initializer.run();
-        EasyMock.expectLastCall().times(1);
-
-        expectProducerAndConsumerCreate();
+        store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, 
adminSupplier, consumedCallback, time, initializer));

Review Comment:
   Do we still need this separate `setupWithAdmin` method now that we're 
setting up the `KafkaBasedLog` store with a topic admin supplier even in the 
regular `setUp` method?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -241,30 +225,20 @@ public void testReloadOnStartWithNoNewRecordsPresent() 
throws Exception {
 
         store.stop();
 
-        assertFalse(Whitebox.<Thread>getInternalState(store, 
"thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
+        verifyStartAndStop();
     }
 
     @Test
     public void testSendAndReadToEnd() throws Exception {
-        expectStart();
+        expectProducerAndConsumerCreate();
         TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
         ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, 
TP0_KEY, TP0_VALUE);
-        Capture<org.apache.kafka.clients.producer.Callback> callback0 = 
EasyMock.newCapture();
-        EasyMock.expect(producer.send(EasyMock.eq(tp0Record), 
EasyMock.capture(callback0))).andReturn(tp0Future);
+        ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback0 = 
ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
+        when(producer.send(eq(tp0Record), 
callback0.capture())).thenReturn(tp0Future);
         TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
         ProducerRecord<String, String> tp1Record = new ProducerRecord<>(TOPIC, 
TP1_KEY, TP1_VALUE);
-        Capture<org.apache.kafka.clients.producer.Callback> callback1 = 
EasyMock.newCapture();
-        EasyMock.expect(producer.send(EasyMock.eq(tp1Record), 
EasyMock.capture(callback1))).andReturn(tp1Future);
-
-        // Producer flushes when read to log end is called

Review Comment:
   Can we retain this clarifying comment (we can add it above the producer 
flush verification)?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##########
@@ -531,48 +478,28 @@ public void 
testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
         // Getting end offsets upon startup should work fine
-        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), 
EasyMock.anyLong());
-        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
+        when(admin.retryEndOffsets(eq(tps), any(), 
anyLong())).thenReturn(endOffsets);
         // Getting end offsets using the admin client should fail with leader 
not available
-        admin.endOffsets(EasyMock.eq(tps));
-        PowerMock.expectLastCall().andThrow(new 
LeaderNotAvailableException("retry"));
-
-        PowerMock.replayAll();
+        when(admin.endOffsets(eq(tps))).thenThrow(new 
LeaderNotAvailableException("retry"));
 
         store.start();
         assertThrows(LeaderNotAvailableException.class, () -> 
store.readEndOffsets(tps, false));
     }
 
-    @SuppressWarnings("unchecked")
     private void setupWithAdmin() {
         Supplier<TopicAdmin> adminSupplier = () -> admin;
         java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
-        store = PowerMock.createPartialMock(KafkaBasedLog.class, new 
String[]{"createConsumer", "createProducer"},
-                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, 
consumedCallback, time, initializer);
-    }
-
-    private void expectProducerAndConsumerCreate() throws Exception {
-        PowerMock.expectPrivate(store, "createProducer")
-                 .andReturn(producer);
-        PowerMock.expectPrivate(store, "createConsumer")
-                 .andReturn(consumer);
-    }
-
-    private void expectStart() throws Exception {
-        initializer.run();
-        EasyMock.expectLastCall().times(1);
-
-        expectProducerAndConsumerCreate();
+        store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, 
adminSupplier, consumedCallback, time, initializer));
     }
 
-    private void expectStop() {
-        producer.close();
-        PowerMock.expectLastCall();
-        // MockConsumer close is checked after test.
+    private void expectProducerAndConsumerCreate() {
+        doReturn(producer).when(store).createProducer();
+        doReturn(consumer).when(store).createConsumer();
     }
 
-    private static ByteBuffer buffer(String v) {
-        return ByteBuffer.wrap(v.getBytes());
+    private void verifyStartAndStop() {
+        verify(initializer).accept(any());

Review Comment:
   Now that we're moving to the newer non-deprecated `KafkaBasedLog` 
constructor in this test class, could we enhance this verification to also 
verify that the initializer is called with the admin client from the supplier? 
See: 
https://github.com/apache/kafka/blob/8dec3e66163420ee0c2c259eef6e0c0f3185ca17/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L232-L240



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to