This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d0e9e94629c KAFKA-14133: Migrate ActiveTaskCreatorTest, 
ChangelogTopicsTest and GlobalProcessorContextImplTest to Mockito (#14209)
d0e9e94629c is described below

commit d0e9e94629c1347b19a2992b67d8590e61a59d04
Author: Christo Lolov <[email protected]>
AuthorDate: Wed Aug 16 09:19:35 2023 +0100

    KAFKA-14133: Migrate ActiveTaskCreatorTest, ChangelogTopicsTest and 
GlobalProcessorContextImplTest to Mockito (#14209)
    
    Reviewers: Divij Vaidya <[email protected]>
---
 .../processor/internals/ActiveTaskCreatorTest.java | 44 +++++++----------
 .../processor/internals/ChangelogTopicsTest.java   | 35 +++++---------
 .../internals/GlobalProcessorContextImplTest.java  | 56 ++++++++++------------
 3 files changed, 57 insertions(+), 78 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index 13eb95d9f1f..ffdd0699c7a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -33,11 +33,10 @@ import 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.MockClientSupplier;
-import org.easymock.EasyMockRunner;
-import org.easymock.Mock;
-import org.easymock.MockType;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.io.File;
 import java.util.Collections;
@@ -50,10 +49,6 @@ import java.util.stream.Collectors;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -61,15 +56,17 @@ import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertThrows;
 import static java.util.Collections.emptySet;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
-@RunWith(EasyMockRunner.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class ActiveTaskCreatorTest {
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private InternalTopologyBuilder builder;
-    @Mock(type = MockType.NICE)
+    @Mock
     private StateDirectory stateDirectory;
-    @Mock(type = MockType.NICE)
+    @Mock
     private ChangelogReader changeLogReader;
 
     private final MockClientSupplier mockClientSupplier = new 
MockClientSupplier();
@@ -476,21 +473,16 @@ public class ActiveTaskCreatorTest {
         final ProcessorTopology topology = mock(ProcessorTopology.class);
         final SourceNode sourceNode = mock(SourceNode.class);
 
-        reset(builder, stateDirectory);
-        expect(builder.topologyConfigs()).andStubReturn(new TopologyConfig(new 
StreamsConfig(properties)));
-        expect(builder.buildSubtopology(0)).andReturn(topology).anyTimes();
-        expect(topology.sinkTopics()).andStubReturn(emptySet());
-        
expect(stateDirectory.getOrCreateDirectoryForTask(task00)).andReturn(mock(File.class));
-        
expect(stateDirectory.checkpointFileFor(task00)).andReturn(mock(File.class));
-        
expect(stateDirectory.getOrCreateDirectoryForTask(task01)).andReturn(mock(File.class));
-        
expect(stateDirectory.checkpointFileFor(task01)).andReturn(mock(File.class));
-        
expect(topology.storeToChangelogTopic()).andReturn(Collections.emptyMap()).anyTimes();
-        expect(topology.source("topic")).andReturn(sourceNode).anyTimes();
-        
expect(sourceNode.getTimestampExtractor()).andReturn(mock(TimestampExtractor.class)).anyTimes();
-        
expect(topology.globalStateStores()).andReturn(Collections.emptyList()).anyTimes();
-        
expect(topology.terminalNodes()).andStubReturn(Collections.singleton(sourceNode.name()));
-        
expect(topology.sources()).andStubReturn(Collections.singleton(sourceNode));
-        replay(builder, stateDirectory, topology, sourceNode);
+        when(builder.topologyConfigs()).thenReturn(new TopologyConfig(new 
StreamsConfig(properties)));
+        when(builder.buildSubtopology(0)).thenReturn(topology);
+        when(topology.sinkTopics()).thenReturn(emptySet());
+        
when(stateDirectory.getOrCreateDirectoryForTask(task00)).thenReturn(mock(File.class));
+        
when(stateDirectory.checkpointFileFor(task00)).thenReturn(mock(File.class));
+        
when(stateDirectory.getOrCreateDirectoryForTask(task01)).thenReturn(mock(File.class));
+        
when(stateDirectory.checkpointFileFor(task01)).thenReturn(mock(File.class));
+        when(topology.source("topic")).thenReturn(sourceNode);
+        
when(sourceNode.getTimestampExtractor()).thenReturn(mock(TimestampExtractor.class));
+        when(topology.sources()).thenReturn(Collections.singleton(sourceNode));
 
         final StreamsConfig config = new StreamsConfig(properties);
         activeTaskCreator = new ActiveTaskCreator(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ChangelogTopicsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ChangelogTopicsTest.java
index 6da192fdee7..b4e406504dc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ChangelogTopicsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ChangelogTopicsTest.java
@@ -22,6 +22,8 @@ import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.Topi
 import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
 
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.Collections;
 import java.util.Map;
@@ -32,13 +34,12 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
 
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class ChangelogTopicsTest {
 
     private static final String SOURCE_TOPIC_NAME = "source";
@@ -83,16 +84,14 @@ public class ChangelogTopicsTest {
 
     @Test
     public void shouldNotContainChangelogsForStatelessTasks() {
-        
expect(internalTopicManager.makeReady(Collections.emptyMap())).andStubReturn(Collections.emptySet());
+        
when(internalTopicManager.makeReady(Collections.emptyMap())).thenReturn(Collections.emptySet());
         final Map<Subtopology, TopicsInfo> topicGroups = 
mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO2));
         final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = 
mkMap(mkEntry(SUBTOPOLOGY_0, mkSet(TASK_0_0, TASK_0_1, TASK_0_2)));
-        replay(internalTopicManager);
 
         final ChangelogTopics changelogTopics =
                 new ChangelogTopics(internalTopicManager, topicGroups, 
tasksForTopicGroup, "[test] ");
         changelogTopics.setup();
 
-        verify(internalTopicManager);
         assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_0), 
is(Collections.emptySet()));
         assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_1), 
is(Collections.emptySet()));
         assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_2), 
is(Collections.emptySet()));
@@ -102,18 +101,16 @@ public class ChangelogTopicsTest {
 
     @Test
     public void 
shouldNotContainAnyPreExistingChangelogsIfChangelogIsNewlyCreated() {
-        
expect(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, 
CHANGELOG_TOPIC_CONFIG))))
-            .andStubReturn(mkSet(CHANGELOG_TOPIC_NAME1));
+        
when(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, 
CHANGELOG_TOPIC_CONFIG))))
+            .thenReturn(mkSet(CHANGELOG_TOPIC_NAME1));
         final Map<Subtopology, TopicsInfo> topicGroups = 
mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1));
         final Set<TaskId> tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
         final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = 
mkMap(mkEntry(SUBTOPOLOGY_0, tasks));
-        replay(internalTopicManager);
 
         final ChangelogTopics changelogTopics =
                 new ChangelogTopics(internalTopicManager, topicGroups, 
tasksForTopicGroup, "[test] ");
         changelogTopics.setup();
 
-        verify(internalTopicManager);
         
assertThat(CHANGELOG_TOPIC_CONFIG.numberOfPartitions().orElse(Integer.MIN_VALUE),
 is(3));
         assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_0), 
is(Collections.emptySet()));
         assertThat(changelogTopics.preExistingPartitionsFor(TASK_0_1), 
is(Collections.emptySet()));
@@ -124,18 +121,16 @@ public class ChangelogTopicsTest {
 
     @Test
     public void shouldOnlyContainPreExistingNonSourceBasedChangelogs() {
-        
expect(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, 
CHANGELOG_TOPIC_CONFIG))))
-            .andStubReturn(Collections.emptySet());
+        
when(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, 
CHANGELOG_TOPIC_CONFIG))))
+            .thenReturn(Collections.emptySet());
         final Map<Subtopology, TopicsInfo> topicGroups = 
mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1));
         final Set<TaskId> tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
         final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = 
mkMap(mkEntry(SUBTOPOLOGY_0, tasks));
-        replay(internalTopicManager);
 
         final ChangelogTopics changelogTopics =
                 new ChangelogTopics(internalTopicManager, topicGroups, 
tasksForTopicGroup, "[test] ");
         changelogTopics.setup();
 
-        verify(internalTopicManager);
         
assertThat(CHANGELOG_TOPIC_CONFIG.numberOfPartitions().orElse(Integer.MIN_VALUE),
 is(3));
         final TopicPartition changelogPartition0 = new 
TopicPartition(CHANGELOG_TOPIC_NAME1, 0);
         final TopicPartition changelogPartition1 = new 
TopicPartition(CHANGELOG_TOPIC_NAME1, 1);
@@ -152,17 +147,15 @@ public class ChangelogTopicsTest {
 
     @Test
     public void shouldOnlyContainPreExistingSourceBasedChangelogs() {
-        
expect(internalTopicManager.makeReady(Collections.emptyMap())).andStubReturn(Collections.emptySet());
+        
when(internalTopicManager.makeReady(Collections.emptyMap())).thenReturn(Collections.emptySet());
         final Map<Subtopology, TopicsInfo> topicGroups = 
mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO3));
         final Set<TaskId> tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
         final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = 
mkMap(mkEntry(SUBTOPOLOGY_0, tasks));
-        replay(internalTopicManager);
 
         final ChangelogTopics changelogTopics =
                 new ChangelogTopics(internalTopicManager, topicGroups, 
tasksForTopicGroup, "[test] ");
         changelogTopics.setup();
 
-        verify(internalTopicManager);
         final TopicPartition changelogPartition0 = new 
TopicPartition(SOURCE_TOPIC_NAME, 0);
         final TopicPartition changelogPartition1 = new 
TopicPartition(SOURCE_TOPIC_NAME, 1);
         final TopicPartition changelogPartition2 = new 
TopicPartition(SOURCE_TOPIC_NAME, 2);
@@ -178,18 +171,16 @@ public class ChangelogTopicsTest {
 
     @Test
     public void shouldContainBothTypesOfPreExistingChangelogs() {
-        
expect(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, 
CHANGELOG_TOPIC_CONFIG))))
-            .andStubReturn(Collections.emptySet());
+        
when(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, 
CHANGELOG_TOPIC_CONFIG))))
+            .thenReturn(Collections.emptySet());
         final Map<Subtopology, TopicsInfo> topicGroups = 
mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO4));
         final Set<TaskId> tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
         final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = 
mkMap(mkEntry(SUBTOPOLOGY_0, tasks));
-        replay(internalTopicManager);
 
         final ChangelogTopics changelogTopics =
                 new ChangelogTopics(internalTopicManager, topicGroups, 
tasksForTopicGroup, "[test] ");
         changelogTopics.setup();
 
-        verify(internalTopicManager);
         
assertThat(CHANGELOG_TOPIC_CONFIG.numberOfPartitions().orElse(Integer.MIN_VALUE),
 is(3));
         final TopicPartition changelogPartition0 = new 
TopicPartition(CHANGELOG_TOPIC_NAME1, 0);
         final TopicPartition changelogPartition1 = new 
TopicPartition(CHANGELOG_TOPIC_NAME1, 1);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index 1dffd7ac665..07536c414a3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -17,13 +17,11 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.header.internals.RecordHeaders;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
@@ -32,18 +30,20 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.hamcrest.core.IsInstanceOf;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class GlobalProcessorContextImplTest {
     private static final String GLOBAL_STORE_NAME = "global-store";
     private static final String GLOBAL_KEY_VALUE_STORE_NAME = 
"global-key-value-store";
@@ -55,27 +55,16 @@ public class GlobalProcessorContextImplTest {
 
     private GlobalProcessorContextImpl globalContext;
 
+    @Mock
     private ProcessorNode<Object, Object, Object, Object> child;
     private ProcessorRecordContext recordContext;
+    @Mock
+    private GlobalStateManager stateManager;
 
     @Before
     public void setup() {
         final StreamsConfig streamsConfig = mock(StreamsConfig.class);
-        
expect(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andReturn("dummy-id");
-        
expect(streamsConfig.defaultValueSerde()).andReturn(Serdes.ByteArray());
-        expect(streamsConfig.defaultKeySerde()).andReturn(Serdes.ByteArray());
-        replay(streamsConfig);
-
-        final GlobalStateManager stateManager = mock(GlobalStateManager.class);
-        
expect(stateManager.getGlobalStore(GLOBAL_STORE_NAME)).andReturn(mock(StateStore.class));
-        
expect(stateManager.getGlobalStore(GLOBAL_KEY_VALUE_STORE_NAME)).andReturn(mock(KeyValueStore.class));
-        
expect(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME)).andReturn(mock(TimestampedKeyValueStore.class));
-        
expect(stateManager.getGlobalStore(GLOBAL_WINDOW_STORE_NAME)).andReturn(mock(WindowStore.class));
-        
expect(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME)).andReturn(mock(TimestampedWindowStore.class));
-        
expect(stateManager.getGlobalStore(GLOBAL_SESSION_STORE_NAME)).andReturn(mock(SessionStore.class));
-        expect(stateManager.getGlobalStore(UNKNOWN_STORE)).andReturn(null);
-        expect(stateManager.taskType()).andStubReturn(TaskType.GLOBAL);
-        replay(stateManager);
+        
when(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).thenReturn("dummy-id");
 
         globalContext = new GlobalProcessorContextImpl(
             streamsConfig,
@@ -86,7 +75,6 @@ public class GlobalProcessorContextImplTest {
 
         final ProcessorNode<Object, Object, Object, Object> processorNode = 
new ProcessorNode<>("testNode");
 
-        child = mock(ProcessorNode.class);
         processorNode.addChild(child);
 
         globalContext.setCurrentNode(processorNode);
@@ -96,20 +84,18 @@ public class GlobalProcessorContextImplTest {
 
     @Test
     public void shouldReturnGlobalOrNullStore() {
+        
when(stateManager.getGlobalStore(GLOBAL_STORE_NAME)).thenReturn(mock(StateStore.class));
         assertThat(globalContext.getStateStore(GLOBAL_STORE_NAME), new 
IsInstanceOf(StateStore.class));
         assertNull(globalContext.getStateStore(UNKNOWN_STORE));
     }
 
     @Test
     public void shouldForwardToSingleChild() {
-        child.process(anyObject());
-        expectLastCall();
+        doNothing().when(child).process(any());
 
-        expect(recordContext.timestamp()).andStubReturn(0L);
-        expect(recordContext.headers()).andStubReturn(new RecordHeaders());
-        replay(child, recordContext);
+        when(recordContext.timestamp()).thenReturn(0L);
+        when(recordContext.headers()).thenReturn(new RecordHeaders());
         globalContext.forward((Object /*forcing a call to the K/V forward*/) 
null, null);
-        verify(child, recordContext);
     }
 
     @Test
@@ -129,6 +115,7 @@ public class GlobalProcessorContextImplTest {
 
     @Test
     public void shouldNotAllowInitForKeyValueStore() {
+        
when(stateManager.getGlobalStore(GLOBAL_KEY_VALUE_STORE_NAME)).thenReturn(mock(KeyValueStore.class));
         final StateStore store = 
globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
         try {
             store.init((StateStoreContext) null, null);
@@ -138,6 +125,7 @@ public class GlobalProcessorContextImplTest {
 
     @Test
     public void shouldNotAllowInitForTimestampedKeyValueStore() {
+        
when(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME)).thenReturn(mock(TimestampedKeyValueStore.class));
         final StateStore store = 
globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME);
         try {
             store.init((StateStoreContext) null, null);
@@ -147,6 +135,7 @@ public class GlobalProcessorContextImplTest {
 
     @Test
     public void shouldNotAllowInitForWindowStore() {
+        
when(stateManager.getGlobalStore(GLOBAL_WINDOW_STORE_NAME)).thenReturn(mock(WindowStore.class));
         final StateStore store = 
globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME);
         try {
             store.init((StateStoreContext) null, null);
@@ -156,6 +145,7 @@ public class GlobalProcessorContextImplTest {
 
     @Test
     public void shouldNotAllowInitForTimestampedWindowStore() {
+        
when(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME)).thenReturn(mock(TimestampedWindowStore.class));
         final StateStore store = 
globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME);
         try {
             store.init((StateStoreContext) null, null);
@@ -165,6 +155,7 @@ public class GlobalProcessorContextImplTest {
 
     @Test
     public void shouldNotAllowInitForSessionStore() {
+        
when(stateManager.getGlobalStore(GLOBAL_SESSION_STORE_NAME)).thenReturn(mock(SessionStore.class));
         final StateStore store = 
globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME);
         try {
             store.init((StateStoreContext) null, null);
@@ -174,6 +165,7 @@ public class GlobalProcessorContextImplTest {
 
     @Test
     public void shouldNotAllowCloseForKeyValueStore() {
+        
when(stateManager.getGlobalStore(GLOBAL_KEY_VALUE_STORE_NAME)).thenReturn(mock(KeyValueStore.class));
         final StateStore store = 
globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
         try {
             store.close();
@@ -183,6 +175,7 @@ public class GlobalProcessorContextImplTest {
 
     @Test
     public void shouldNotAllowCloseForTimestampedKeyValueStore() {
+        
when(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME)).thenReturn(mock(TimestampedKeyValueStore.class));
         final StateStore store = 
globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME);
         try {
             store.close();
@@ -192,6 +185,7 @@ public class GlobalProcessorContextImplTest {
 
     @Test
     public void shouldNotAllowCloseForWindowStore() {
+        
when(stateManager.getGlobalStore(GLOBAL_WINDOW_STORE_NAME)).thenReturn(mock(WindowStore.class));
         final StateStore store = 
globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME);
         try {
             store.close();
@@ -201,6 +195,7 @@ public class GlobalProcessorContextImplTest {
 
     @Test
     public void shouldNotAllowCloseForTimestampedWindowStore() {
+        
when(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME)).thenReturn(mock(TimestampedWindowStore.class));
         final StateStore store = 
globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME);
         try {
             store.close();
@@ -210,6 +205,7 @@ public class GlobalProcessorContextImplTest {
 
     @Test
     public void shouldNotAllowCloseForSessionStore() {
+        
when(stateManager.getGlobalStore(GLOBAL_SESSION_STORE_NAME)).thenReturn(mock(SessionStore.class));
         final StateStore store = 
globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME);
         try {
             store.close();

Reply via email to