This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e235e1a3fee KAFKA-14132: Replace PowerMock and EasyMock with Mockito
in streams tests (#12821)
e235e1a3fee is described below
commit e235e1a3fee53a34d855f44ed44da5da2f352b06
Author: Christo Lolov <[email protected]>
AuthorDate: Thu Jan 19 19:44:08 2023 +0200
KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
(#12821)
This is batch 1 of the tests detailed in
https://issues.apache.org/jira/browse/KAFKA-14132 that use PowerMock/EasyMock
and need to be moved to Mockito.
Reviewer: Bruno Cadonna <[email protected]>
---
.../processor/internals/RepartitionTopicsTest.java | 129 ++++-----
.../processor/internals/StateManagerUtilTest.java | 290 +++++++--------------
2 files changed, 146 insertions(+), 273 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
index 5a020d06f38..3769a493f6d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
@@ -31,8 +31,8 @@ import
org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import java.util.Arrays;
import java.util.Collections;
@@ -47,22 +47,18 @@ import static
org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNA
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.niceMock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({Cluster.class})
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class RepartitionTopicsTest {
private static final String SOURCE_TOPIC_NAME1 = "source1";
@@ -100,35 +96,36 @@ public class RepartitionTopicsTest {
);
final StreamsConfig config = new DummyStreamsConfig();
- final InternalTopologyBuilder internalTopologyBuilder =
mock(InternalTopologyBuilder.class);
- final InternalTopicManager internalTopicManager =
mock(InternalTopicManager.class);
- final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer =
mock(CopartitionedTopicsEnforcer.class);
- final Cluster clusterMetadata = niceMock(Cluster.class);
+ @Mock
+ InternalTopologyBuilder internalTopologyBuilder;
+ @Mock
+ InternalTopicManager internalTopicManager;
+ @Mock
+ CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
+ @Mock
+ Cluster clusterMetadata;
@Before
public void setup() {
-
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
- expect(internalTopologyBuilder.topologyName()).andStubReturn(null);
+ when(internalTopologyBuilder.hasNamedTopology()).thenReturn(false);
+ when(internalTopologyBuilder.topologyName()).thenReturn(null);
}
@Test
public void shouldSetupRepartitionTopics() {
- expect(internalTopologyBuilder.subtopologyToTopicsInfo())
- .andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1),
mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
+ when(internalTopologyBuilder.subtopologyToTopicsInfo())
+ .thenReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1),
mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
final Set<String> coPartitionGroup1 = mkSet(SOURCE_TOPIC_NAME1,
SOURCE_TOPIC_NAME2);
final Set<String> coPartitionGroup2 = mkSet(REPARTITION_TOPIC_NAME1,
REPARTITION_TOPIC_NAME2);
final List<Set<String>> coPartitionGroups =
Arrays.asList(coPartitionGroup1, coPartitionGroup2);
-
expect(internalTopologyBuilder.copartitionGroups()).andReturn(coPartitionGroups);
- copartitionedTopicsEnforcer.enforce(eq(coPartitionGroup1),
anyObject(), eq(clusterMetadata));
- copartitionedTopicsEnforcer.enforce(eq(coPartitionGroup2),
anyObject(), eq(clusterMetadata));
- expect(internalTopicManager.makeReady(
+
when(internalTopologyBuilder.copartitionGroups()).thenReturn(coPartitionGroups);
+ when(internalTopicManager.makeReady(
mkMap(
mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1),
mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2)
))
- ).andReturn(Collections.emptySet());
+ ).thenReturn(Collections.emptySet());
setupCluster();
- replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
new TopologyMetadata(internalTopologyBuilder, config),
internalTopicManager,
@@ -139,7 +136,6 @@ public class RepartitionTopicsTest {
repartitionTopics.setup();
- verify(internalTopicManager, internalTopologyBuilder);
final Map<TopicPartition, PartitionInfo> topicPartitionsInfo =
repartitionTopics.topicPartitionsInfo();
assertThat(topicPartitionsInfo.size(), is(6));
verifyRepartitionTopicPartitionInfo(topicPartitionsInfo,
REPARTITION_TOPIC_NAME1, 0);
@@ -151,22 +147,17 @@ public class RepartitionTopicsTest {
assertThat(repartitionTopics.topologiesWithMissingInputTopics().isEmpty(),
is(true));
assertThat(repartitionTopics.missingSourceTopicExceptions().isEmpty(),
is(true));
+
+ verify(copartitionedTopicsEnforcer).enforce(eq(coPartitionGroup1),
any(), eq(clusterMetadata));
+ verify(copartitionedTopicsEnforcer).enforce(eq(coPartitionGroup2),
any(), eq(clusterMetadata));
}
@Test
public void shouldReturnMissingSourceTopics() {
final Set<String> missingSourceTopics = mkSet(SOURCE_TOPIC_NAME1);
- expect(internalTopologyBuilder.subtopologyToTopicsInfo())
- .andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1),
mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
-
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
- copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()),
anyObject(), eq(clusterMetadata));
- expect(internalTopicManager.makeReady(
- mkMap(
- mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1)
- ))
- ).andReturn(Collections.emptySet());
+ when(internalTopologyBuilder.subtopologyToTopicsInfo())
+ .thenReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1),
mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
setupClusterWithMissingTopics(missingSourceTopics);
- replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
new TopologyMetadata(internalTopologyBuilder, config),
internalTopicManager,
@@ -190,20 +181,12 @@ public class RepartitionTopicsTest {
public void
shouldThrowTaskAssignmentExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopics()
{
final RepartitionTopicConfig
repartitionTopicConfigWithoutPartitionCount =
new RepartitionTopicConfig(REPARTITION_WITHOUT_PARTITION_COUNT,
TOPIC_CONFIG5);
- expect(internalTopologyBuilder.subtopologyToTopicsInfo())
- .andReturn(mkMap(
+ when(internalTopologyBuilder.subtopologyToTopicsInfo())
+ .thenReturn(mkMap(
mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1),
mkEntry(SUBTOPOLOGY_1,
setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
));
-
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
- copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()),
anyObject(), eq(clusterMetadata));
- expect(internalTopicManager.makeReady(
- mkMap(
- mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1)
- ))
- ).andReturn(Collections.emptySet());
setupCluster();
- replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
new TopologyMetadata(internalTopologyBuilder, config),
internalTopicManager,
@@ -230,20 +213,12 @@ public class RepartitionTopicsTest {
),
Collections.emptyMap()
);
- expect(internalTopologyBuilder.subtopologyToTopicsInfo())
- .andReturn(mkMap(
+ when(internalTopologyBuilder.subtopologyToTopicsInfo())
+ .thenReturn(mkMap(
mkEntry(SUBTOPOLOGY_0, topicsInfo),
mkEntry(SUBTOPOLOGY_1,
setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
));
-
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
- copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()),
anyObject(), eq(clusterMetadata));
- expect(internalTopicManager.makeReady(
- mkMap(
- mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT,
repartitionTopicConfigWithoutPartitionCount)
- ))
- ).andReturn(Collections.emptySet());
setupClusterWithMissingPartitionCounts(mkSet(SOURCE_TOPIC_NAME1));
- replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
new TopologyMetadata(internalTopologyBuilder, config),
internalTopicManager,
@@ -275,22 +250,20 @@ public class RepartitionTopicsTest {
),
Collections.emptyMap()
);
- expect(internalTopologyBuilder.subtopologyToTopicsInfo())
- .andReturn(mkMap(
+ when(internalTopologyBuilder.subtopologyToTopicsInfo())
+ .thenReturn(mkMap(
mkEntry(SUBTOPOLOGY_0, topicsInfo),
mkEntry(SUBTOPOLOGY_1,
setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
));
-
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
- copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()),
anyObject(), eq(clusterMetadata));
- expect(internalTopicManager.makeReady(
+
when(internalTopologyBuilder.copartitionGroups()).thenReturn(Collections.emptyList());
+ when(internalTopicManager.makeReady(
mkMap(
mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1),
mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2),
mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT,
repartitionTopicConfigWithoutPartitionCount)
))
- ).andReturn(Collections.emptySet());
+ ).thenReturn(Collections.emptySet());
setupCluster();
- replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
new TopologyMetadata(internalTopologyBuilder, config),
internalTopicManager,
@@ -301,7 +274,6 @@ public class RepartitionTopicsTest {
repartitionTopics.setup();
- verify(internalTopicManager, internalTopologyBuilder);
final Map<TopicPartition, PartitionInfo> topicPartitionsInfo =
repartitionTopics.topicPartitionsInfo();
assertThat(topicPartitionsInfo.size(), is(9));
verifyRepartitionTopicPartitionInfo(topicPartitionsInfo,
REPARTITION_TOPIC_NAME1, 0);
@@ -332,22 +304,20 @@ public class RepartitionTopicsTest {
),
Collections.emptyMap()
);
- expect(internalTopologyBuilder.subtopologyToTopicsInfo())
- .andReturn(mkMap(
+ when(internalTopologyBuilder.subtopologyToTopicsInfo())
+ .thenReturn(mkMap(
mkEntry(SUBTOPOLOGY_0, topicsInfo),
mkEntry(SUBTOPOLOGY_1,
setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
));
-
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
- copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()),
anyObject(), eq(clusterMetadata));
- expect(internalTopicManager.makeReady(
+
when(internalTopologyBuilder.copartitionGroups()).thenReturn(Collections.emptyList());
+ when(internalTopicManager.makeReady(
mkMap(
mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1),
mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_CONFIG2),
mkEntry(REPARTITION_WITHOUT_PARTITION_COUNT,
repartitionTopicConfigWithoutPartitionCount)
))
- ).andReturn(Collections.emptySet());
+ ).thenReturn(Collections.emptySet());
setupCluster();
- replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
new TopologyMetadata(internalTopologyBuilder, config),
internalTopicManager,
@@ -358,7 +328,6 @@ public class RepartitionTopicsTest {
repartitionTopics.setup();
- verify(internalTopicManager, internalTopologyBuilder);
final Map<TopicPartition, PartitionInfo> topicPartitionsInfo =
repartitionTopics.topicPartitionsInfo();
assertThat(topicPartitionsInfo.size(), is(10));
verifyRepartitionTopicPartitionInfo(topicPartitionsInfo,
REPARTITION_TOPIC_NAME1, 0);
@@ -384,10 +353,9 @@ public class RepartitionTopicsTest {
Collections.emptyMap(),
Collections.emptyMap()
);
- expect(internalTopologyBuilder.subtopologyToTopicsInfo())
- .andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, topicsInfo)));
+ when(internalTopologyBuilder.subtopologyToTopicsInfo())
+ .thenReturn(mkMap(mkEntry(SUBTOPOLOGY_0, topicsInfo)));
setupCluster();
- replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
new TopologyMetadata(internalTopologyBuilder, config),
internalTopicManager,
@@ -398,7 +366,6 @@ public class RepartitionTopicsTest {
repartitionTopics.setup();
- verify(internalTopicManager, internalTopologyBuilder);
final Map<TopicPartition, PartitionInfo> topicPartitionsInfo =
repartitionTopics.topicPartitionsInfo();
assertThat(topicPartitionsInfo, is(Collections.emptyMap()));
@@ -447,13 +414,9 @@ public class RepartitionTopicsTest {
SOME_OTHER_TOPIC
);
topics.removeAll(missingTopics);
- expect(clusterMetadata.topics()).andStubReturn(topics);
- expect(clusterMetadata.partitionCountForTopic(SOURCE_TOPIC_NAME1))
-
.andStubReturn(topicsWithMissingPartitionCounts.contains(SOURCE_TOPIC_NAME1) ?
null : 3);
- expect(clusterMetadata.partitionCountForTopic(SOURCE_TOPIC_NAME2))
-
.andStubReturn(topicsWithMissingPartitionCounts.contains(SOURCE_TOPIC_NAME2) ?
null : 1);
- expect(clusterMetadata.partitionCountForTopic(SOURCE_TOPIC_NAME3))
-
.andStubReturn(topicsWithMissingPartitionCounts.contains(SOURCE_TOPIC_NAME3) ?
null : 2);
+ when(clusterMetadata.topics()).thenReturn(topics);
+ when(clusterMetadata.partitionCountForTopic(SOURCE_TOPIC_NAME1))
+
.thenReturn(topicsWithMissingPartitionCounts.contains(SOURCE_TOPIC_NAME1) ?
null : 3);
}
private TopicsInfo
setupTopicInfoWithRepartitionTopicWithoutPartitionCount(final
RepartitionTopicConfig repartitionTopicConfigWithoutPartitionCount) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
index bc7fb14ba7d..0e12cdb49d9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
@@ -26,14 +26,12 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.TestUtils;
-import org.easymock.IMocksControl;
-import org.easymock.Mock;
-import org.easymock.MockType;
-import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import java.io.File;
@@ -42,77 +40,54 @@ import java.util.Arrays;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class StateManagerUtilTest {
- @Mock(type = MockType.NICE)
+ @Mock
private ProcessorStateManager stateManager;
- @Mock(type = MockType.NICE)
+ @Mock
private StateDirectory stateDirectory;
- @Mock(type = MockType.NICE)
+ @Mock
private ProcessorTopology topology;
- @Mock(type = MockType.NICE)
+ @Mock
private InternalProcessorContext processorContext;
- private IMocksControl ctrl;
-
private Logger logger = new LogContext("test").logger(AbstractTask.class);
private final TaskId taskId = new TaskId(0, 0);
- @Before
- public void setup() {
- ctrl = createStrictControl();
- topology = ctrl.createMock(ProcessorTopology.class);
- processorContext = ctrl.createMock(InternalProcessorContext.class);
-
- stateManager = ctrl.createMock(ProcessorStateManager.class);
- stateDirectory = ctrl.createMock(StateDirectory.class);
- }
-
@Test
public void testRegisterStateStoreWhenTopologyEmpty() {
- expect(topology.stateStores()).andReturn(emptyList());
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(topology.stateStores()).thenReturn(emptyList());
StateManagerUtil.registerStateStores(logger,
"logPrefix:", topology, stateManager, stateDirectory,
processorContext);
-
- ctrl.verify();
}
@Test
public void testRegisterStateStoreFailToLockStateDirectory() {
- expect(topology.stateStores()).andReturn(singletonList(new
MockKeyValueStore("store", false)));
-
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(topology.stateStores()).thenReturn(singletonList(new
MockKeyValueStore("store", false)));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
final LockException thrown = assertThrows(LockException.class,
() -> StateManagerUtil.registerStateStores(logger, "logPrefix:",
topology, stateManager, stateDirectory, processorContext));
assertEquals("logPrefix:Failed to lock the state directory for task
0_0", thrown.getMessage());
-
- ctrl.verify();
}
@Test
@@ -120,65 +95,40 @@ public class StateManagerUtilTest {
final MockKeyValueStore store1 = new MockKeyValueStore("store1",
false);
final MockKeyValueStore store2 = new MockKeyValueStore("store2",
false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
-
- expect(topology.stateStores()).andReturn(stateStores);
-
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
- expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
-
- expect(topology.stateStores()).andReturn(stateStores);
-
- stateManager.registerStateStores(stateStores, processorContext);
-
- stateManager.initializeStoreOffsetsFromCheckpoint(true);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ final InOrder inOrder = inOrder(stateManager);
+ when(topology.stateStores()).thenReturn(stateStores);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
+ when(topology.stateStores()).thenReturn(stateStores);
StateManagerUtil.registerStateStores(logger, "logPrefix:",
topology, stateManager, stateDirectory, processorContext);
- ctrl.verify();
+ inOrder.verify(stateManager).registerStateStores(stateStores,
processorContext);
+
inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+ verifyNoMoreInteractions(stateManager);
}
@Test
public void testCloseStateManagerClean() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
StateManagerUtil.closeStateManager(logger,
"logPrefix:", true, false, stateManager, stateDirectory,
TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenClean() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager
failed to close"));
-
- // The unlock logic should still be executed.
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ doThrow(new ProcessorStateException("state manager failed to
close")).when(stateManager).close();
final ProcessorStateException thrown = assertThrows(
ProcessorStateException.class, () ->
StateManagerUtil.closeStateManager(logger,
@@ -187,151 +137,111 @@ public class StateManagerUtilTest {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager
failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ doThrow(new ProcessorStateException("state manager failed to
close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager,
stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state
store.
-
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory,
TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldStillWipeStateStoresIfCloseThrowsException() throws
IOException {
+ public void shouldStillWipeStateStoresIfCloseThrowsException() {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
- expect(stateManager.baseDir()).andReturn(randomFile);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ doThrow(new ProcessorStateException("Close
failed")).when(stateManager).close();
+ when(stateManager.baseDir()).thenReturn(randomFile);
- Utils.delete(randomFile);
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:",
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false,
true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws
IOException {
+ public void
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
- expect(stateManager.baseDir()).andReturn(unknownFile);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.baseDir()).thenReturn(unknownFile);
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new IOException("Deletion failed"));
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ utils.when(() -> Utils.delete(unknownFile)).thenThrow(new
IOException("Deletion failed"));
- stateDirectory.unlock(taskId);
- expectLastCall();
+ final ProcessorStateException thrown = assertThrows(
+ ProcessorStateException.class, () ->
StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, stateManager,
stateDirectory, TaskType.ACTIVE));
- ctrl.checkOrder(true);
- ctrl.replay();
+ assertEquals(IOException.class, thrown.getCause().getClass());
+ }
- replayAll();
-
- final ProcessorStateException thrown = assertThrows(
- ProcessorStateException.class, () ->
StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager, stateDirectory,
TaskType.ACTIVE));
-
- assertEquals(IOException.class, thrown.getCause().getClass());
-
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- stateManager.close();
- expectLastCall().andThrow(new AssertionError("Should not be trying to
close state you don't own!"));
-
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", true, false, stateManager, stateDirectory,
TaskType.ACTIVE);
+ logger, "logPrefix:", true, false, stateManager,
stateDirectory, TaskType.ACTIVE);
+
+ inOrder.verify(stateManager).taskId();
+ inOrder.verify(stateDirectory).lock(taskId);
+ verify(stateManager, never()).close();
+ verify(stateManager, never()).baseDir();
+ verify(stateDirectory, never()).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws
IOException {
- final File unknownFile = new File("/unknown/path");
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- expect(stateManager.baseDir()).andReturn(unknownFile);
-
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new AssertionError("Should not be trying to
wipe state you don't own!"));
-
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
+ public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", false, true, stateManager, stateDirectory,
TaskType.ACTIVE);
+ logger, "logPrefix:", false, true, stateManager,
stateDirectory, TaskType.ACTIVE);
+
+ inOrder.verify(stateManager).taskId();
+ inOrder.verify(stateDirectory).lock(taskId);
+ verify(stateManager, never()).close();
+ verify(stateManager, never()).baseDir();
+ verify(stateDirectory, never()).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
}