yashmayya commented on code in PR #14153: URL: https://github.com/apache/kafka/pull/14153#discussion_r1286889309
########## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ########## @@ -64,18 +58,23 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Supplier; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(KafkaBasedLog.class) -@PowerMockIgnore("javax.management.*") +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) Review Comment: We should also remove this test from the list here so that it isn't skipped on Java 16 and newer - https://github.com/apache/kafka/blob/8dec3e66163420ee0c2c259eef6e0c0f3185ca17/build.gradle#L413-L423 ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ########## @@ -114,29 +113,29 @@ public class KafkaBasedLogTest { private static final String TP0_VALUE_NEW = "VAL0_NEW"; private static final String TP1_VALUE_NEW = "VAL1_NEW"; - private Time time = new MockTime(); + private final Time time = new MockTime(); private KafkaBasedLog<String, String> store; @Mock - private Runnable initializer; + private Consumer<TopicAdmin> initializer; @Mock private KafkaProducer<String, String> producer; private MockConsumer<String, String> consumer; @Mock private TopicAdmin admin; + @Mock + private Supplier<TopicAdmin> topicAdminSupplier; - private Map<TopicPartition, 
List<ConsumerRecord<String, String>>> consumedRecords = new HashMap<>(); - private Callback<ConsumerRecord<String, String>> consumedCallback = (error, record) -> { + private final Map<TopicPartition, List<ConsumerRecord<String, String>>> consumedRecords = new HashMap<>(); + private final Callback<ConsumerRecord<String, String>> consumedCallback = (error, record) -> { TopicPartition partition = new TopicPartition(record.topic(), record.partition()); List<ConsumerRecord<String, String>> records = consumedRecords.computeIfAbsent(partition, k -> new ArrayList<>()); records.add(record); }; - @SuppressWarnings("unchecked") @Before public void setUp() { - store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, - TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer); + store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, topicAdminSupplier, consumedCallback, time, initializer)); Review Comment: It looks like the only reason we need a partial mock here is to stub in the mock producer / consumer. Since the `KafkaBasedLog::createProducer` and `KafkaBasedLog::createConsumer` methods are now `protected` (looks like they were `private` when this test was originally written), could we just override those methods to return the mock clients and use a regular instance instead of using a spy / partial mock? I'd prefer it if we could avoid the use of spies / partial mocks as far as possible, since they aren't very OOP-y. ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ########## @@ -160,18 +156,12 @@ public void testStartStop() throws Exception { assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); store.stop(); - - assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive()); Review Comment: Aren't we losing this test coverage? 
Can we bump up the visibility of the `thread` instance variable to `package-private` (with a comment stating that it is being made visible for testing) and retain this coverage? ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ########## @@ -486,12 +440,8 @@ public void testReadEndOffsetsUsingAdmin() throws Exception { Map<TopicPartition, Long> endOffsets = new HashMap<>(); endOffsets.put(TP0, 0L); endOffsets.put(TP1, 0L); - admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong()); - PowerMock.expectLastCall().andReturn(endOffsets).times(1); - admin.endOffsets(EasyMock.eq(tps)); - PowerMock.expectLastCall().andReturn(endOffsets).times(1); - - PowerMock.replayAll(); + when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenReturn(endOffsets); + when(admin.endOffsets(eq(tps))).thenReturn(endOffsets); Review Comment: We're losing some of the existing coverage here, since we no longer ensure that each of these methods is called exactly once. Can we add an explicit verification for this at the end of the test? 
########## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ########## @@ -531,48 +478,28 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio endOffsets.put(TP0, 0L); endOffsets.put(TP1, 0L); // Getting end offsets upon startup should work fine - admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong()); - PowerMock.expectLastCall().andReturn(endOffsets).times(1); + when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenReturn(endOffsets); // Getting end offsets using the admin client should fail with leader not available - admin.endOffsets(EasyMock.eq(tps)); - PowerMock.expectLastCall().andThrow(new LeaderNotAvailableException("retry")); - - PowerMock.replayAll(); + when(admin.endOffsets(eq(tps))).thenThrow(new LeaderNotAvailableException("retry")); store.start(); assertThrows(LeaderNotAvailableException.class, () -> store.readEndOffsets(tps, false)); } - @SuppressWarnings("unchecked") private void setupWithAdmin() { Supplier<TopicAdmin> adminSupplier = () -> admin; java.util.function.Consumer<TopicAdmin> initializer = admin -> { }; - store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, - TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); - } - - private void expectProducerAndConsumerCreate() throws Exception { - PowerMock.expectPrivate(store, "createProducer") - .andReturn(producer); - PowerMock.expectPrivate(store, "createConsumer") - .andReturn(consumer); - } - - private void expectStart() throws Exception { - initializer.run(); - EasyMock.expectLastCall().times(1); - - expectProducerAndConsumerCreate(); + store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer)); Review Comment: Do we still need this separate `setupWithAdmin` method now that we're setting up the `KafkaBasedLog` store with a topic admin 
supplier even in the regular `setUp` method? ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ########## @@ -241,30 +225,20 @@ public void testReloadOnStartWithNoNewRecordsPresent() throws Exception { store.stop(); - assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive()); - assertTrue(consumer.closed()); - PowerMock.verifyAll(); + verifyStartAndStop(); } @Test public void testSendAndReadToEnd() throws Exception { - expectStart(); + expectProducerAndConsumerCreate(); TestFuture<RecordMetadata> tp0Future = new TestFuture<>(); ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE); - Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture(); - EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future); + ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback0 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class); + when(producer.send(eq(tp0Record), callback0.capture())).thenReturn(tp0Future); TestFuture<RecordMetadata> tp1Future = new TestFuture<>(); ProducerRecord<String, String> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY, TP1_VALUE); - Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture(); - EasyMock.expect(producer.send(EasyMock.eq(tp1Record), EasyMock.capture(callback1))).andReturn(tp1Future); - - // Producer flushes when read to log end is called Review Comment: Can we retain this clarifying comment (we can add it above the producer flush verification)? 
########## connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java: ########## @@ -531,48 +478,28 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio endOffsets.put(TP0, 0L); endOffsets.put(TP1, 0L); // Getting end offsets upon startup should work fine - admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong()); - PowerMock.expectLastCall().andReturn(endOffsets).times(1); + when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenReturn(endOffsets); // Getting end offsets using the admin client should fail with leader not available - admin.endOffsets(EasyMock.eq(tps)); - PowerMock.expectLastCall().andThrow(new LeaderNotAvailableException("retry")); - - PowerMock.replayAll(); + when(admin.endOffsets(eq(tps))).thenThrow(new LeaderNotAvailableException("retry")); store.start(); assertThrows(LeaderNotAvailableException.class, () -> store.readEndOffsets(tps, false)); } - @SuppressWarnings("unchecked") private void setupWithAdmin() { Supplier<TopicAdmin> adminSupplier = () -> admin; java.util.function.Consumer<TopicAdmin> initializer = admin -> { }; - store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, - TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); - } - - private void expectProducerAndConsumerCreate() throws Exception { - PowerMock.expectPrivate(store, "createProducer") - .andReturn(producer); - PowerMock.expectPrivate(store, "createConsumer") - .andReturn(consumer); - } - - private void expectStart() throws Exception { - initializer.run(); - EasyMock.expectLastCall().times(1); - - expectProducerAndConsumerCreate(); + store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer)); } - private void expectStop() { - producer.close(); - PowerMock.expectLastCall(); - // MockConsumer close is checked after test. 
+ private void expectProducerAndConsumerCreate() { + doReturn(producer).when(store).createProducer(); + doReturn(consumer).when(store).createConsumer(); } - private static ByteBuffer buffer(String v) { - return ByteBuffer.wrap(v.getBytes()); + private void verifyStartAndStop() { + verify(initializer).accept(any()); Review Comment: Now that we're moving to the newer non-deprecated `KafkaBasedLog` constructor in this test class, could we strengthen this verification to also check that the initializer is called with the admin client returned by the supplier? See https://github.com/apache/kafka/blob/8dec3e66163420ee0c2c259eef6e0c0f3185ca17/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L232-L240 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org