http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapperTest.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapperTest.java b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapperTest.java new file mode 100644 index 0000000..e44daac --- /dev/null +++ b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapperTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.verify; + +/** + * Tests for {@link ProcessManagerWrapper}. + */ +@RunWith(MockitoJUnitRunner.class) +public class ProcessManagerWrapperTest { + /** Delegate. */ + @Mock + private ProcessManager<String> delegate; + + /** Process manager wrapper. */ + private ProcessManagerWrapper<String, Integer> wrapper; + + /** Initializes tests. */ + @Before + public void init() { + wrapper = new TestProcessManagerWrapper(delegate); + } + + /** */ + @Test + public void testStart() { + wrapper.start(Arrays.asList(1, 2, 3)); + + verify(delegate).start(eq(Arrays.asList("1", "2", "3"))); + } + + /** */ + @Test + public void testPing() { + Map<UUID, List<UUID>> procIds = Collections.emptyMap(); + wrapper.ping(procIds); + + verify(delegate).ping(eq(procIds)); + } + + /** */ + @Test + public void testStop() { + Map<UUID, List<UUID>> procIds = Collections.emptyMap(); + wrapper.stop(procIds, true); + + verify(delegate).stop(eq(procIds), eq(true)); + } + + /** */ + @Test + public void testClear() { + Map<UUID, List<UUID>> procIds = Collections.emptyMap(); + wrapper.clear(procIds); + + verify(delegate).clear(eq(procIds)); + } + + /** + * Process manager wrapper to be used in tests. + */ + private static class TestProcessManagerWrapper extends ProcessManagerWrapper<String, Integer> { + /** */ + private static final long serialVersionUID = 7562628311662129855L; + + /** + * Constructs a new instance of process manager wrapper. + * + * @param delegate Delegate. + */ + public TestProcessManagerWrapper(ProcessManager<String> delegate) { + super(delegate); + } + + /** {@inheritDoc} */ + @Override protected String transformSpecification(Integer spec) { + return spec.toString(); + } + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java new file mode 100644 index 0000000..7d917e7 --- /dev/null +++ b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core.longrunning; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Supplier; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCluster; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.tensorflow.core.longrunning.task.LongRunningProcessClearTask; +import org.apache.ignite.tensorflow.core.longrunning.task.LongRunningProcessPingTask; +import org.apache.ignite.tensorflow.core.longrunning.task.LongRunningProcessStartTask; +import org.apache.ignite.tensorflow.core.longrunning.task.LongRunningProcessStopTask; +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessState; +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * Tests for {@link LongRunningProcessManager}. + */ +public class LongRunningProcessManagerTest { + /** */ + @Test + @SuppressWarnings("unchecked") + public void testStart() { + UUID nodeId = UUID.randomUUID(); + UUID procId = UUID.randomUUID(); + + Ignite ignite = mock(Ignite.class); + IgniteCluster cluster = mock(IgniteCluster.class); + ClusterGroup clusterGrp = mock(ClusterGroup.class); + IgniteCompute igniteCompute = mock(IgniteCompute.class); + doReturn(cluster).when(ignite).cluster(); + doReturn(igniteCompute).when(ignite).compute(eq(clusterGrp)); + doReturn(clusterGrp).when(cluster).forNodeId(eq(nodeId)); + doReturn(Collections.singletonList(procId)).when(igniteCompute).call(any(IgniteCallable.class)); + + List<LongRunningProcess> list = Collections.singletonList(new LongRunningProcess(nodeId, () -> {})); + + LongRunningProcessManager mgr = new LongRunningProcessManager((Supplier<Ignite> & Serializable)() -> ignite); + Map<UUID, List<UUID>> res = mgr.start(list); + + assertEquals(1, res.size()); + assertTrue(res.containsKey(nodeId)); + assertEquals(procId, res.get(nodeId).iterator().next()); + + verify(igniteCompute).call(any(LongRunningProcessStartTask.class)); + } + + /** */ + @Test + @SuppressWarnings("unchecked") + public void testPing() { + UUID nodeId = UUID.randomUUID(); + UUID procId = UUID.randomUUID(); + + Ignite ignite = mock(Ignite.class); + IgniteCluster cluster = mock(IgniteCluster.class); + ClusterGroup clusterGrp = mock(ClusterGroup.class); + IgniteCompute igniteCompute = mock(IgniteCompute.class); + doReturn(cluster).when(ignite).cluster(); + doReturn(igniteCompute).when(ignite).compute(eq(clusterGrp)); + doReturn(clusterGrp).when(cluster).forNodeId(eq(nodeId)); + doReturn(Collections.singletonList(new LongRunningProcessStatus(LongRunningProcessState.RUNNING))) + .when(igniteCompute).call(any(IgniteCallable.class)); + + Map<UUID, List<UUID>> procIds = new HashMap<>(); + procIds.put(nodeId, Collections.singletonList(procId)); + + LongRunningProcessManager mgr = new LongRunningProcessManager((Supplier<Ignite> & Serializable)() -> ignite); + Map<UUID, List<LongRunningProcessStatus>> res = mgr.ping(procIds); + + assertEquals(1, res.size()); + assertTrue(res.containsKey(nodeId)); + assertEquals(LongRunningProcessState.RUNNING, res.get(nodeId).iterator().next().getState()); + + verify(igniteCompute).call(any(LongRunningProcessPingTask.class)); + } + + /** */ + @Test + @SuppressWarnings("unchecked") + public void testStop() { + UUID nodeId = UUID.randomUUID(); + UUID procId = UUID.randomUUID(); + + Ignite ignite = mock(Ignite.class); + IgniteCluster cluster = mock(IgniteCluster.class); + ClusterGroup clusterGrp = mock(ClusterGroup.class); + IgniteCompute igniteCompute = mock(IgniteCompute.class); + doReturn(cluster).when(ignite).cluster(); + doReturn(igniteCompute).when(ignite).compute(eq(clusterGrp)); + doReturn(clusterGrp).when(cluster).forNodeId(eq(nodeId)); + doReturn(Collections.singletonList(new LongRunningProcessStatus(LongRunningProcessState.RUNNING))) + .when(igniteCompute).call(any(IgniteCallable.class)); + + Map<UUID, List<UUID>> procIds = new HashMap<>(); + procIds.put(nodeId, Collections.singletonList(procId)); + + LongRunningProcessManager mgr = new LongRunningProcessManager((Supplier<Ignite> & Serializable)() -> ignite); + Map<UUID, List<LongRunningProcessStatus>> res = mgr.stop(procIds, true); + + assertEquals(1, res.size()); + assertTrue(res.containsKey(nodeId)); + assertEquals(LongRunningProcessState.RUNNING, res.get(nodeId).iterator().next().getState()); + + verify(igniteCompute).call(any(LongRunningProcessStopTask.class)); + } + + /** */ + @Test + @SuppressWarnings("unchecked") + public void testClear() { + UUID nodeId = UUID.randomUUID(); + UUID procId = UUID.randomUUID(); + + Ignite ignite = mock(Ignite.class); + IgniteCluster cluster = mock(IgniteCluster.class); + ClusterGroup clusterGrp = mock(ClusterGroup.class); + IgniteCompute igniteCompute = mock(IgniteCompute.class); + doReturn(cluster).when(ignite).cluster(); + doReturn(igniteCompute).when(ignite).compute(eq(clusterGrp)); + doReturn(clusterGrp).when(cluster).forNodeId(eq(nodeId)); + doReturn(Collections.singletonList(new LongRunningProcessStatus(LongRunningProcessState.RUNNING))) + .when(igniteCompute).call(any(IgniteCallable.class)); + + Map<UUID, List<UUID>> procIds = new HashMap<>(); + procIds.put(nodeId, Collections.singletonList(procId)); + + LongRunningProcessManager mgr = new LongRunningProcessManager((Supplier<Ignite> & Serializable)() -> ignite); + Map<UUID, List<LongRunningProcessStatus>> res = mgr.clear(procIds); + + assertEquals(1, res.size()); + assertTrue(res.containsKey(nodeId)); + assertEquals(LongRunningProcessState.RUNNING, res.get(nodeId).iterator().next().getState()); + + verify(igniteCompute).call(any(LongRunningProcessClearTask.class)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessClearTaskTest.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessClearTaskTest.java b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessClearTaskTest.java new file mode 100644 index 0000000..268f3e4 --- /dev/null +++ b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessClearTaskTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core.longrunning.task; + +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessState; +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +/** + * Tests for {@link LongRunningProcessClearTask}. + */ +public class LongRunningProcessClearTaskTest { + /** Process metadata storage used instead of Apache Ignite node local storage. */ + private final ConcurrentMap<UUID, Future<?>> metadataStorage = new ConcurrentHashMap<>(); + + /** Initializes tests. */ + @Before + public void init() { + metadataStorage.clear(); + } + + /** Tests execution of the task in case process is not found. */ + @Test + public void testCallProcessNotFound() { + LongRunningProcessClearTask clearTask = createTask(UUID.randomUUID()); + + List<LongRunningProcessStatus> statuses = clearTask.call(); + + assertEquals(1, statuses.size()); + + LongRunningProcessStatus status = statuses.get(0); + assertEquals(LongRunningProcessState.NOT_FOUND, status.getState()); + assertNull(status.getException()); + + assertEquals(0, metadataStorage.size()); + } + + /** Tests execution of the task in case process is running. */ + @Test(expected = IllegalStateException.class) + public void testCallProcessIsRunning() { + UUID procId = UUID.randomUUID(); + + Future<?> fut = mock(Future.class); + doReturn(false).when(fut).isDone(); + metadataStorage.put(procId, fut); + + LongRunningProcessClearTask clearTask = createTask(procId); + + clearTask.call(); + } + + /** Tests execution of the task in case process is done. */ + @Test + public void testCallProcessIsDone() { + UUID procId = UUID.randomUUID(); + + Future<?> fut = mock(Future.class); + doReturn(true).when(fut).isDone(); + metadataStorage.put(procId, fut); + + LongRunningProcessClearTask clearTask = createTask(procId); + + List<LongRunningProcessStatus> statuses = clearTask.call(); + + assertEquals(1, statuses.size()); + + LongRunningProcessStatus status = statuses.get(0); + assertEquals(LongRunningProcessState.DONE, status.getState()); + assertNull(status.getException()); + + assertEquals(0, metadataStorage.size()); + } + + /** Tests execution of the task in case process is done with exception. */ + @Test + public void testCallProcessIsDoneWithException() throws ExecutionException, InterruptedException { + UUID procId = UUID.randomUUID(); + + Future<?> fut = mock(Future.class); + doReturn(true).when(fut).isDone(); + doThrow(RuntimeException.class).when(fut).get(); + metadataStorage.put(procId, fut); + + LongRunningProcessClearTask clearTask = createTask(procId); + + List<LongRunningProcessStatus> statuses = clearTask.call(); + + assertEquals(1, statuses.size()); + + LongRunningProcessStatus status = statuses.get(0); + assertEquals(LongRunningProcessState.DONE, status.getState()); + assertNotNull(status.getException()); + assertTrue(status.getException() instanceof RuntimeException); + + assertEquals(0, metadataStorage.size()); + } + + /** + * Creates clear task. + * + * @param procId Process identifier. + * @return Clear task. + */ + private LongRunningProcessClearTask createTask(UUID procId) { + LongRunningProcessClearTask clearTask = new LongRunningProcessClearTask(Collections.singletonList(procId)); + + clearTask = spy(clearTask); + doReturn(metadataStorage).when(clearTask).getMetadataStorage(); + + return clearTask; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessPingTaskTest.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessPingTaskTest.java b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessPingTaskTest.java new file mode 100644 index 0000000..818509a --- /dev/null +++ b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessPingTaskTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core.longrunning.task; + +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessState; +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +/** + * Tests for {@link LongRunningProcessPingTask}. + */ +public class LongRunningProcessPingTaskTest { + /** Process metadata storage used instead of Apache Ignite node local storage. */ + private final ConcurrentMap<UUID, Future<?>> metadataStorage = new ConcurrentHashMap<>(); + + /** Initializes tests. */ + @Before + public void init() { + metadataStorage.clear(); + } + + /** Tests execution of the task in case process is not found. */ + @Test + public void testCallProcessNotFound() { + LongRunningProcessPingTask pingTask = createTask(UUID.randomUUID()); + + List<LongRunningProcessStatus> statuses = pingTask.call(); + + assertEquals(1, statuses.size()); + + LongRunningProcessStatus status = statuses.get(0); + assertEquals(LongRunningProcessState.NOT_FOUND, status.getState()); + assertNull(status.getException()); + + assertEquals(0, metadataStorage.size()); + } + + /** Tests execution of the task in case process is running. */ + @Test + public void testCallProcessIsRunning() { + UUID procId = UUID.randomUUID(); + + Future<?> fut = mock(Future.class); + doReturn(false).when(fut).isDone(); + metadataStorage.put(procId, fut); + + LongRunningProcessPingTask pingTask = createTask(procId); + + List<LongRunningProcessStatus> statuses = pingTask.call(); + + assertEquals(1, statuses.size()); + + LongRunningProcessStatus status = statuses.get(0); + assertEquals(LongRunningProcessState.RUNNING, status.getState()); + assertNull(status.getException()); + + assertEquals(1, metadataStorage.size()); + } + + /** Tests execution of the task in case process is done. */ + @Test + public void testCallProcessIsDone() { + UUID procId = UUID.randomUUID(); + + Future<?> fut = mock(Future.class); + doReturn(true).when(fut).isDone(); + metadataStorage.put(procId, fut); + + LongRunningProcessPingTask pingTask = createTask(procId); + + List<LongRunningProcessStatus> statuses = pingTask.call(); + + assertEquals(1, statuses.size()); + + LongRunningProcessStatus status = statuses.get(0); + assertEquals(LongRunningProcessState.DONE, status.getState()); + assertNull(status.getException()); + + assertEquals(1, metadataStorage.size()); + } + + /** Tests execution of the task in case process is done with exception. */ + @Test + public void testCallProcessIsDoneWithException() throws ExecutionException, InterruptedException { + UUID procId = UUID.randomUUID(); + + Future<?> fut = mock(Future.class); + doReturn(true).when(fut).isDone(); + doThrow(RuntimeException.class).when(fut).get(); + metadataStorage.put(procId, fut); + + LongRunningProcessPingTask pingTask = createTask(procId); + + List<LongRunningProcessStatus> statuses = pingTask.call(); + + assertEquals(1, statuses.size()); + + LongRunningProcessStatus status = statuses.get(0); + assertEquals(LongRunningProcessState.DONE, status.getState()); + assertNotNull(status.getException()); + assertTrue(status.getException() instanceof RuntimeException); + + assertEquals(1, metadataStorage.size()); + } + + /** + * Creates ping task. + * + * @param procId Process identifier. + * @return Ping task. + */ + private LongRunningProcessPingTask createTask(UUID procId) { + LongRunningProcessPingTask clearTask = new LongRunningProcessPingTask(Collections.singletonList(procId)); + + clearTask = spy(clearTask); + doReturn(metadataStorage).when(clearTask).getMetadataStorage(); + + return clearTask; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTaskTest.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTaskTest.java b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTaskTest.java new file mode 100644 index 0000000..0d14335 --- /dev/null +++ b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTaskTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core.longrunning.task; + +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import org.apache.ignite.tensorflow.core.longrunning.LongRunningProcess; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + +/** + * Tests for {@link LongRunningProcessStartTask}. + */ +public class LongRunningProcessStartTaskTest { + /** Process metadata storage used instead of Apache Ignite node local storage. */ + private final ConcurrentMap<UUID, Future<?>> metadataStorage = new ConcurrentHashMap<>(); + + /** Initializes tests. */ + @Before + public void init() { + metadataStorage.clear(); + } + + /** */ + @Test + public void testCall() throws ExecutionException, InterruptedException { + LongRunningProcess proc = new LongRunningProcess(UUID.randomUUID(), () -> {}); + LongRunningProcessStartTask task = createTask(proc); + List<UUID> procIds = task.call(); + + assertEquals(1, procIds.size()); + + UUID procId = procIds.get(0); + + assertNotNull(metadataStorage.get(procId)); + + Future<?> fut = metadataStorage.get(procId); + fut.get(); + + assertEquals(true, fut.isDone()); + } + + /** */ + @Test(expected = ExecutionException.class) + public void testCallWithException() throws ExecutionException, InterruptedException { + LongRunningProcess proc = new LongRunningProcess(UUID.randomUUID(), () -> { + throw new RuntimeException(); + }); + LongRunningProcessStartTask task = createTask(proc); + List<UUID> procIds = task.call(); + + assertEquals(1, procIds.size()); + + UUID procId = procIds.get(0); + + assertNotNull(metadataStorage.get(procId)); + + Future<?> fut = metadataStorage.get(procId); + fut.get(); + } + + /** + * Creates start task. + * + * @param proc Long running process. + * @return Start task. + */ + private LongRunningProcessStartTask createTask(LongRunningProcess proc) { + LongRunningProcessStartTask startTask = new LongRunningProcessStartTask(Collections.singletonList(proc)); + + startTask = spy(startTask); + doReturn(metadataStorage).when(startTask).getMetadataStorage(); + + return startTask; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5ee876a6/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStopTaskTest.java ---------------------------------------------------------------------- diff --git a/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStopTaskTest.java b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStopTaskTest.java new file mode 100644 index 0000000..129f0cb --- /dev/null +++ b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStopTaskTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tensorflow.core.longrunning.task; + +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessState; +import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * Tests for {@link LongRunningProcessStopTask}. + */ +public class LongRunningProcessStopTaskTest { + /** Process metadata storage used instead of Apache Ignite node local storage. */ + private final ConcurrentMap<UUID, Future<?>> metadataStorage = new ConcurrentHashMap<>(); + + /** Initializes tests. */ + @Before + public void init() { + metadataStorage.clear(); + } + + /** Tests execution of the task in case process is not found. */ + @Test + public void testCallProcessNotFound() { + LongRunningProcessStopTask stopTask = createTask(UUID.randomUUID(), true); + + List<LongRunningProcessStatus> statuses = stopTask.call(); + + assertEquals(1, statuses.size()); + + LongRunningProcessStatus status = statuses.get(0); + assertEquals(LongRunningProcessState.NOT_FOUND, status.getState()); + assertNull(status.getException()); + + assertEquals(0, metadataStorage.size()); + } + + /** Tests execution of the task in case process is running. */ + @Test + public void testCallProcessIsRunning() { + UUID procId = UUID.randomUUID(); + + Future<?> fut = mock(Future.class); + doReturn(false).when(fut).isDone(); + metadataStorage.put(procId, fut); + + LongRunningProcessStopTask stopTask = createTask(procId, true); + + List<LongRunningProcessStatus> statuses = stopTask.call(); + + assertEquals(1, statuses.size()); + verify(fut).cancel(eq(true)); + + LongRunningProcessStatus status = statuses.get(0); + assertEquals(LongRunningProcessState.DONE, status.getState()); + assertNull(status.getException()); + + assertEquals(0, metadataStorage.size()); + } + + /** Tests execution of the task in case process is done. */ + @Test + public void testCallProcessIsDone() { + UUID procId = UUID.randomUUID(); + + Future<?> fut = mock(Future.class); + doReturn(true).when(fut).isDone(); + metadataStorage.put(procId, fut); + + LongRunningProcessStopTask stopTask = createTask(procId, true); + + List<LongRunningProcessStatus> statuses = stopTask.call(); + + assertEquals(1, statuses.size()); + verify(fut).cancel(eq(true)); + + LongRunningProcessStatus status = statuses.get(0); + assertEquals(LongRunningProcessState.DONE, status.getState()); + assertNull(status.getException()); + + assertEquals(0, metadataStorage.size()); + } + + /** Tests execution of the task in case process is done with exception. */ + @Test + public void testCallProcessIsDoneWithException() throws ExecutionException, InterruptedException { + UUID procId = UUID.randomUUID(); + + Future<?> fut = mock(Future.class); + doReturn(true).when(fut).isDone(); + doThrow(RuntimeException.class).when(fut).get(); + metadataStorage.put(procId, fut); + + LongRunningProcessStopTask stopTask = createTask(procId, true); + + List<LongRunningProcessStatus> statuses = stopTask.call(); + + assertEquals(1, statuses.size()); + verify(fut).cancel(eq(true)); + + LongRunningProcessStatus status = statuses.get(0); + assertEquals(LongRunningProcessState.DONE, status.getState()); + assertNotNull(status.getException()); + assertTrue(status.getException() instanceof RuntimeException); + + assertEquals(0, metadataStorage.size()); + } + + /** + * Creates stop task. + * + * @param procId Process identifier. + * @return Stop task. + */ + private LongRunningProcessStopTask createTask(UUID procId, boolean clear) { + LongRunningProcessStopTask clearTask = new LongRunningProcessStopTask(Collections.singletonList(procId), clear); + + clearTask = Mockito.spy(clearTask); + doReturn(metadataStorage).when(clearTask).getMetadataStorage(); + + return clearTask; + } +}