This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d0e9e94629c KAFKA-14133: Migrate ActiveTaskCreatorTest, ChangelogTopicsTest and GlobalProcessorContextImplTest to Mockito (#14209)
d0e9e94629c is described below
commit d0e9e94629c1347b19a2992b67d8590e61a59d04
Author: Christo Lolov <[email protected]>
AuthorDate: Wed Aug 16 09:19:35 2023 +0100
KAFKA-14133: Migrate ActiveTaskCreatorTest, ChangelogTopicsTest and GlobalProcessorContextImplTest to Mockito (#14209)
Reviewers: Divij Vaidya <[email protected]>
---
.../processor/internals/ActiveTaskCreatorTest.java | 44 +++++++----------
.../processor/internals/ChangelogTopicsTest.java | 35 +++++---------
.../internals/GlobalProcessorContextImplTest.java | 56 ++++++++++------------
3 files changed, 57 insertions(+), 78 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index 13eb95d9f1f..ffdd0699c7a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -33,11 +33,10 @@ import
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockClientSupplier;
-import org.easymock.EasyMockRunner;
-import org.easymock.Mock;
-import org.easymock.MockType;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import java.io.File;
import java.util.Collections;
@@ -50,10 +49,6 @@ import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -61,15 +56,17 @@ import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertThrows;
import static java.util.Collections.emptySet;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
-@RunWith(EasyMockRunner.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class ActiveTaskCreatorTest {
- @Mock(type = MockType.NICE)
+ @Mock
private InternalTopologyBuilder builder;
- @Mock(type = MockType.NICE)
+ @Mock
private StateDirectory stateDirectory;
- @Mock(type = MockType.NICE)
+ @Mock
private ChangelogReader changeLogReader;
private final MockClientSupplier mockClientSupplier = new
MockClientSupplier();
@@ -476,21 +473,16 @@ public class ActiveTaskCreatorTest {
final ProcessorTopology topology = mock(ProcessorTopology.class);
final SourceNode sourceNode = mock(SourceNode.class);
- reset(builder, stateDirectory);
- expect(builder.topologyConfigs()).andStubReturn(new TopologyConfig(new
StreamsConfig(properties)));
- expect(builder.buildSubtopology(0)).andReturn(topology).anyTimes();
- expect(topology.sinkTopics()).andStubReturn(emptySet());
-
expect(stateDirectory.getOrCreateDirectoryForTask(task00)).andReturn(mock(File.class));
-
expect(stateDirectory.checkpointFileFor(task00)).andReturn(mock(File.class));
-
expect(stateDirectory.getOrCreateDirectoryForTask(task01)).andReturn(mock(File.class));
-
expect(stateDirectory.checkpointFileFor(task01)).andReturn(mock(File.class));
-
expect(topology.storeToChangelogTopic()).andReturn(Collections.emptyMap()).anyTimes();
- expect(topology.source("topic")).andReturn(sourceNode).anyTimes();
-
expect(sourceNode.getTimestampExtractor()).andReturn(mock(TimestampExtractor.class)).anyTimes();
-
expect(topology.globalStateStores()).andReturn(Collections.emptyList()).anyTimes();
-
expect(topology.terminalNodes()).andStubReturn(Collections.singleton(sourceNode.name()));
-
expect(topology.sources()).andStubReturn(Collections.singleton(sourceNode));
- replay(builder, stateDirectory, topology, sourceNode);
+ when(builder.topologyConfigs()).thenReturn(new TopologyConfig(new
StreamsConfig(properties)));
+ when(builder.buildSubtopology(0)).thenReturn(topology);
+ when(topology.sinkTopics()).thenReturn(emptySet());
+
when(stateDirectory.getOrCreateDirectoryForTask(task00)).thenReturn(mock(File.class));
+
when(stateDirectory.checkpointFileFor(task00)).thenReturn(mock(File.class));
+
when(stateDirectory.getOrCreateDirectoryForTask(task01)).thenReturn(mock(File.class));
+
when(stateDirectory.checkpointFileFor(task01)).thenReturn(mock(File.class));
+ when(topology.source("topic")).thenReturn(sourceNode);
+
when(sourceNode.getTimestampExtractor()).thenReturn(mock(TimestampExtractor.class));
+ when(topology.sources()).thenReturn(Collections.singleton(sourceNode));
final StreamsConfig config = new StreamsConfig(properties);
activeTaskCreator = new ActiveTaskCreator(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ChangelogTopicsTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ChangelogTopicsTest.java
index 6da192fdee7..b4e406504dc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ChangelogTopicsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ChangelogTopicsTest.java
@@ -22,6 +22,8 @@ import
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
import
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
import java.util.Collections;
import java.util.Map;
@@ -32,13 +34,12 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class ChangelogTopicsTest {
private static final String SOURCE_TOPIC_NAME = "source";
@@ -83,16 +84,14 @@ public class ChangelogTopicsTest {
@Test
public void shouldNotContainChangelogsForStatelessTasks() {
-
expect(internalTopicManager.makeReady(Collections.emptyMap())).andStubReturn(Collections.emptySet());
+
when(internalTopicManager.makeReady(Collections.emptyMap())).thenReturn(Collections.emptySet());
final Map<Subtopology, TopicsInfo> topicGroups =
mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO2));
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup =
mkMap(mkEntry(SUBTOPOLOGY_0, mkSet(TASK_0_0, TASK_0_1, TASK_0_2)));
- replay(internalTopicManager);
final ChangelogTopics changelogTopics =
new ChangelogTopics(internalTopicManager, topicGroups,
tasksForTopicGroup, "[test] ");
changelogTopics.setup();
- verify(internalTopicManager);
assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_0),
is(Collections.emptySet()));
assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_1),
is(Collections.emptySet()));
assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_2),
is(Collections.emptySet()));
@@ -102,18 +101,16 @@ public class ChangelogTopicsTest {
@Test
public void
shouldNotContainAnyPreExistingChangelogsIfChangelogIsNewlyCreated() {
-
expect(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1,
CHANGELOG_TOPIC_CONFIG))))
- .andStubReturn(mkSet(CHANGELOG_TOPIC_NAME1));
+
when(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1,
CHANGELOG_TOPIC_CONFIG))))
+ .thenReturn(mkSet(CHANGELOG_TOPIC_NAME1));
final Map<Subtopology, TopicsInfo> topicGroups =
mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1));
final Set<TaskId> tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup =
mkMap(mkEntry(SUBTOPOLOGY_0, tasks));
- replay(internalTopicManager);
final ChangelogTopics changelogTopics =
new ChangelogTopics(internalTopicManager, topicGroups,
tasksForTopicGroup, "[test] ");
changelogTopics.setup();
- verify(internalTopicManager);
assertThat(CHANGELOG_TOPIC_CONFIG.numberOfPartitions().orElse(Integer.MIN_VALUE),
is(3));
assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_0),
is(Collections.emptySet()));
assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_1),
is(Collections.emptySet()));
@@ -124,18 +121,16 @@ public class ChangelogTopicsTest {
@Test
public void shouldOnlyContainPreExistingNonSourceBasedChangelogs() {
-
expect(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1,
CHANGELOG_TOPIC_CONFIG))))
- .andStubReturn(Collections.emptySet());
+
when(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1,
CHANGELOG_TOPIC_CONFIG))))
+ .thenReturn(Collections.emptySet());
final Map<Subtopology, TopicsInfo> topicGroups =
mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1));
final Set<TaskId> tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup =
mkMap(mkEntry(SUBTOPOLOGY_0, tasks));
- replay(internalTopicManager);
final ChangelogTopics changelogTopics =
new ChangelogTopics(internalTopicManager, topicGroups,
tasksForTopicGroup, "[test] ");
changelogTopics.setup();
- verify(internalTopicManager);
assertThat(CHANGELOG_TOPIC_CONFIG.numberOfPartitions().orElse(Integer.MIN_VALUE),
is(3));
final TopicPartition changelogPartition0 = new
TopicPartition(CHANGELOG_TOPIC_NAME1, 0);
final TopicPartition changelogPartition1 = new
TopicPartition(CHANGELOG_TOPIC_NAME1, 1);
@@ -152,17 +147,15 @@ public class ChangelogTopicsTest {
@Test
public void shouldOnlyContainPreExistingSourceBasedChangelogs() {
-
expect(internalTopicManager.makeReady(Collections.emptyMap())).andStubReturn(Collections.emptySet());
+
when(internalTopicManager.makeReady(Collections.emptyMap())).thenReturn(Collections.emptySet());
final Map<Subtopology, TopicsInfo> topicGroups =
mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO3));
final Set<TaskId> tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup =
mkMap(mkEntry(SUBTOPOLOGY_0, tasks));
- replay(internalTopicManager);
final ChangelogTopics changelogTopics =
new ChangelogTopics(internalTopicManager, topicGroups,
tasksForTopicGroup, "[test] ");
changelogTopics.setup();
- verify(internalTopicManager);
final TopicPartition changelogPartition0 = new
TopicPartition(SOURCE_TOPIC_NAME, 0);
final TopicPartition changelogPartition1 = new
TopicPartition(SOURCE_TOPIC_NAME, 1);
final TopicPartition changelogPartition2 = new
TopicPartition(SOURCE_TOPIC_NAME, 2);
@@ -178,18 +171,16 @@ public class ChangelogTopicsTest {
@Test
public void shouldContainBothTypesOfPreExistingChangelogs() {
-
expect(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1,
CHANGELOG_TOPIC_CONFIG))))
- .andStubReturn(Collections.emptySet());
+
when(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1,
CHANGELOG_TOPIC_CONFIG))))
+ .thenReturn(Collections.emptySet());
final Map<Subtopology, TopicsInfo> topicGroups =
mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO4));
final Set<TaskId> tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup =
mkMap(mkEntry(SUBTOPOLOGY_0, tasks));
- replay(internalTopicManager);
final ChangelogTopics changelogTopics =
new ChangelogTopics(internalTopicManager, topicGroups,
tasksForTopicGroup, "[test] ");
changelogTopics.setup();
- verify(internalTopicManager);
assertThat(CHANGELOG_TOPIC_CONFIG.numberOfPartitions().orElse(Integer.MIN_VALUE),
is(3));
final TopicPartition changelogPartition0 = new
TopicPartition(CHANGELOG_TOPIC_NAME1, 0);
final TopicPartition changelogPartition1 = new
TopicPartition(CHANGELOG_TOPIC_NAME1, 1);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index 1dffd7ac665..07536c414a3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -17,13 +17,11 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.header.internals.RecordHeaders;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
@@ -32,18 +30,20 @@ import org.apache.kafka.streams.state.WindowStore;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class GlobalProcessorContextImplTest {
private static final String GLOBAL_STORE_NAME = "global-store";
private static final String GLOBAL_KEY_VALUE_STORE_NAME =
"global-key-value-store";
@@ -55,27 +55,16 @@ public class GlobalProcessorContextImplTest {
private GlobalProcessorContextImpl globalContext;
+ @Mock
private ProcessorNode<Object, Object, Object, Object> child;
private ProcessorRecordContext recordContext;
+ @Mock
+ private GlobalStateManager stateManager;
@Before
public void setup() {
final StreamsConfig streamsConfig = mock(StreamsConfig.class);
-
expect(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andReturn("dummy-id");
-
expect(streamsConfig.defaultValueSerde()).andReturn(Serdes.ByteArray());
- expect(streamsConfig.defaultKeySerde()).andReturn(Serdes.ByteArray());
- replay(streamsConfig);
-
- final GlobalStateManager stateManager = mock(GlobalStateManager.class);
-
expect(stateManager.getGlobalStore(GLOBAL_STORE_NAME)).andReturn(mock(StateStore.class));
-
expect(stateManager.getGlobalStore(GLOBAL_KEY_VALUE_STORE_NAME)).andReturn(mock(KeyValueStore.class));
-
expect(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME)).andReturn(mock(TimestampedKeyValueStore.class));
-
expect(stateManager.getGlobalStore(GLOBAL_WINDOW_STORE_NAME)).andReturn(mock(WindowStore.class));
-
expect(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME)).andReturn(mock(TimestampedWindowStore.class));
-
expect(stateManager.getGlobalStore(GLOBAL_SESSION_STORE_NAME)).andReturn(mock(SessionStore.class));
- expect(stateManager.getGlobalStore(UNKNOWN_STORE)).andReturn(null);
- expect(stateManager.taskType()).andStubReturn(TaskType.GLOBAL);
- replay(stateManager);
+
when(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).thenReturn("dummy-id");
globalContext = new GlobalProcessorContextImpl(
streamsConfig,
@@ -86,7 +75,6 @@ public class GlobalProcessorContextImplTest {
final ProcessorNode<Object, Object, Object, Object> processorNode =
new ProcessorNode<>("testNode");
- child = mock(ProcessorNode.class);
processorNode.addChild(child);
globalContext.setCurrentNode(processorNode);
@@ -96,20 +84,18 @@ public class GlobalProcessorContextImplTest {
@Test
public void shouldReturnGlobalOrNullStore() {
+
when(stateManager.getGlobalStore(GLOBAL_STORE_NAME)).thenReturn(mock(StateStore.class));
assertThat(globalContext.getStateStore(GLOBAL_STORE_NAME), new
IsInstanceOf(StateStore.class));
assertNull(globalContext.getStateStore(UNKNOWN_STORE));
}
@Test
public void shouldForwardToSingleChild() {
- child.process(anyObject());
- expectLastCall();
+ doNothing().when(child).process(any());
- expect(recordContext.timestamp()).andStubReturn(0L);
- expect(recordContext.headers()).andStubReturn(new RecordHeaders());
- replay(child, recordContext);
+ when(recordContext.timestamp()).thenReturn(0L);
+ when(recordContext.headers()).thenReturn(new RecordHeaders());
globalContext.forward((Object /*forcing a call to the K/V forward*/)
null, null);
- verify(child, recordContext);
}
@Test
@@ -129,6 +115,7 @@ public class GlobalProcessorContextImplTest {
@Test
public void shouldNotAllowInitForKeyValueStore() {
+
when(stateManager.getGlobalStore(GLOBAL_KEY_VALUE_STORE_NAME)).thenReturn(mock(KeyValueStore.class));
final StateStore store =
globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
try {
store.init((StateStoreContext) null, null);
@@ -138,6 +125,7 @@ public class GlobalProcessorContextImplTest {
@Test
public void shouldNotAllowInitForTimestampedKeyValueStore() {
+
when(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME)).thenReturn(mock(TimestampedKeyValueStore.class));
final StateStore store =
globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME);
try {
store.init((StateStoreContext) null, null);
@@ -147,6 +135,7 @@ public class GlobalProcessorContextImplTest {
@Test
public void shouldNotAllowInitForWindowStore() {
+
when(stateManager.getGlobalStore(GLOBAL_WINDOW_STORE_NAME)).thenReturn(mock(WindowStore.class));
final StateStore store =
globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME);
try {
store.init((StateStoreContext) null, null);
@@ -156,6 +145,7 @@ public class GlobalProcessorContextImplTest {
@Test
public void shouldNotAllowInitForTimestampedWindowStore() {
+
when(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME)).thenReturn(mock(TimestampedWindowStore.class));
final StateStore store =
globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME);
try {
store.init((StateStoreContext) null, null);
@@ -165,6 +155,7 @@ public class GlobalProcessorContextImplTest {
@Test
public void shouldNotAllowInitForSessionStore() {
+
when(stateManager.getGlobalStore(GLOBAL_SESSION_STORE_NAME)).thenReturn(mock(SessionStore.class));
final StateStore store =
globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME);
try {
store.init((StateStoreContext) null, null);
@@ -174,6 +165,7 @@ public class GlobalProcessorContextImplTest {
@Test
public void shouldNotAllowCloseForKeyValueStore() {
+
when(stateManager.getGlobalStore(GLOBAL_KEY_VALUE_STORE_NAME)).thenReturn(mock(KeyValueStore.class));
final StateStore store =
globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
try {
store.close();
@@ -183,6 +175,7 @@ public class GlobalProcessorContextImplTest {
@Test
public void shouldNotAllowCloseForTimestampedKeyValueStore() {
+
when(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME)).thenReturn(mock(TimestampedKeyValueStore.class));
final StateStore store =
globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME);
try {
store.close();
@@ -192,6 +185,7 @@ public class GlobalProcessorContextImplTest {
@Test
public void shouldNotAllowCloseForWindowStore() {
+
when(stateManager.getGlobalStore(GLOBAL_WINDOW_STORE_NAME)).thenReturn(mock(WindowStore.class));
final StateStore store =
globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME);
try {
store.close();
@@ -201,6 +195,7 @@ public class GlobalProcessorContextImplTest {
@Test
public void shouldNotAllowCloseForTimestampedWindowStore() {
+
when(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME)).thenReturn(mock(TimestampedWindowStore.class));
final StateStore store =
globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME);
try {
store.close();
@@ -210,6 +205,7 @@ public class GlobalProcessorContextImplTest {
@Test
public void shouldNotAllowCloseForSessionStore() {
+
when(stateManager.getGlobalStore(GLOBAL_SESSION_STORE_NAME)).thenReturn(mock(SessionStore.class));
final StateStore store =
globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME);
try {
store.close();