This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f137da04fa7 KAFKA-14132: Replace Easymock & Powermock with Mockito in
KafkaBasedLogTest (#14153)
f137da04fa7 is described below
commit f137da04fa71734d176e19f5800622f4b4dfdb66
Author: bachmanity1 <[email protected]>
AuthorDate: Fri Aug 11 17:50:37 2023 +0900
KAFKA-14132: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest
(#14153)
Reviewers: Yash Mayya <[email protected]>, Divij Vaidya
<[email protected]>
---
build.gradle | 2 +-
.../apache/kafka/connect/util/KafkaBasedLog.java | 4 +-
.../kafka/connect/util/KafkaBasedLogTest.java | 233 +++++++--------------
3 files changed, 75 insertions(+), 164 deletions(-)
diff --git a/build.gradle b/build.gradle
index 7ac8f982d75..c4287726aea 100644
--- a/build.gradle
+++ b/build.gradle
@@ -416,7 +416,7 @@ subprojects {
testsToExclude.addAll([
// connect tests
"**/KafkaConfigBackingStoreTest.*",
- "**/KafkaBasedLogTest.*", "**/StandaloneHerderTest.*",
+ "**/StandaloneHerderTest.*",
"**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*"
])
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 046d84923ae..3a9014eda65 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -102,8 +102,8 @@ public class KafkaBasedLog<K, V> {
private Consumer<K, V> consumer;
private Optional<Producer<K, V>> producer;
private TopicAdmin admin;
-
- private Thread thread;
+ // Visible for testing
+ Thread thread;
private boolean stopRequested;
private final Queue<Callback<Void>> readLogEndOffsetCallbacks;
private final java.util.function.Consumer<TopicAdmin> initializer;
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index e1c7e6dd5db..40ad5ba5c62 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -36,21 +36,15 @@ import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.MockTime;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
+import org.apache.kafka.common.utils.Time;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-
-import java.nio.ByteBuffer;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -64,6 +58,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
@@ -72,10 +67,14 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaBasedLog.class)
-@PowerMockIgnore("javax.management.*")
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class KafkaBasedLogTest {
private static final String TOPIC = "connect-log";
@@ -114,29 +113,37 @@ public class KafkaBasedLogTest {
private static final String TP0_VALUE_NEW = "VAL0_NEW";
private static final String TP1_VALUE_NEW = "VAL1_NEW";
- private Time time = new MockTime();
+ private final Time time = new MockTime();
private KafkaBasedLog<String, String> store;
@Mock
- private Runnable initializer;
+ private Consumer<TopicAdmin> initializer;
@Mock
private KafkaProducer<String, String> producer;
- private MockConsumer<String, String> consumer;
- @Mock
private TopicAdmin admin;
+ private final Supplier<TopicAdmin> topicAdminSupplier = () -> admin;
+ private MockConsumer<String, String> consumer;
- private Map<TopicPartition, List<ConsumerRecord<String, String>>>
consumedRecords = new HashMap<>();
- private Callback<ConsumerRecord<String, String>> consumedCallback =
(error, record) -> {
+ private final Map<TopicPartition, List<ConsumerRecord<String, String>>>
consumedRecords = new HashMap<>();
+ private final Callback<ConsumerRecord<String, String>> consumedCallback =
(error, record) -> {
TopicPartition partition = new TopicPartition(record.topic(),
record.partition());
List<ConsumerRecord<String, String>> records =
consumedRecords.computeIfAbsent(partition, k -> new ArrayList<>());
records.add(record);
};
- @SuppressWarnings("unchecked")
@Before
public void setUp() {
- store = PowerMock.createPartialMock(KafkaBasedLog.class, new
String[]{"createConsumer", "createProducer"},
- TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time,
initializer);
+ store = new KafkaBasedLog<String, String>(TOPIC, PRODUCER_PROPS,
CONSUMER_PROPS, topicAdminSupplier, consumedCallback, time, initializer) {
+ @Override
+ protected KafkaProducer<String, String> createProducer() {
+ return producer;
+ }
+
+ @Override
+ protected MockConsumer<String, String> createConsumer() {
+ return consumer;
+ }
+ };
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1));
Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
@@ -146,12 +153,7 @@ public class KafkaBasedLogTest {
}
@Test
- public void testStartStop() throws Exception {
- expectStart();
- expectStop();
-
- PowerMock.replayAll();
-
+ public void testStartStop() {
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 0L);
endOffsets.put(TP1, 0L);
@@ -160,19 +162,11 @@ public class KafkaBasedLogTest {
assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
store.stop();
-
- assertFalse(Whitebox.<Thread>getInternalState(store,
"thread").isAlive());
- assertTrue(consumer.closed());
- PowerMock.verifyAll();
+ verifyStartAndStop();
}
@Test
public void testReloadOnStart() throws Exception {
- expectStart();
- expectStop();
-
- PowerMock.replayAll();
-
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 1L);
endOffsets.put(TP1, 1L);
@@ -206,19 +200,11 @@ public class KafkaBasedLogTest {
assertEquals(TP1_VALUE, consumedRecords.get(TP1).get(0).value());
store.stop();
-
- assertFalse(Whitebox.<Thread>getInternalState(store,
"thread").isAlive());
- assertTrue(consumer.closed());
- PowerMock.verifyAll();
+ verifyStartAndStop();
}
@Test
- public void testReloadOnStartWithNoNewRecordsPresent() throws Exception {
- expectStart();
- expectStop();
-
- PowerMock.replayAll();
-
+ public void testReloadOnStartWithNoNewRecordsPresent() {
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 7L);
endOffsets.put(TP1, 7L);
@@ -241,30 +227,19 @@ public class KafkaBasedLogTest {
store.stop();
- assertFalse(Whitebox.<Thread>getInternalState(store,
"thread").isAlive());
- assertTrue(consumer.closed());
- PowerMock.verifyAll();
+ verifyStartAndStop();
}
@Test
public void testSendAndReadToEnd() throws Exception {
- expectStart();
TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC,
TP0_KEY, TP0_VALUE);
- Capture<org.apache.kafka.clients.producer.Callback> callback0 =
EasyMock.newCapture();
- EasyMock.expect(producer.send(EasyMock.eq(tp0Record),
EasyMock.capture(callback0))).andReturn(tp0Future);
+ ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback0 =
ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
+ when(producer.send(eq(tp0Record),
callback0.capture())).thenReturn(tp0Future);
TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
ProducerRecord<String, String> tp1Record = new ProducerRecord<>(TOPIC,
TP1_KEY, TP1_VALUE);
- Capture<org.apache.kafka.clients.producer.Callback> callback1 =
EasyMock.newCapture();
- EasyMock.expect(producer.send(EasyMock.eq(tp1Record),
EasyMock.capture(callback1))).andReturn(tp1Future);
-
- // Producer flushes when read to log end is called
- producer.flush();
- PowerMock.expectLastCall();
-
- expectStop();
-
- PowerMock.replayAll();
+ ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback1 =
ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
+ when(producer.send(eq(tp1Record),
callback1.capture())).thenReturn(tp1Future);
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 0L);
@@ -335,18 +310,13 @@ public class KafkaBasedLogTest {
// Cleanup
store.stop();
- assertFalse(Whitebox.<Thread>getInternalState(store,
"thread").isAlive());
- assertTrue(consumer.closed());
- PowerMock.verifyAll();
+ // Producer flushes when read to log end is called
+ verify(producer).flush();
+ verifyStartAndStop();
}
@Test
public void testPollConsumerError() throws Exception {
- expectStart();
- expectStop();
-
- PowerMock.replayAll();
-
final CountDownLatch finishedLatch = new CountDownLatch(1);
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 1L);
@@ -376,22 +346,11 @@ public class KafkaBasedLogTest {
store.stop();
- assertFalse(Whitebox.<Thread>getInternalState(store,
"thread").isAlive());
- assertTrue(consumer.closed());
- PowerMock.verifyAll();
+ verifyStartAndStop();
}
@Test
public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception {
- expectStart();
-
- // Producer flushes when read to log end is called
- producer.flush();
- PowerMock.expectLastCall();
-
- expectStop();
-
- PowerMock.replayAll();
final CountDownLatch finishedLatch = new CountDownLatch(1);
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 0L);
@@ -433,22 +392,17 @@ public class KafkaBasedLogTest {
store.stop();
- assertFalse(Whitebox.<Thread>getInternalState(store,
"thread").isAlive());
- assertTrue(consumer.closed());
- PowerMock.verifyAll();
+ // Producer flushes when read to log end is called
+ verify(producer).flush();
+ verifyStartAndStop();
}
@Test
- public void testProducerError() throws Exception {
- expectStart();
+ public void testProducerError() {
TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC,
TP0_KEY, TP0_VALUE);
- Capture<org.apache.kafka.clients.producer.Callback> callback0 =
EasyMock.newCapture();
- EasyMock.expect(producer.send(EasyMock.eq(tp0Record),
EasyMock.capture(callback0))).andReturn(tp0Future);
-
- expectStop();
-
- PowerMock.replayAll();
+ ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback0 =
ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
+ when(producer.send(eq(tp0Record),
callback0.capture())).thenReturn(tp0Future);
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 0L);
@@ -471,42 +425,31 @@ public class KafkaBasedLogTest {
store.stop();
- assertFalse(Whitebox.<Thread>getInternalState(store,
"thread").isAlive());
- assertTrue(consumer.closed());
- PowerMock.verifyAll();
+ verifyStartAndStop();
}
@Test
- public void testReadEndOffsetsUsingAdmin() throws Exception {
- // Create a log that uses the admin supplier
- setupWithAdmin();
- expectProducerAndConsumerCreate();
-
+ public void testReadEndOffsetsUsingAdmin() {
Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 0L);
endOffsets.put(TP1, 0L);
- admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(),
EasyMock.anyLong());
- PowerMock.expectLastCall().andReturn(endOffsets).times(1);
- admin.endOffsets(EasyMock.eq(tps));
- PowerMock.expectLastCall().andReturn(endOffsets).times(1);
-
- PowerMock.replayAll();
+ admin = mock(TopicAdmin.class);
+ when(admin.retryEndOffsets(eq(tps), any(),
anyLong())).thenReturn(endOffsets);
+ when(admin.endOffsets(eq(tps))).thenReturn(endOffsets);
store.start();
assertEquals(endOffsets, store.readEndOffsets(tps, false));
+ verify(admin).retryEndOffsets(eq(tps), any(), anyLong());
+ verify(admin).endOffsets(eq(tps));
}
@Test
- public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() throws
Exception {
- // Create a log that uses the admin supplier
- setupWithAdmin();
- expectProducerAndConsumerCreate();
-
+ public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() {
Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
+ admin = mock(TopicAdmin.class);
// Getting end offsets using the admin client should fail with
unsupported version
- admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(),
EasyMock.anyLong());
- PowerMock.expectLastCall().andThrow(new
UnsupportedVersionException("too old"));
+ when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenThrow(new
UnsupportedVersionException("too old"));
// Falls back to the consumer
Map<TopicPartition, Long> endOffsets = new HashMap<>();
@@ -514,65 +457,33 @@ public class KafkaBasedLogTest {
endOffsets.put(TP1, 0L);
consumer.updateEndOffsets(endOffsets);
- PowerMock.replayAll();
-
store.start();
assertEquals(endOffsets, store.readEndOffsets(tps, false));
+ verify(admin).retryEndOffsets(eq(tps), any(), anyLong());
}
@Test
- public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws
Exception {
- // Create a log that uses the admin supplier
- setupWithAdmin();
- expectProducerAndConsumerCreate();
-
+ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() {
Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(TP0, 0L);
endOffsets.put(TP1, 0L);
+ admin = mock(TopicAdmin.class);
// Getting end offsets upon startup should work fine
- admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(),
EasyMock.anyLong());
- PowerMock.expectLastCall().andReturn(endOffsets).times(1);
+ when(admin.retryEndOffsets(eq(tps), any(),
anyLong())).thenReturn(endOffsets);
// Getting end offsets using the admin client should fail with leader
not available
- admin.endOffsets(EasyMock.eq(tps));
- PowerMock.expectLastCall().andThrow(new
LeaderNotAvailableException("retry"));
-
- PowerMock.replayAll();
+ when(admin.endOffsets(eq(tps))).thenThrow(new
LeaderNotAvailableException("retry"));
store.start();
assertThrows(LeaderNotAvailableException.class, () ->
store.readEndOffsets(tps, false));
+ verify(admin).retryEndOffsets(eq(tps), any(), anyLong());
+ verify(admin).endOffsets(eq(tps));
}
- @SuppressWarnings("unchecked")
- private void setupWithAdmin() {
- Supplier<TopicAdmin> adminSupplier = () -> admin;
- java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
- store = PowerMock.createPartialMock(KafkaBasedLog.class, new
String[]{"createConsumer", "createProducer"},
- TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier,
consumedCallback, time, initializer);
- }
-
- private void expectProducerAndConsumerCreate() throws Exception {
- PowerMock.expectPrivate(store, "createProducer")
- .andReturn(producer);
- PowerMock.expectPrivate(store, "createConsumer")
- .andReturn(consumer);
- }
-
- private void expectStart() throws Exception {
- initializer.run();
- EasyMock.expectLastCall().times(1);
-
- expectProducerAndConsumerCreate();
- }
-
- private void expectStop() {
- producer.close();
- PowerMock.expectLastCall();
- // MockConsumer close is checked after test.
- }
-
- private static ByteBuffer buffer(String v) {
- return ByteBuffer.wrap(v.getBytes());
+ private void verifyStartAndStop() {
+ verify(initializer).accept(admin);
+ verify(producer).close();
+ assertTrue(consumer.closed());
+ assertFalse(store.thread.isAlive());
}
-
}