This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1a15cd708ab  KAFKA-14133: Migrato SessionCacheFlushListenerTest, 
TimestampedCacheFlushListenerTest and TimestampedTupleForwarderTest to Mockito 
(#14205)
1a15cd708ab is described below

commit 1a15cd708ab252e7c75dea06f36db36599f18322
Author: Christo Lolov <[email protected]>
AuthorDate: Wed Aug 16 08:46:40 2023 +0100

     KAFKA-14133: Migrato SessionCacheFlushListenerTest, 
TimestampedCacheFlushListenerTest and TimestampedTupleForwarderTest to Mockito 
(#14205)
    
    Reviewers: Divij Vaidya <[email protected]>
---
 .../internals/SessionCacheFlushListenerTest.java   | 22 ++++++------
 .../TimestampedCacheFlushListenerTest.java         | 32 +++++++----------
 .../internals/TimestampedTupleForwarderTest.java   | 41 ++++++++++------------
 3 files changed, 42 insertions(+), 53 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
index 675f248083e..87cf2576b5e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java
@@ -20,27 +20,25 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class SessionCacheFlushListenerTest {
     @Test
     public void shouldForwardKeyNewValueOldValueAndTimestamp() {
+        @SuppressWarnings("unchecked")
         final InternalProcessorContext<Windowed<String>, Change<String>> 
context = mock(InternalProcessorContext.class);
-        expect(context.currentNode()).andReturn(null).anyTimes();
-        context.setCurrentNode(null);
-        context.setCurrentNode(null);
-        context.forward(
+        doNothing().when(context).forward(
             new Record<>(
                 new Windowed<>("key", new SessionWindow(21L, 73L)),
                 new Change<>("newValue", "oldValue"),
                 73L));
-        expectLastCall();
-        replay(context);
 
         new SessionCacheFlushListener<>(context).apply(
             new Record<>(
@@ -48,6 +46,6 @@ public class SessionCacheFlushListenerTest {
                 new Change<>("newValue", "oldValue"),
                 42L));
 
-        verify(context);
+        verify(context, times(2)).setCurrentNode(null);
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
index 9f281dc3977..530cfad80c8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerTest.java
@@ -20,28 +20,26 @@ import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class TimestampedCacheFlushListenerTest {
 
     @Test
     public void shouldForwardValueTimestampIfNewValueExists() {
+        @SuppressWarnings("unchecked")
         final InternalProcessorContext<String, Change<String>> context = 
mock(InternalProcessorContext.class);
-        expect(context.currentNode()).andReturn(null).anyTimes();
-        context.setCurrentNode(null);
-        context.setCurrentNode(null);
-        context.forward(
+        doNothing().when(context).forward(
             new Record<>(
                 "key",
                 new Change<>("newValue", "oldValue"),
                 42L));
-        expectLastCall();
-        replay(context);
 
         new TimestampedCacheFlushListener<>(context).apply(
             new Record<>(
@@ -51,22 +49,18 @@ public class TimestampedCacheFlushListenerTest {
                     ValueAndTimestamp.make("oldValue", 21L)),
                 73L));
 
-        verify(context);
+        verify(context, times(2)).setCurrentNode(null);
     }
 
     @Test
     public void shouldForwardParameterTimestampIfNewValueIsNull() {
+        @SuppressWarnings("unchecked")
         final InternalProcessorContext<String, Change<String>> context = 
mock(InternalProcessorContext.class);
-        expect(context.currentNode()).andReturn(null).anyTimes();
-        context.setCurrentNode(null);
-        context.setCurrentNode(null);
-        context.forward(
+        doNothing().when(context).forward(
             new Record<>(
                 "key",
                 new Change<>(null, "oldValue"),
                 73L));
-        expectLastCall();
-        replay(context);
 
         new TimestampedCacheFlushListener<>(context).apply(
             new Record<>(
@@ -74,6 +68,6 @@ public class TimestampedCacheFlushListenerTest {
                 new Change<>(null, ValueAndTimestamp.make("oldValue", 21L)),
                 73L));
 
-        verify(context);
+        verify(context, times(2)).setCurrentNode(null);
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
index 79b96e8e523..24c35260a91 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
@@ -22,13 +22,14 @@ import 
org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class TimestampedTupleForwarderTest {
 
     @Test
@@ -38,11 +39,12 @@ public class TimestampedTupleForwarderTest {
     }
 
     private void setFlushListener(final boolean sendOldValues) {
+        @SuppressWarnings("unchecked")
         final WrappedStateStore<StateStore, Object, ValueAndTimestamp<Object>> 
store = mock(WrappedStateStore.class);
+        @SuppressWarnings("unchecked")
         final TimestampedCacheFlushListener<Object, Object> flushListener = 
mock(TimestampedCacheFlushListener.class);
 
-        expect(store.setFlushListener(flushListener, 
sendOldValues)).andReturn(false);
-        replay(store);
+        when(store.setFlushListener(flushListener, 
sendOldValues)).thenReturn(false);
 
         new TimestampedTupleForwarder<>(
             store,
@@ -50,8 +52,6 @@ public class TimestampedTupleForwarderTest {
             flushListener,
             sendOldValues
         );
-
-        verify(store);
     }
 
     @Test
@@ -61,19 +61,19 @@ public class TimestampedTupleForwarderTest {
     }
 
     private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final 
boolean sendOldValues) {
+        @SuppressWarnings("unchecked")
         final WrappedStateStore<StateStore, String, String> store = 
mock(WrappedStateStore.class);
+        @SuppressWarnings("unchecked")
         final InternalProcessorContext<String, Change<String>> context = 
mock(InternalProcessorContext.class);
 
-        expect(store.setFlushListener(null, sendOldValues)).andReturn(false);
+        when(store.setFlushListener(null, sendOldValues)).thenReturn(false);
         if (sendOldValues) {
-            context.forward(new Record<>("key1", new Change<>("newValue1",  
"oldValue1", true), 0L));
-            context.forward(new Record<>("key2", new Change<>("newValue2",  
"oldValue2", false), 42L));
+            doNothing().when(context).forward(new Record<>("key1", new 
Change<>("newValue1",  "oldValue1", true), 0L));
+            doNothing().when(context).forward(new Record<>("key2", new 
Change<>("newValue2",  "oldValue2", false), 42L));
         } else {
-            context.forward(new Record<>("key1", new Change<>("newValue1", 
null, true), 0L));
-            context.forward(new Record<>("key2", new Change<>("newValue2", 
null, false), 42L));
+            doNothing().when(context).forward(new Record<>("key1", new 
Change<>("newValue1", null, true), 0L));
+            doNothing().when(context).forward(new Record<>("key2", new 
Change<>("newValue2", null, false), 42L));
         }
-        expectLastCall();
-        replay(store, context);
 
         final TimestampedTupleForwarder<String, String> forwarder =
             new TimestampedTupleForwarder<>(
@@ -84,17 +84,16 @@ public class TimestampedTupleForwarderTest {
             );
         forwarder.maybeForward(new Record<>("key1", new Change<>("newValue1", 
"oldValue1", true), 0L));
         forwarder.maybeForward(new Record<>("key2", new Change<>("newValue2", 
"oldValue2", false), 42L));
-
-        verify(store, context);
     }
 
     @Test
     public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() {
+        @SuppressWarnings("unchecked")
         final WrappedStateStore<StateStore, String, String> store = 
mock(WrappedStateStore.class);
+        @SuppressWarnings("unchecked")
         final InternalProcessorContext<String, Change<String>> context = 
mock(InternalProcessorContext.class);
 
-        expect(store.setFlushListener(null, false)).andReturn(true);
-        replay(store, context);
+        when(store.setFlushListener(null, false)).thenReturn(true);
 
         final TimestampedTupleForwarder<String, String> forwarder =
             new TimestampedTupleForwarder<>(
@@ -105,7 +104,5 @@ public class TimestampedTupleForwarderTest {
             );
         forwarder.maybeForward(new Record<>("key", new Change<>("newValue", 
"oldValue", true), 0L));
         forwarder.maybeForward(new Record<>("key", new Change<>("newValue", 
"oldValue", true), 42L));
-
-        verify(store, context);
     }
 }

Reply via email to