This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e235e1a3fee KAFKA-14132: Replace PowerMock and EasyMock with Mockito 
in streams tests (#12821)
e235e1a3fee is described below

commit e235e1a3fee53a34d855f44ed44da5da2f352b06
Author: Christo Lolov <[email protected]>
AuthorDate: Thu Jan 19 19:44:08 2023 +0200

    KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests 
(#12821)
    
    Batch 1 of the tests detailed in 
https://issues.apache.org/jira/browse/KAFKA-14132 which use PowerMock/EasyMock 
and need to be moved to Mockito.
    
    Reviewer: Bruno Cadonna <[email protected]>
---
 .../processor/internals/RepartitionTopicsTest.java | 129 ++++-----
 .../processor/internals/StateManagerUtilTest.java  | 290 +++++++--------------
 2 files changed, 146 insertions(+), 273 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
index 5a020d06f38..3769a493f6d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
@@ -31,8 +31,8 @@ import 
org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -47,22 +47,18 @@ import static 
org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNA
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1;
 
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.niceMock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({Cluster.class})
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class RepartitionTopicsTest {
 
     private static final String SOURCE_TOPIC_NAME1 = "source1";
@@ -100,35 +96,36 @@ public class RepartitionTopicsTest {
     );
     final  StreamsConfig config = new DummyStreamsConfig();
 
-    final InternalTopologyBuilder internalTopologyBuilder = 
mock(InternalTopologyBuilder.class);
-    final InternalTopicManager internalTopicManager = 
mock(InternalTopicManager.class);
-    final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer = 
mock(CopartitionedTopicsEnforcer.class);
-    final Cluster clusterMetadata = niceMock(Cluster.class);
+    @Mock
+    InternalTopologyBuilder internalTopologyBuilder;
+    @Mock
+    InternalTopicManager internalTopicManager;
+    @Mock
+    CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
+    @Mock
+    Cluster clusterMetadata;
 
     @Before
     public void setup() {
-        
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
-        expect(internalTopologyBuilder.topologyName()).andStubReturn(null);
+        when(internalTopologyBuilder.hasNamedTopology()).thenReturn(false);
+        when(internalTopologyBuilder.topologyName()).thenReturn(null);
     }
 
     @Test
     public void shouldSetupRepartitionTopics() {
-        expect(internalTopologyBuilder.subtopologyToTopicsInfo())
-            .andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), 
mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
+        when(internalTopologyBuilder.subtopologyToTopicsInfo())
+            .thenReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), 
mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
         final Set<String> coPartitionGroup1 = mkSet(SOURCE_TOPIC_NAME1, 
SOURCE_TOPIC_NAME2);
         final Set<String> coPartitionGroup2 = mkSet(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_NAME2);
         final List<Set<String>> coPartitionGroups = 
Arrays.asList(coPartitionGroup1, coPartitionGroup2);
-        
expect(internalTopologyBuilder.copartitionGroups()).andReturn(coPartitionGroups);
-        copartitionedTopicsEnforcer.enforce(eq(coPartitionGroup1), 
anyObject(), eq(clusterMetadata));
-        copartitionedTopicsEnforcer.enforce(eq(coPartitionGroup2), 
anyObject(), eq(clusterMetadata));
-        expect(internalTopicManager.makeReady(
+        
when(internalTopologyBuilder.copartitionGroups()).thenReturn(coPartitionGroups);
+        when(internalTopicManager.makeReady(
             mkMap(
                 mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1),
                 mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2)
             ))
-        ).andReturn(Collections.emptySet());
+        ).thenReturn(Collections.emptySet());
         setupCluster();
-        replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
         final RepartitionTopics repartitionTopics = new RepartitionTopics(
             new TopologyMetadata(internalTopologyBuilder, config),
             internalTopicManager,
@@ -139,7 +136,6 @@ public class RepartitionTopicsTest {
 
         repartitionTopics.setup();
 
-        verify(internalTopicManager, internalTopologyBuilder);
         final Map<TopicPartition, PartitionInfo> topicPartitionsInfo = 
repartitionTopics.topicPartitionsInfo();
         assertThat(topicPartitionsInfo.size(), is(6));
         verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, 
REPARTITION_TOPIC_NAME1, 0);
@@ -151,22 +147,17 @@ public class RepartitionTopicsTest {
 
         
assertThat(repartitionTopics.topologiesWithMissingInputTopics().isEmpty(), 
is(true));
         assertThat(repartitionTopics.missingSourceTopicExceptions().isEmpty(), 
is(true));
+
+        verify(copartitionedTopicsEnforcer).enforce(eq(coPartitionGroup1), 
any(), eq(clusterMetadata));
+        verify(copartitionedTopicsEnforcer).enforce(eq(coPartitionGroup2), 
any(), eq(clusterMetadata));
     }
 
     @Test
     public void shouldReturnMissingSourceTopics() {
         final Set<String> missingSourceTopics = mkSet(SOURCE_TOPIC_NAME1);
-        expect(internalTopologyBuilder.subtopologyToTopicsInfo())
-            .andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), 
mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
-        
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
-        copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()), 
anyObject(), eq(clusterMetadata));
-        expect(internalTopicManager.makeReady(
-            mkMap(
-                mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1)
-            ))
-        ).andReturn(Collections.emptySet());
+        when(internalTopologyBuilder.subtopologyToTopicsInfo())
+            .thenReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), 
mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
         setupClusterWithMissingTopics(missingSourceTopics);
-        replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
         final RepartitionTopics repartitionTopics = new RepartitionTopics(
             new TopologyMetadata(internalTopologyBuilder, config),
             internalTopicManager,
@@ -190,20 +181,12 @@ public class RepartitionTopicsTest {
     public void 
shouldThrowTaskAssignmentExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopics()
 {
         final RepartitionTopicConfig 
repartitionTopicConfigWithoutPartitionCount =
             new RepartitionTopicConfig(REPARTITION_WITHOUT_PARTITION_COUNT, 
TOPIC_CONFIG5);
-        expect(internalTopologyBuilder.subtopologyToTopicsInfo())
-            .andReturn(mkMap(
+        when(internalTopologyBuilder.subtopologyToTopicsInfo())
+            .thenReturn(mkMap(
                 mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1),
                 mkEntry(SUBTOPOLOGY_1, 
setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
             ));
-        
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
-        copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()), 
anyObject(), eq(clusterMetadata));
-        expect(internalTopicManager.makeReady(
-            mkMap(
-                mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1)
-            ))
-        ).andReturn(Collections.emptySet());
         setupCluster();
-        replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
         final RepartitionTopics repartitionTopics = new RepartitionTopics(
             new TopologyMetadata(internalTopologyBuilder, config),
             internalTopicManager,
@@ -230,20 +213,12 @@ public class RepartitionTopicsTest {
             ),
             Collections.emptyMap()
         );
-        expect(internalTopologyBuilder.subtopologyToTopicsInfo())
-            .andReturn(mkMap(
+        when(internalTopologyBuilder.subtopologyToTopicsInfo())
+            .thenReturn(mkMap(
                 mkEntry(SUBTOPOLOGY_0, topicsInfo),
                 mkEntry(SUBTOPOLOGY_1, 
setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
             ));
-        
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
-        copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()), 
anyObject(), eq(clusterMetadata));
-        expect(internalTopicManager.makeReady(
-            mkMap(
-                mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT, 
repartitionTopicConfigWithoutPartitionCount)
-            ))
-        ).andReturn(Collections.emptySet());
         setupClusterWithMissingPartitionCounts(mkSet(SOURCE_TOPIC_NAME1));
-        replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
         final RepartitionTopics repartitionTopics = new RepartitionTopics(
             new TopologyMetadata(internalTopologyBuilder, config),
             internalTopicManager,
@@ -275,22 +250,20 @@ public class RepartitionTopicsTest {
             ),
             Collections.emptyMap()
         );
-        expect(internalTopologyBuilder.subtopologyToTopicsInfo())
-            .andReturn(mkMap(
+        when(internalTopologyBuilder.subtopologyToTopicsInfo())
+            .thenReturn(mkMap(
                 mkEntry(SUBTOPOLOGY_0, topicsInfo),
                 mkEntry(SUBTOPOLOGY_1, 
setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
             ));
-        
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
-        copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()), 
anyObject(), eq(clusterMetadata));
-        expect(internalTopicManager.makeReady(
+        
when(internalTopologyBuilder.copartitionGroups()).thenReturn(Collections.emptyList());
+        when(internalTopicManager.makeReady(
             mkMap(
                 mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1),
                 mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2),
                 mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT, 
repartitionTopicConfigWithoutPartitionCount)
             ))
-        ).andReturn(Collections.emptySet());
+        ).thenReturn(Collections.emptySet());
         setupCluster();
-        replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
         final RepartitionTopics repartitionTopics = new RepartitionTopics(
             new TopologyMetadata(internalTopologyBuilder, config),
             internalTopicManager,
@@ -301,7 +274,6 @@ public class RepartitionTopicsTest {
 
         repartitionTopics.setup();
 
-        verify(internalTopicManager, internalTopologyBuilder);
         final Map<TopicPartition, PartitionInfo> topicPartitionsInfo = 
repartitionTopics.topicPartitionsInfo();
         assertThat(topicPartitionsInfo.size(), is(9));
         verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, 
REPARTITION_TOPIC_NAME1, 0);
@@ -332,22 +304,20 @@ public class RepartitionTopicsTest {
             ),
             Collections.emptyMap()
         );
-        expect(internalTopologyBuilder.subtopologyToTopicsInfo())
-            .andReturn(mkMap(
+        when(internalTopologyBuilder.subtopologyToTopicsInfo())
+            .thenReturn(mkMap(
                 mkEntry(SUBTOPOLOGY_0, topicsInfo),
                 mkEntry(SUBTOPOLOGY_1, 
setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
             ));
-        
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
-        copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()), 
anyObject(), eq(clusterMetadata));
-        expect(internalTopicManager.makeReady(
+        
when(internalTopologyBuilder.copartitionGroups()).thenReturn(Collections.emptyList());
+        when(internalTopicManager.makeReady(
             mkMap(
                 mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1),
                 mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2),
                 mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT, 
repartitionTopicConfigWithoutPartitionCount)
             ))
-        ).andReturn(Collections.emptySet());
+        ).thenReturn(Collections.emptySet());
         setupCluster();
-        replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
         final RepartitionTopics repartitionTopics = new RepartitionTopics(
             new TopologyMetadata(internalTopologyBuilder, config),
             internalTopicManager,
@@ -358,7 +328,6 @@ public class RepartitionTopicsTest {
 
         repartitionTopics.setup();
 
-        verify(internalTopicManager, internalTopologyBuilder);
         final Map<TopicPartition, PartitionInfo> topicPartitionsInfo = 
repartitionTopics.topicPartitionsInfo();
         assertThat(topicPartitionsInfo.size(), is(10));
         verifyRepartitionTopicPartitionInfo(topicPartitionsInfo, 
REPARTITION_TOPIC_NAME1, 0);
@@ -384,10 +353,9 @@ public class RepartitionTopicsTest {
             Collections.emptyMap(),
             Collections.emptyMap()
         );
-        expect(internalTopologyBuilder.subtopologyToTopicsInfo())
-            .andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, topicsInfo)));
+        when(internalTopologyBuilder.subtopologyToTopicsInfo())
+            .thenReturn(mkMap(mkEntry(SUBTOPOLOGY_0, topicsInfo)));
         setupCluster();
-        replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
         final RepartitionTopics repartitionTopics = new RepartitionTopics(
             new TopologyMetadata(internalTopologyBuilder, config),
             internalTopicManager,
@@ -398,7 +366,6 @@ public class RepartitionTopicsTest {
 
         repartitionTopics.setup();
 
-        verify(internalTopicManager, internalTopologyBuilder);
         final Map<TopicPartition, PartitionInfo> topicPartitionsInfo = 
repartitionTopics.topicPartitionsInfo();
         assertThat(topicPartitionsInfo, is(Collections.emptyMap()));
 
@@ -447,13 +414,9 @@ public class RepartitionTopicsTest {
             SOME_OTHER_TOPIC
         );
         topics.removeAll(missingTopics);
-        expect(clusterMetadata.topics()).andStubReturn(topics);
-        expect(clusterMetadata.partitionCountForTopic(SOURCE_TOPIC_NAME1))
-            
.andStubReturn(topicsWithMissingPartitionCounts.contains(SOURCE_TOPIC_NAME1) ? 
null : 3);
-        expect(clusterMetadata.partitionCountForTopic(SOURCE_TOPIC_NAME2))
-            
.andStubReturn(topicsWithMissingPartitionCounts.contains(SOURCE_TOPIC_NAME2) ? 
null : 1);
-        expect(clusterMetadata.partitionCountForTopic(SOURCE_TOPIC_NAME3))
-            
.andStubReturn(topicsWithMissingPartitionCounts.contains(SOURCE_TOPIC_NAME3) ? 
null : 2);
+        when(clusterMetadata.topics()).thenReturn(topics);
+        when(clusterMetadata.partitionCountForTopic(SOURCE_TOPIC_NAME1))
+            
.thenReturn(topicsWithMissingPartitionCounts.contains(SOURCE_TOPIC_NAME1) ? 
null : 3);
     }
 
     private TopicsInfo 
setupTopicInfoWithRepartitionTopicWithoutPartitionCount(final 
RepartitionTopicConfig repartitionTopicConfigWithoutPartitionCount) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
index bc7fb14ba7d..0e12cdb49d9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
@@ -26,14 +26,12 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.TestUtils;
-import org.easymock.IMocksControl;
-import org.easymock.Mock;
-import org.easymock.MockType;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.slf4j.Logger;
 
 import java.io.File;
@@ -42,77 +40,54 @@ import java.util.Arrays;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class StateManagerUtilTest {
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorStateManager stateManager;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private StateDirectory stateDirectory;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorTopology topology;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private InternalProcessorContext processorContext;
 
-    private IMocksControl ctrl;
-
     private Logger logger = new LogContext("test").logger(AbstractTask.class);
 
     private final TaskId taskId = new TaskId(0, 0);
 
-    @Before
-    public void setup() {
-        ctrl = createStrictControl();
-        topology = ctrl.createMock(ProcessorTopology.class);
-        processorContext = ctrl.createMock(InternalProcessorContext.class);
-
-        stateManager = ctrl.createMock(ProcessorStateManager.class);
-        stateDirectory = ctrl.createMock(StateDirectory.class);
-    }
-
     @Test
     public void testRegisterStateStoreWhenTopologyEmpty() {
-        expect(topology.stateStores()).andReturn(emptyList());
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(emptyList());
 
         StateManagerUtil.registerStateStores(logger,
             "logPrefix:", topology, stateManager, stateDirectory, 
processorContext);
-
-        ctrl.verify();
     }
 
     @Test
     public void testRegisterStateStoreFailToLockStateDirectory() {
-        expect(topology.stateStores()).andReturn(singletonList(new 
MockKeyValueStore("store", false)));
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(singletonList(new 
MockKeyValueStore("store", false)));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         final LockException thrown = assertThrows(LockException.class,
             () -> StateManagerUtil.registerStateStores(logger, "logPrefix:",
                 topology, stateManager, stateDirectory, processorContext));
 
         assertEquals("logPrefix:Failed to lock the state directory for task 
0_0", thrown.getMessage());
-
-        ctrl.verify();
     }
 
     @Test
@@ -120,65 +95,40 @@ public class StateManagerUtilTest {
         final MockKeyValueStore store1 = new MockKeyValueStore("store1", 
false);
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", 
false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
-
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
-
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        stateManager.registerStateStores(stateStores, processorContext);
-
-        stateManager.initializeStoreOffsetsFromCheckpoint(true);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        final InOrder inOrder = inOrder(stateManager);
+        when(topology.stateStores()).thenReturn(stateStores);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
+        when(topology.stateStores()).thenReturn(stateStores);
 
         StateManagerUtil.registerStateStores(logger, "logPrefix:",
             topology, stateManager, stateDirectory, processorContext);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).registerStateStores(stateStores, 
processorContext);
+        
inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+        verifyNoMoreInteractions(stateManager);
     }
 
     @Test
     public void testCloseStateManagerClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", true, false, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
-
-        // The unlock logic should still be executed.
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        doThrow(new ProcessorStateException("state manager failed to 
close")).when(stateManager).close();
 
         final ProcessorStateException thrown = assertThrows(
             ProcessorStateException.class, () -> 
StateManagerUtil.closeStateManager(logger,
@@ -187,151 +137,111 @@ public class StateManagerUtilTest {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        doThrow(new ProcessorStateException("state manager failed to 
close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
         // The `baseDir` will be accessed when attempting to delete the state 
store.
-        
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws 
IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        doThrow(new ProcessorStateException("Close 
failed")).when(stateManager).close();
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        Utils.delete(randomFile);
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", 
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, 
true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws 
IOException {
+    public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
-        expect(stateManager.baseDir()).andReturn(unknownFile);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new 
IOException("Deletion failed"));
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> 
StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        replayAll();
-
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> 
StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
     public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to 
close state you don't own!"));
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-            logger, "logPrefix:", true, false, stateManager, stateDirectory, 
TaskType.ACTIVE);
+                logger, "logPrefix:", true, false, stateManager, 
stateDirectory, TaskType.ACTIVE);
+
+        inOrder.verify(stateManager).taskId();
+        inOrder.verify(stateDirectory).lock(taskId);
+        verify(stateManager, never()).close();
+        verify(stateManager, never()).baseDir();
+        verify(stateDirectory, never()).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws 
IOException {
-        final File unknownFile = new File("/unknown/path");
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        expect(stateManager.baseDir()).andReturn(unknownFile);
-
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new AssertionError("Should not be trying to 
wipe state you don't own!"));
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
+    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-            logger, "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE);
+                logger, "logPrefix:", false, true, stateManager, 
stateDirectory, TaskType.ACTIVE);
+
+        inOrder.verify(stateManager).taskId();
+        inOrder.verify(stateDirectory).lock(taskId);
+        verify(stateManager, never()).close();
+        verify(stateManager, never()).baseDir();
+        verify(stateDirectory, never()).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 }

Reply via email to