This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1a15cd708ab KAFKA-14133: Migrato SessionCacheFlushListenerTest,
TimestampedCacheFlushListenerTest and TimestampedTupleForwarderTest to Mockito
(#14205)
1a15cd708ab is described below
commit 1a15cd708ab252e7c75dea06f36db36599f18322
Author: Christo Lolov <[email protected]>
AuthorDate: Wed Aug 16 08:46:40 2023 +0100
KAFKA-14133: Migrato SessionCacheFlushListenerTest,
TimestampedCacheFlushListenerTest and TimestampedTupleForwarderTest to Mockito
(#14205)
Reviewers: Divij Vaidya <[email protected]>
---
.../internals/SessionCacheFlushListenerTest.java | 22 ++++++------
.../TimestampedCacheFlushListenerTest.java | 32 +++++++----------
.../internals/TimestampedTupleForwarderTest.java | 41 ++++++++++------------
3 files changed, 42 insertions(+), 53 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
index 675f248083e..87cf2576b5e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
@@ -20,27 +20,25 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class SessionCacheFlushListenerTest {
@Test
public void shouldForwardKeyNewValueOldValueAndTimestamp() {
+ @SuppressWarnings("unchecked")
final InternalProcessorContext<Windowed<String>, Change<String>>
context = mock(InternalProcessorContext.class);
- expect(context.currentNode()).andReturn(null).anyTimes();
- context.setCurrentNode(null);
- context.setCurrentNode(null);
- context.forward(
+ doNothing().when(context).forward(
new Record<>(
new Windowed<>("key", new SessionWindow(21L, 73L)),
new Change<>("newValue", "oldValue"),
73L));
- expectLastCall();
- replay(context);
new SessionCacheFlushListener<>(context).apply(
new Record<>(
@@ -48,6 +46,6 @@ public class SessionCacheFlushListenerTest {
new Change<>("newValue", "oldValue"),
42L));
- verify(context);
+ verify(context, times(2)).setCurrentNode(null);
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
index 9f281dc3977..530cfad80c8 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
@@ -20,28 +20,26 @@ import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class TimestampedCacheFlushListenerTest {
@Test
public void shouldForwardValueTimestampIfNewValueExists() {
+ @SuppressWarnings("unchecked")
final InternalProcessorContext<String, Change<String>> context =
mock(InternalProcessorContext.class);
- expect(context.currentNode()).andReturn(null).anyTimes();
- context.setCurrentNode(null);
- context.setCurrentNode(null);
- context.forward(
+ doNothing().when(context).forward(
new Record<>(
"key",
new Change<>("newValue", "oldValue"),
42L));
- expectLastCall();
- replay(context);
new TimestampedCacheFlushListener<>(context).apply(
new Record<>(
@@ -51,22 +49,18 @@ public class TimestampedCacheFlushListenerTest {
ValueAndTimestamp.make("oldValue", 21L)),
73L));
- verify(context);
+ verify(context, times(2)).setCurrentNode(null);
}
@Test
public void shouldForwardParameterTimestampIfNewValueIsNull() {
+ @SuppressWarnings("unchecked")
final InternalProcessorContext<String, Change<String>> context =
mock(InternalProcessorContext.class);
- expect(context.currentNode()).andReturn(null).anyTimes();
- context.setCurrentNode(null);
- context.setCurrentNode(null);
- context.forward(
+ doNothing().when(context).forward(
new Record<>(
"key",
new Change<>(null, "oldValue"),
73L));
- expectLastCall();
- replay(context);
new TimestampedCacheFlushListener<>(context).apply(
new Record<>(
@@ -74,6 +68,6 @@ public class TimestampedCacheFlushListenerTest {
new Change<>(null, ValueAndTimestamp.make("oldValue", 21L)),
73L));
- verify(context);
+ verify(context, times(2)).setCurrentNode(null);
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
index 79b96e8e523..24c35260a91 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
@@ -22,13 +22,14 @@ import
org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class TimestampedTupleForwarderTest {
@Test
@@ -38,11 +39,12 @@ public class TimestampedTupleForwarderTest {
}
private void setFlushListener(final boolean sendOldValues) {
+ @SuppressWarnings("unchecked")
final WrappedStateStore<StateStore, Object, ValueAndTimestamp<Object>>
store = mock(WrappedStateStore.class);
+ @SuppressWarnings("unchecked")
final TimestampedCacheFlushListener<Object, Object> flushListener =
mock(TimestampedCacheFlushListener.class);
- expect(store.setFlushListener(flushListener,
sendOldValues)).andReturn(false);
- replay(store);
+ when(store.setFlushListener(flushListener,
sendOldValues)).thenReturn(false);
new TimestampedTupleForwarder<>(
store,
@@ -50,8 +52,6 @@ public class TimestampedTupleForwarderTest {
flushListener,
sendOldValues
);
-
- verify(store);
}
@Test
@@ -61,19 +61,19 @@ public class TimestampedTupleForwarderTest {
}
private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final
boolean sendOldValues) {
+ @SuppressWarnings("unchecked")
final WrappedStateStore<StateStore, String, String> store =
mock(WrappedStateStore.class);
+ @SuppressWarnings("unchecked")
final InternalProcessorContext<String, Change<String>> context =
mock(InternalProcessorContext.class);
- expect(store.setFlushListener(null, sendOldValues)).andReturn(false);
+ when(store.setFlushListener(null, sendOldValues)).thenReturn(false);
if (sendOldValues) {
- context.forward(new Record<>("key1", new Change<>("newValue1",
"oldValue1", true), 0L));
- context.forward(new Record<>("key2", new Change<>("newValue2",
"oldValue2", false), 42L));
+ doNothing().when(context).forward(new Record<>("key1", new
Change<>("newValue1", "oldValue1", true), 0L));
+ doNothing().when(context).forward(new Record<>("key2", new
Change<>("newValue2", "oldValue2", false), 42L));
} else {
- context.forward(new Record<>("key1", new Change<>("newValue1",
null, true), 0L));
- context.forward(new Record<>("key2", new Change<>("newValue2",
null, false), 42L));
+ doNothing().when(context).forward(new Record<>("key1", new
Change<>("newValue1", null, true), 0L));
+ doNothing().when(context).forward(new Record<>("key2", new
Change<>("newValue2", null, false), 42L));
}
- expectLastCall();
- replay(store, context);
final TimestampedTupleForwarder<String, String> forwarder =
new TimestampedTupleForwarder<>(
@@ -84,17 +84,16 @@ public class TimestampedTupleForwarderTest {
);
forwarder.maybeForward(new Record<>("key1", new Change<>("newValue1",
"oldValue1", true), 0L));
forwarder.maybeForward(new Record<>("key2", new Change<>("newValue2",
"oldValue2", false), 42L));
-
- verify(store, context);
}
@Test
public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() {
+ @SuppressWarnings("unchecked")
final WrappedStateStore<StateStore, String, String> store =
mock(WrappedStateStore.class);
+ @SuppressWarnings("unchecked")
final InternalProcessorContext<String, Change<String>> context =
mock(InternalProcessorContext.class);
- expect(store.setFlushListener(null, false)).andReturn(true);
- replay(store, context);
+ when(store.setFlushListener(null, false)).thenReturn(true);
final TimestampedTupleForwarder<String, String> forwarder =
new TimestampedTupleForwarder<>(
@@ -105,7 +104,5 @@ public class TimestampedTupleForwarderTest {
);
forwarder.maybeForward(new Record<>("key", new Change<>("newValue",
"oldValue", true), 0L));
forwarder.maybeForward(new Record<>("key", new Change<>("newValue",
"oldValue", true), 42L));
-
- verify(store, context);
}
}