http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java index 86a553f..db7079c 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java @@ -19,12 +19,12 @@ package org.apache.samza.operators.spec; import java.util.Collection; +import java.util.Map; +import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; -import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OperatorSpecGraph; -import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.operators.TimerRegistry; import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.descriptors.GenericSystemDescriptor; @@ -36,9 +36,9 @@ import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.Serde; import org.junit.Before; import org.junit.Test; -import org.mockito.internal.util.reflection.Whitebox; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -57,7 +57,6 @@ public class TestPartitionByOperatorSpec { private final String testJobName = "testJob"; private final String testJobId = "1"; private final String testRepartitionedStreamName = "parByKey"; - private StreamGraphSpec graphSpec = null; class TimerMapFn implements 
MapFunction<Object, String>, TimerFunction<String, Object> { @@ -99,25 +98,28 @@ public class TestPartitionByOperatorSpec { public void setup() { when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn(testJobName); when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn(testJobId); - graphSpec = new StreamGraphSpec(mockConfig); } @Test public void testPartitionBy() { - MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor); MapFunction<Object, String> keyFn = m -> m.toString(); MapFunction<Object, Object> valueFn = m -> m; KVSerde<Object, Object> partitionBySerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()); - MessageStream<KV<String, Object>> - reparStream = inputStream.partitionBy(keyFn, valueFn, partitionBySerde, testRepartitionedStreamName); - InputOperatorSpec inputOpSpec = (InputOperatorSpec) Whitebox.getInternalState(reparStream, "operatorSpec"); - assertEquals(inputOpSpec.getStreamId(), String.format("%s-%s-partition_by-%s", testJobName, testJobId, testRepartitionedStreamName)); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { + MessageStream inputStream = appDesc.getInputStream(testinputDescriptor); + inputStream.partitionBy(keyFn, valueFn, partitionBySerde, testRepartitionedStreamName); + }, mockConfig); + assertEquals(2, streamAppDesc.getInputOperators().size()); + Map<String, InputOperatorSpec> inputOpSpecs = streamAppDesc.getInputOperators(); + assertTrue(inputOpSpecs.keySet().contains(String.format("%s-%s-partition_by-%s", testJobName, testJobId, testRepartitionedStreamName))); + InputOperatorSpec inputOpSpec = inputOpSpecs.get(String.format("%s-%s-partition_by-%s", testJobName, testJobId, testRepartitionedStreamName)); + assertEquals(String.format("%s-%s-partition_by-%s", testJobName, testJobId, testRepartitionedStreamName), inputOpSpec.getStreamId()); assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde); assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde); 
assertTrue(inputOpSpec.isKeyed()); assertNull(inputOpSpec.getTimerFn()); assertNull(inputOpSpec.getWatermarkFn()); - InputOperatorSpec originInputSpec = (InputOperatorSpec) Whitebox.getInternalState(inputStream, "operatorSpec"); + InputOperatorSpec originInputSpec = inputOpSpecs.get(testinputDescriptor.getStreamId()); assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] instanceof PartitionByOperatorSpec); PartitionByOperatorSpec reparOpSpec = (PartitionByOperatorSpec) originInputSpec.getRegisteredOperatorSpecs().toArray()[0]; assertEquals(reparOpSpec.getOpId(), String.format("%s-%s-partition_by-%s", testJobName, testJobId, testRepartitionedStreamName)); @@ -130,19 +132,21 @@ public class TestPartitionByOperatorSpec { @Test public void testPartitionByWithNoSerde() { - MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor); MapFunction<Object, String> keyFn = m -> m.toString(); MapFunction<Object, Object> valueFn = m -> m; - MessageStream<KV<String, Object>> - reparStream = inputStream.partitionBy(keyFn, valueFn, testRepartitionedStreamName); - InputOperatorSpec inputOpSpec = (InputOperatorSpec) Whitebox.getInternalState(reparStream, "operatorSpec"); - assertEquals(inputOpSpec.getStreamId(), String.format("%s-%s-partition_by-%s", testJobName, testJobId, testRepartitionedStreamName)); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { + MessageStream inputStream = appDesc.getInputStream(testinputDescriptor); + inputStream.partitionBy(keyFn, valueFn, testRepartitionedStreamName); + }, mockConfig); + InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get( + String.format("%s-%s-partition_by-%s", testJobName, testJobId, testRepartitionedStreamName)); + assertNotNull(inputOpSpec); assertNull(inputOpSpec.getKeySerde()); assertNull(inputOpSpec.getValueSerde()); assertTrue(inputOpSpec.isKeyed()); assertNull(inputOpSpec.getTimerFn()); assertNull(inputOpSpec.getWatermarkFn()); - 
InputOperatorSpec originInputSpec = (InputOperatorSpec) Whitebox.getInternalState(inputStream, "operatorSpec"); + InputOperatorSpec originInputSpec = streamAppDesc.getInputOperators().get(testinputDescriptor.getStreamId()); assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] instanceof PartitionByOperatorSpec); PartitionByOperatorSpec reparOpSpec = (PartitionByOperatorSpec) originInputSpec.getRegisteredOperatorSpecs().toArray()[0]; assertEquals(reparOpSpec.getOpId(), String.format("%s-%s-partition_by-%s", testJobName, testJobId, testRepartitionedStreamName)); @@ -155,9 +159,11 @@ public class TestPartitionByOperatorSpec { @Test public void testCopy() { - MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor); - inputStream.partitionBy(m -> m.toString(), m -> m, testRepartitionedStreamName); - OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { + MessageStream inputStream = appDesc.getInputStream(testinputDescriptor); + inputStream.partitionBy(m -> m.toString(), m -> m, testRepartitionedStreamName); + }, mockConfig); + OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph(); OperatorSpecGraph clonedGraph = specGraph.clone(); OperatorSpecTestUtils.assertClonedGraph(specGraph, clonedGraph); } @@ -165,28 +171,36 @@ public class TestPartitionByOperatorSpec { @Test(expected = IllegalArgumentException.class) public void testTimerFunctionAsKeyFn() { TimerMapFn keyFn = new TimerMapFn(); - MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor); - inputStream.partitionBy(keyFn, m -> m, "parByKey"); + new StreamApplicationDescriptorImpl(appDesc -> { + MessageStream<Object> inputStream = appDesc.getInputStream(testinputDescriptor); + inputStream.partitionBy(keyFn, m -> m, "parByKey"); + }, mockConfig); } @Test(expected = IllegalArgumentException.class) public void testWatermarkFunctionAsKeyFn() { 
WatermarkMapFn keyFn = new WatermarkMapFn(); - MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor); - inputStream.partitionBy(keyFn, m -> m, "parByKey"); + new StreamApplicationDescriptorImpl(appDesc -> { + MessageStream<Object> inputStream = appDesc.getInputStream(testinputDescriptor); + inputStream.partitionBy(keyFn, m -> m, "parByKey"); + }, mockConfig); } @Test(expected = IllegalArgumentException.class) public void testTimerFunctionAsValueFn() { TimerMapFn valueFn = new TimerMapFn(); - MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor); - inputStream.partitionBy(m -> m.toString(), valueFn, "parByKey"); + new StreamApplicationDescriptorImpl(appDesc -> { + MessageStream<Object> inputStream = appDesc.getInputStream(testinputDescriptor); + inputStream.partitionBy(m -> m.toString(), valueFn, "parByKey"); + }, mockConfig); } @Test(expected = IllegalArgumentException.class) public void testWatermarkFunctionAsValueFn() { WatermarkMapFn valueFn = new WatermarkMapFn(); - MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor); - inputStream.partitionBy(m -> m.toString(), valueFn, "parByKey"); + new StreamApplicationDescriptorImpl(appDesc -> { + MessageStream<Object> inputStream = appDesc.getInputStream(testinputDescriptor); + inputStream.partitionBy(m -> m.toString(), valueFn, "parByKey"); + }, mockConfig); } }
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java index 052aa29..673015a 100644 --- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java +++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java @@ -19,6 +19,13 @@ package org.apache.samza.processor; import com.google.common.collect.ImmutableMap; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.samza.SamzaContainerStatus; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; @@ -30,21 +37,18 @@ import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; import org.apache.samza.metrics.MetricsReporter; import org.apache.samza.processor.StreamProcessor.State; +import org.apache.samza.runtime.ProcessorLifecycleListener; import org.apache.samza.task.StreamTask; import org.apache.samza.task.StreamTaskFactory; +import org.apache.samza.task.TaskFactory; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.mockito.Mockito; import org.powermock.api.mockito.PowerMockito; + +import static org.junit.Assert.assertEquals; import 
static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -54,7 +58,7 @@ import static org.mockito.Mockito.when; public class TestStreamProcessor { private ConcurrentMap<ListenerCallback, Boolean> processorListenerState; private enum ListenerCallback { - ON_START, ON_SHUTDOWN, ON_FAILURE + BEFORE_START, AFTER_START, AFTER_STOP, AFTER_FAILURE } @Before @@ -62,9 +66,10 @@ public class TestStreamProcessor { Mockito.reset(); processorListenerState = new ConcurrentHashMap<ListenerCallback, Boolean>() { { - put(ListenerCallback.ON_START, false); - put(ListenerCallback.ON_FAILURE, false); - put(ListenerCallback.ON_SHUTDOWN, false); + put(ListenerCallback.BEFORE_START, false); + put(ListenerCallback.AFTER_START, false); + put(ListenerCallback.AFTER_STOP, false); + put(ListenerCallback.AFTER_FAILURE, false); } }; } @@ -83,7 +88,7 @@ public class TestStreamProcessor { Config config, Map<String, MetricsReporter> customMetricsReporters, StreamTaskFactory streamTaskFactory, - StreamProcessorLifecycleListener processorListener, + ProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator, SamzaContainer container) { super(config, customMetricsReporters, streamTaskFactory, processorListener, jobCoordinator); @@ -144,22 +149,27 @@ public class TestStreamProcessor { new MapConfig(), new HashMap<>(), mock(StreamTaskFactory.class), - new StreamProcessorLifecycleListener() { + new ProcessorLifecycleListener() { @Override - public void onStart() { - processorListenerState.put(ListenerCallback.ON_START, true); + public void afterStart() { + processorListenerState.put(ListenerCallback.AFTER_START, true); processorListenerStart.countDown(); } @Override - public void onShutdown() { - processorListenerState.put(ListenerCallback.ON_SHUTDOWN, true); + public void afterFailure(Throwable t) { + processorListenerState.put(ListenerCallback.AFTER_FAILURE, true); + } + + @Override + public void afterStop() { + 
processorListenerState.put(ListenerCallback.AFTER_STOP, true); processorListenerStop.countDown(); } @Override - public void onFailure(Throwable t) { - processorListenerState.put(ListenerCallback.ON_FAILURE, true); + public void beforeStart() { + processorListenerState.put(ListenerCallback.BEFORE_START, true); } }, mockJobCoordinator, @@ -193,7 +203,7 @@ public class TestStreamProcessor { processor.start(); processorListenerStart.await(); - Assert.assertEquals(SamzaContainerStatus.STARTED, processor.getContainerStatus()); + assertEquals(SamzaContainerStatus.STARTED, processor.getContainerStatus()); // This block is required for the mockRunloop is actually start. // Otherwise, processor.stop gets triggered before mockRunloop begins to block @@ -204,9 +214,10 @@ public class TestStreamProcessor { processorListenerStop.await(); // Assertions on which callbacks are expected to be invoked - Assert.assertTrue(processorListenerState.get(ListenerCallback.ON_START)); - Assert.assertTrue(processorListenerState.get(ListenerCallback.ON_SHUTDOWN)); - Assert.assertFalse(processorListenerState.get(ListenerCallback.ON_FAILURE)); + Assert.assertTrue(processorListenerState.get(ListenerCallback.BEFORE_START)); + Assert.assertTrue(processorListenerState.get(ListenerCallback.AFTER_START)); + Assert.assertTrue(processorListenerState.get(ListenerCallback.AFTER_STOP)); + Assert.assertFalse(processorListenerState.get(ListenerCallback.AFTER_FAILURE)); } /** @@ -215,7 +226,7 @@ public class TestStreamProcessor { * * Assertions: * - JobCoordinator has been stopped from the JobCoordinatorListener callback - * - StreamProcessorLifecycleListener#onFailure(Throwable) has been invoked + * - ProcessorLifecycleListener#afterFailure(Throwable) has been invoked w/ non-null Throwable */ @Test public void testContainerFailureCorrectlyStopsProcessor() throws InterruptedException { @@ -242,20 +253,25 @@ public class TestStreamProcessor { new MapConfig(), new HashMap<>(), mock(StreamTaskFactory.class), - new 
StreamProcessorLifecycleListener() { + new ProcessorLifecycleListener() { + @Override + public void beforeStart() { + processorListenerState.put(ListenerCallback.BEFORE_START, true); + } + @Override - public void onStart() { - processorListenerState.put(ListenerCallback.ON_START, true); + public void afterStart() { + processorListenerState.put(ListenerCallback.AFTER_START, true); } @Override - public void onShutdown() { - processorListenerState.put(ListenerCallback.ON_SHUTDOWN, true); + public void afterStop() { + processorListenerState.put(ListenerCallback.AFTER_STOP, true); } @Override - public void onFailure(Throwable t) { - processorListenerState.put(ListenerCallback.ON_FAILURE, true); + public void afterFailure(Throwable t) { + processorListenerState.put(ListenerCallback.AFTER_FAILURE, true); actualThrowable.getAndSet(t); processorListenerFailed.countDown(); } @@ -294,27 +310,28 @@ public class TestStreamProcessor { Assert.assertTrue( "Container failed and processor listener failed was not invoked within timeout!", processorListenerFailed.await(30, TimeUnit.SECONDS)); - Assert.assertEquals(expectedThrowable, actualThrowable.get()); + assertEquals(expectedThrowable, actualThrowable.get()); - Assert.assertFalse(processorListenerState.get(ListenerCallback.ON_SHUTDOWN)); - Assert.assertTrue(processorListenerState.get(ListenerCallback.ON_START)); - Assert.assertTrue(processorListenerState.get(ListenerCallback.ON_FAILURE)); + Assert.assertTrue(processorListenerState.get(ListenerCallback.BEFORE_START)); + Assert.assertTrue(processorListenerState.get(ListenerCallback.AFTER_START)); + Assert.assertFalse(processorListenerState.get(ListenerCallback.AFTER_STOP)); + Assert.assertTrue(processorListenerState.get(ListenerCallback.AFTER_FAILURE)); } @Test public void testStartOperationShouldBeIdempotent() { JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); Mockito.doNothing().when(mockJobCoordinator).start(); - StreamProcessorLifecycleListener 
lifecycleListener = Mockito.mock(StreamProcessorLifecycleListener.class); + ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); StreamProcessor streamProcessor = new StreamProcessor(new MapConfig(), new HashMap<>(), null, lifecycleListener, mockJobCoordinator); - Assert.assertEquals(State.NEW, streamProcessor.getState()); + assertEquals(State.NEW, streamProcessor.getState()); streamProcessor.start(); - Assert.assertEquals(State.STARTED, streamProcessor.getState()); + assertEquals(State.STARTED, streamProcessor.getState()); streamProcessor.start(); - Assert.assertEquals(State.STARTED, streamProcessor.getState()); + assertEquals(State.STARTED, streamProcessor.getState()); Mockito.verify(mockJobCoordinator, Mockito.times(1)).start(); } @@ -322,7 +339,7 @@ public class TestStreamProcessor { @Test public void testOnJobModelExpiredShouldMakeCorrectStateTransitions() { JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); - StreamProcessorLifecycleListener lifecycleListener = Mockito.mock(StreamProcessorLifecycleListener.class); + ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class); MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); StreamProcessor streamProcessor = new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator); @@ -334,11 +351,11 @@ public class TestStreamProcessor { streamProcessor.start(); - Assert.assertEquals(State.STARTED, streamProcessor.getState()); + assertEquals(State.STARTED, streamProcessor.getState()); streamProcessor.jobCoordinatorListener.onJobModelExpired(); - Assert.assertEquals(State.IN_REBALANCE, streamProcessor.getState()); + assertEquals(State.IN_REBALANCE, streamProcessor.getState()); /** * When there's initialized SamzaContainer in StreamProcessor and the container shutdown @@ -357,7 +374,7 @@ public class 
TestStreamProcessor { streamProcessor.jobCoordinatorListener.onJobModelExpired(); - Assert.assertEquals(State.STOPPING, streamProcessor.getState()); + assertEquals(State.STOPPING, streamProcessor.getState()); Mockito.verify(mockSamzaContainer, Mockito.times(1)).shutdown(); Mockito.verify(mockJobCoordinator, Mockito.times(1)).stop(); @@ -366,13 +383,13 @@ public class TestStreamProcessor { streamProcessor.jobCoordinatorListener.onJobModelExpired(); - Assert.assertEquals(State.IN_REBALANCE, streamProcessor.state); + assertEquals(State.IN_REBALANCE, streamProcessor.state); } @Test public void testOnNewJobModelShouldResultInValidStateTransitions() throws Exception { JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); - StreamProcessorLifecycleListener lifecycleListener = Mockito.mock(StreamProcessorLifecycleListener.class); + ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class); MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); StreamProcessor streamProcessor = PowerMockito.spy(new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator)); @@ -389,7 +406,7 @@ public class TestStreamProcessor { @Test public void testStopShouldBeIdempotent() { JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); - StreamProcessorLifecycleListener lifecycleListener = Mockito.mock(StreamProcessorLifecycleListener.class); + ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class); MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); StreamProcessor streamProcessor = PowerMockito.spy(new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator)); @@ -405,13 +422,13 @@ public class TestStreamProcessor { streamProcessor.stop(); - 
Assert.assertEquals(State.STOPPING, streamProcessor.state); + assertEquals(State.STOPPING, streamProcessor.state); } @Test public void testCoordinatorFailureShouldStopTheStreamProcessor() { JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); - StreamProcessorLifecycleListener lifecycleListener = Mockito.mock(StreamProcessorLifecycleListener.class); + ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class); MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); StreamProcessor streamProcessor = new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator); @@ -425,22 +442,38 @@ public class TestStreamProcessor { Mockito.when(mockSamzaContainer.hasStopped()).thenReturn(false); - Assert.assertEquals(State.STOPPED, streamProcessor.state); - Mockito.verify(lifecycleListener).onFailure(failureException); + assertEquals(State.STOPPED, streamProcessor.state); + Mockito.verify(lifecycleListener).afterFailure(failureException); Mockito.verify(mockSamzaContainer).shutdown(); } @Test public void testCoordinatorStopShouldStopTheStreamProcessor() { JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); - StreamProcessorLifecycleListener lifecycleListener = Mockito.mock(StreamProcessorLifecycleListener.class); + ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); StreamProcessor streamProcessor = new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator); streamProcessor.state = State.RUNNING; streamProcessor.jobCoordinatorListener.onCoordinatorStop(); - Assert.assertEquals(State.STOPPED, streamProcessor.state); - Mockito.verify(lifecycleListener).onShutdown(); + assertEquals(State.STOPPED, streamProcessor.state); + 
Mockito.verify(lifecycleListener).afterStop(); + } + + @Test + public void testStreamProcessorWithStreamProcessorListenerFactory() { + AtomicReference<MockStreamProcessorLifecycleListener> mockListener = new AtomicReference<>(); + StreamProcessor streamProcessor = new StreamProcessor(mock(Config.class), new HashMap<>(), mock(TaskFactory.class), + sp -> mockListener.updateAndGet(old -> new MockStreamProcessorLifecycleListener(sp)), mock(JobCoordinator.class)); + assertEquals(streamProcessor, mockListener.get().processor); + } + + class MockStreamProcessorLifecycleListener implements ProcessorLifecycleListener { + final StreamProcessor processor; + + MockStreamProcessorLifecycleListener(StreamProcessor processor) { + this.processor = processor; + } } } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java index 7e6433c..cfa2680 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java @@ -18,13 +18,15 @@ */ package org.apache.samza.runtime; -import org.apache.samza.application.StreamApplication; +import java.time.Duration; +import org.apache.samza.application.SamzaApplication; +import org.apache.samza.application.MockStreamApplication; +import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.job.ApplicationStatus; -import org.apache.samza.operators.StreamGraph; import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; public class TestApplicationRunnerMain { @@ -37,8 +39,8 @@ public class TestApplicationRunnerMain { 
"org.apache.samza.config.factories.PropertiesConfigFactory", "--config-path", getClass().getResource("/test.properties").getPath(), - "-config", ApplicationRunnerMain.STREAM_APPLICATION_CLASS_CONFIG + "=org.apache.samza.runtime.TestApplicationRunnerMain$TestStreamApplicationDummy", - "-config", "app.runner.class=org.apache.samza.runtime.TestApplicationRunnerMain$TestApplicationRunnerInvocationCounts" + "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()), + "-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()), }); assertEquals(1, TestApplicationRunnerInvocationCounts.runCount); @@ -52,8 +54,8 @@ public class TestApplicationRunnerMain { "org.apache.samza.config.factories.PropertiesConfigFactory", "--config-path", getClass().getResource("/test.properties").getPath(), - "-config", ApplicationRunnerMain.STREAM_APPLICATION_CLASS_CONFIG + "=org.apache.samza.runtime.TestApplicationRunnerMain$TestStreamApplicationDummy", - "-config", "app.runner.class=org.apache.samza.runtime.TestApplicationRunnerMain$TestApplicationRunnerInvocationCounts", + "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()), + "-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()), "--operation=kill" }); @@ -68,50 +70,47 @@ public class TestApplicationRunnerMain { "org.apache.samza.config.factories.PropertiesConfigFactory", "--config-path", getClass().getResource("/test.properties").getPath(), - "-config", ApplicationRunnerMain.STREAM_APPLICATION_CLASS_CONFIG + "=org.apache.samza.runtime.TestApplicationRunnerMain$TestStreamApplicationDummy", - "-config", "app.runner.class=org.apache.samza.runtime.TestApplicationRunnerMain$TestApplicationRunnerInvocationCounts", + "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()), + "-config", 
String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()), "--operation=status" }); assertEquals(1, TestApplicationRunnerInvocationCounts.statusCount); } - public static class TestApplicationRunnerInvocationCounts extends AbstractApplicationRunner { + public static class TestApplicationRunnerInvocationCounts implements ApplicationRunner { protected static int runCount = 0; protected static int killCount = 0; protected static int statusCount = 0; - public TestApplicationRunnerInvocationCounts(Config config) { - super(config); + public TestApplicationRunnerInvocationCounts(SamzaApplication userApp, Config config) { } @Override - public void runTask() { - throw new UnsupportedOperationException("runTask() not supported in this test"); - } - - @Override - public void run(StreamApplication streamApp) { + public void run() { runCount++; } @Override - public void kill(StreamApplication streamApp) { + public void kill() { killCount++; } @Override - public ApplicationStatus status(StreamApplication streamApp) { + public ApplicationStatus status() { statusCount++; return ApplicationStatus.Running; } - } - - public static class TestStreamApplicationDummy implements StreamApplication { @Override - public void init(StreamGraph graph, Config config) { + public void waitForFinish() { + waitForFinish(Duration.ofSeconds(0)); + } + @Override + public boolean waitForFinish(Duration timeout) { + return false; } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index 0335913..19ee74f 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@ -19,235 +19,138 @@ package org.apache.samza.runtime; -import com.google.common.collect.ImmutableList; - import java.time.Duration; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; +import org.apache.samza.application.ApplicationDescriptor; +import org.apache.samza.application.ApplicationDescriptorImpl; +import org.apache.samza.application.SamzaApplication; +import org.apache.samza.application.ApplicationDescriptorUtil; import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.TaskApplication; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; -import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.MapConfig; -import org.apache.samza.config.TaskConfig; -import org.apache.samza.coordinator.CoordinationUtils; -import org.apache.samza.coordinator.CoordinationUtilsFactory; -import org.apache.samza.coordinator.DistributedLockWithState; -import org.apache.samza.execution.ExecutionPlan; -import org.apache.samza.execution.StreamManager; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.processor.StreamProcessor; -import org.apache.samza.processor.StreamProcessorLifecycleListener; -import org.apache.samza.system.StreamSpec; +import org.apache.samza.execution.LocalJobPlanner; +import org.apache.samza.task.IdentityStreamTask; +import org.apache.samza.task.StreamTaskFactory; +import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; import static org.junit.Assert.assertEquals; import static 
org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyObject; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.powermock.api.mockito.PowerMockito.doReturn; -@RunWith(PowerMockRunner.class) -@PrepareForTest(LocalApplicationRunner.class) public class TestLocalApplicationRunner { - private static final String PLAN_JSON = - "{" + "\"jobs\":[{" + "\"jobName\":\"test-application\"," + "\"jobId\":\"1\"," + "\"operatorGraph\":{" - + "\"intermediateStreams\":{%s}," + "\"applicationName\":\"test-application\",\"applicationId\":\"1\"}"; - private static final String STREAM_SPEC_JSON_FORMAT = - "\"%s\":{" + "\"streamSpec\":{" + "\"id\":\"%s\"," + "\"systemName\":\"%s\"," + "\"physicalName\":\"%s\"," - + "\"partitionCount\":2}," + "\"sourceJobs\":[\"test-app\"]," + "\"targetJobs\":[\"test-target-app\"]},"; - - @Test - public void testStreamCreation() - throws Exception { - Config config = new MapConfig(new HashMap<>()); - LocalApplicationRunner runner = spy(new LocalApplicationRunner(config)); - StreamApplication app = mock(StreamApplication.class); - - StreamManager streamManager = mock(StreamManager.class); - doReturn(streamManager).when(runner).buildAndStartStreamManager(any(Config.class)); - - ExecutionPlan plan = mock(ExecutionPlan.class); - when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system"))); - when(plan.getPlanAsJson()).thenReturn(""); - when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(config))); - 
doReturn(plan).when(runner).getExecutionPlan(any()); - - CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class); - JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class); - when(mockJcConfig.getCoordinationUtilsFactory()).thenReturn(coordinationUtilsFactory); - PowerMockito.whenNew(JobCoordinatorConfig.class).withAnyArguments().thenReturn(mockJcConfig); + private Config config; + private SamzaApplication mockApp; + private LocalApplicationRunner runner; + private LocalJobPlanner localPlanner; - try { - runner.run(app); - runner.waitForFinish(); - } catch (Throwable t) { - assertNotNull(t); //no jobs exception - } - - ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class); - verify(streamManager).createStreams(captor.capture()); - List<StreamSpec> streamSpecs = captor.getValue(); - assertEquals(streamSpecs.size(), 1); - assertEquals(streamSpecs.get(0).getId(), "test-stream"); - verify(streamManager).stop(); - } - - @Test - public void testStreamCreationWithCoordination() - throws Exception { - Config config = new MapConfig(new HashMap<>()); - LocalApplicationRunner runner = spy(new LocalApplicationRunner(config)); - - StreamApplication app = mock(StreamApplication.class); - - StreamManager streamManager = mock(StreamManager.class); - doReturn(streamManager).when(runner).buildAndStartStreamManager(any(Config.class)); - - ExecutionPlan plan = mock(ExecutionPlan.class); - when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system"))); - when(plan.getPlanAsJson()).thenReturn(""); - when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(config))); - doReturn(plan).when(runner).getExecutionPlan(any()); - - CoordinationUtils coordinationUtils = mock(CoordinationUtils.class); - CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class); - JobCoordinatorConfig mockJcConfig = 
mock(JobCoordinatorConfig.class); - when(mockJcConfig.getCoordinationUtilsFactory()).thenReturn(coordinationUtilsFactory); - PowerMockito.whenNew(JobCoordinatorConfig.class).withAnyArguments().thenReturn(mockJcConfig); - - DistributedLockWithState lock = mock(DistributedLockWithState.class); - when(lock.lockIfNotSet(anyLong(), anyObject())).thenReturn(true); - when(coordinationUtils.getLockWithState(anyString())).thenReturn(lock); - when(coordinationUtilsFactory.getCoordinationUtils(anyString(), anyString(), anyObject())) - .thenReturn(coordinationUtils); - - try { - runner.run(app); - runner.waitForFinish(); - } catch (Throwable t) { - assertNotNull(t); //no jobs exception - } - - ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class); - verify(streamManager).createStreams(captor.capture()); - - List<StreamSpec> streamSpecs = captor.getValue(); - assertEquals(streamSpecs.size(), 1); - assertEquals(streamSpecs.get(0).getId(), "test-stream"); - verify(streamManager).stop(); + @Before + public void setUp() { + config = new MapConfig(); + mockApp = mock(StreamApplication.class); + prepareTest(); } @Test public void testRunStreamTask() throws Exception { - final Map<String, String> config = new HashMap<>(); - config.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName()); - config.put(TaskConfig.TASK_CLASS(), "org.apache.samza.task.IdentityStreamTask"); - - LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(config)); + final Map<String, String> cfgs = new HashMap<>(); + cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName()); + cfgs.put(JobConfig.JOB_NAME(), "test-task-job"); + config = new MapConfig(cfgs); + mockApp = (TaskApplication) appDesc -> appDesc.setTaskFactory((StreamTaskFactory) () -> new IdentityStreamTask()); + prepareTest(); StreamProcessor sp = mock(StreamProcessor.class); - ArgumentCaptor<StreamProcessorLifecycleListener> captor = - 
ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class); + + ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> captor = + ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class); doAnswer(i -> { - StreamProcessorLifecycleListener listener = captor.getValue(); - listener.onStart(); - listener.onShutdown(); + ProcessorLifecycleListener listener = captor.getValue().createInstance(sp); + listener.afterStart(); + listener.afterStop(); return null; }).when(sp).start(); - LocalApplicationRunner spy = spy(runner); - doReturn(sp).when(spy).createStreamProcessor(any(Config.class), captor.capture()); + doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), captor.capture()); + doReturn(ApplicationStatus.SuccessfulFinish).when(runner).status(); - spy.runTask(); + runner.run(); - assertEquals(ApplicationStatus.SuccessfulFinish, spy.status(null)); + assertEquals(ApplicationStatus.SuccessfulFinish, runner.status()); } @Test public void testRunComplete() throws Exception { - HashMap<String, String> configMap = new HashMap<>(); - configMap.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName()); - Config config = new MapConfig(configMap); - LocalApplicationRunner runner = spy(new LocalApplicationRunner(new MapConfig(config))); - StreamApplication app = mock(StreamApplication.class); - - // buildAndStartStreamManager already includes start, so not going to verify it gets called - StreamManager streamManager = mock(StreamManager.class); - when(runner.buildAndStartStreamManager(any(Config.class))).thenReturn(streamManager); - ExecutionPlan plan = mock(ExecutionPlan.class); - when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList()); - when(plan.getPlanAsJson()).thenReturn(""); - when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(new MapConfig(config)))); - doReturn(plan).when(runner).getExecutionPlan(any()); + Map<String, String> cfgs 
= new HashMap<>(); + cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName()); + config = new MapConfig(cfgs); + ProcessorLifecycleListenerFactory mockFactory = (pContext, cfg) -> mock(ProcessorLifecycleListener.class); + mockApp = (StreamApplication) appDesc -> { + appDesc.withProcessorLifecycleListenerFactory(mockFactory); + }; + prepareTest(); + + // return the jobConfigs from the planner + doReturn(Collections.singletonList(new JobConfig(new MapConfig(config)))).when(localPlanner).prepareJobs(); StreamProcessor sp = mock(StreamProcessor.class); - ArgumentCaptor<StreamProcessorLifecycleListener> captor = - ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class); + ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> captor = + ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class); doAnswer(i -> { - StreamProcessorLifecycleListener listener = captor.getValue(); - listener.onStart(); - listener.onShutdown(); + ProcessorLifecycleListener listener = captor.getValue().createInstance(sp); + listener.afterStart(); + listener.afterStop(); return null; }).when(sp).start(); doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), captor.capture()); - runner.run(app); + runner.run(); runner.waitForFinish(); - assertEquals(runner.status(app), ApplicationStatus.SuccessfulFinish); - verify(streamManager).stop(); + assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish); } @Test public void testRunFailure() throws Exception { - final Map<String, String> configMap = new HashMap<>(); - configMap.put(ApplicationConfig.PROCESSOR_ID, "0"); - MapConfig config = new MapConfig(configMap); - LocalApplicationRunner runner = spy(new LocalApplicationRunner(config)); - StreamApplication app = mock(StreamApplication.class); - - // buildAndStartStreamManager already includes start, so not going to verify it gets called - StreamManager streamManager = 
mock(StreamManager.class); - when(runner.buildAndStartStreamManager(any(Config.class))).thenReturn(streamManager); - ExecutionPlan plan = mock(ExecutionPlan.class); - when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList()); - when(plan.getPlanAsJson()).thenReturn(""); - when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(config))); - doReturn(plan).when(runner).getExecutionPlan(any()); + Map<String, String> cfgs = new HashMap<>(); + cfgs.put(ApplicationConfig.PROCESSOR_ID, "0"); + config = new MapConfig(cfgs); + ProcessorLifecycleListenerFactory mockFactory = (pContext, cfg) -> mock(ProcessorLifecycleListener.class); + mockApp = (StreamApplication) appDesc -> { + appDesc.withProcessorLifecycleListenerFactory(mockFactory); + }; + prepareTest(); + + // return the jobConfigs from the planner + doReturn(Collections.singletonList(new JobConfig(new MapConfig(config)))).when(localPlanner).prepareJobs(); StreamProcessor sp = mock(StreamProcessor.class); - ArgumentCaptor<StreamProcessorLifecycleListener> captor = - ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class); + ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> captor = + ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class); doAnswer(i -> { @@ -257,79 +160,17 @@ public class TestLocalApplicationRunner { doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), captor.capture()); try { - runner.run(app); + runner.run(); runner.waitForFinish(); } catch (Throwable th) { assertNotNull(th); } - assertEquals(runner.status(app), ApplicationStatus.UnsuccessfulFinish); - verify(streamManager).stop(); - } - - public static Set<StreamProcessor> getProcessors(LocalApplicationRunner runner) { - return runner.getProcessors(); - } - - /** - * A test case to verify if the plan results in different hash if there is change in topological sort order. 
- * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams the sake of these test cases. - */ - @Test - public void testPlanIdWithShuffledStreamSpecs() { - List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"), - new StreamSpec("test-stream-2", "stream-2", "testStream"), - new StreamSpec("test-stream-3", "stream-3", "testStream")); - String planIdBeforeShuffle = getExecutionPlanId(streamSpecs); - - List<StreamSpec> shuffledStreamSpecs = ImmutableList.of(new StreamSpec("test-stream-2", "stream-2", "testStream"), - new StreamSpec("test-stream-1", "stream-1", "testStream"), - new StreamSpec("test-stream-3", "stream-3", "testStream")); - - - assertFalse("Expected both of the latch ids to be different", - planIdBeforeShuffle.equals(getExecutionPlanId(shuffledStreamSpecs))); - } - - /** - * A test case to verify if the plan results in same hash in case of same plan. - * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams the sake of these test cases. - */ - @Test - public void testGeneratePlanIdWithSameStreamSpecs() { - List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"), - new StreamSpec("test-stream-2", "stream-2", "testStream"), - new StreamSpec("test-stream-3", "stream-3", "testStream")); - String planIdForFirstAttempt = getExecutionPlanId(streamSpecs); - String planIdForSecondAttempt = getExecutionPlanId(streamSpecs); - - assertEquals("Expected latch ids to match!", "1447946713", planIdForFirstAttempt); - assertEquals("Expected latch ids to match for the second attempt!", planIdForFirstAttempt, planIdForSecondAttempt); - } - - /** - * A test case to verify plan results in different hash in case of different intermediate stream. - * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams the sake of these test cases. 
- */ - @Test - public void testGeneratePlanIdWithDifferentStreamSpecs() { - List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"), - new StreamSpec("test-stream-2", "stream-2", "testStream"), - new StreamSpec("test-stream-3", "stream-3", "testStream")); - String planIdBeforeShuffle = getExecutionPlanId(streamSpecs); - - List<StreamSpec> updatedStreamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"), - new StreamSpec("test-stream-4", "stream-4", "testStream"), - new StreamSpec("test-stream-3", "stream-3", "testStream")); - - - assertFalse("Expected both of the latch ids to be different", - planIdBeforeShuffle.equals(getExecutionPlanId(updatedStreamSpecs))); + assertEquals(runner.status(), ApplicationStatus.UnsuccessfulFinish); } @Test public void testWaitForFinishReturnsBeforeTimeout() { - LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig()); long timeoutInMs = 1000; runner.getShutdownLatch().countDown(); @@ -339,23 +180,15 @@ public class TestLocalApplicationRunner { @Test public void testWaitForFinishTimesout() { - LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig()); long timeoutInMs = 100; boolean finished = runner.waitForFinish(Duration.ofMillis(timeoutInMs)); assertFalse("Application finished before the timeout.", finished); } - private String getExecutionPlanId(List<StreamSpec> updatedStreamSpecs) { - String intermediateStreamJson = - updatedStreamSpecs.stream().map(this::streamSpecToJson).collect(Collectors.joining(",")); - - int planId = String.format(PLAN_JSON, intermediateStreamJson).hashCode(); - - return String.valueOf(planId); + private void prepareTest() { + ApplicationDescriptorImpl<? 
extends ApplicationDescriptor> appDesc = ApplicationDescriptorUtil.getAppDescriptor(mockApp, config); + localPlanner = spy(new LocalJobPlanner(appDesc)); + runner = spy(new LocalApplicationRunner(appDesc, localPlanner)); } - private String streamSpecToJson(StreamSpec streamSpec) { - return String.format(STREAM_SPEC_JSON_FORMAT, streamSpec.getId(), streamSpec.getId(), streamSpec.getSystemName(), - streamSpec.getPhysicalName()); - } } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java index 2734d56..ae525fb 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java @@ -22,6 +22,7 @@ package org.apache.samza.runtime; import java.time.Duration; import java.util.HashMap; import java.util.Map; +import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; @@ -29,30 +30,45 @@ import org.apache.samza.job.ApplicationStatus; import org.apache.samza.job.StreamJob; import org.apache.samza.job.StreamJobFactory; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static 
org.mockito.Mockito.spy; /** * A test class for {@link RemoteApplicationRunner}. */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(RemoteApplicationRunner.class) public class TestRemoteApplicationRunner { + + private RemoteApplicationRunner runner; + + @Before + public void setUp() { + Map<String, String> config = new HashMap<>(); + StreamApplication userApp = appDesc -> { }; + runner = spy(new RemoteApplicationRunner(userApp, new MapConfig(config))); + } + @Test public void testWaitForFinishReturnsBeforeTimeout() { - RemoteApplicationRunner runner = spy(new RemoteApplicationRunner(new MapConfig())); doReturn(ApplicationStatus.SuccessfulFinish).when(runner).getApplicationStatus(any(JobConfig.class)); - boolean finished = runner.waitForFinish(Duration.ofMillis(5000)); assertTrue("Application did not finish before the timeout.", finished); } @Test public void testWaitForFinishTimesout() { - RemoteApplicationRunner runner = spy(new RemoteApplicationRunner(new MapConfig())); doReturn(ApplicationStatus.Running).when(runner).getApplicationStatus(any(JobConfig.class)); - boolean finished = runner.waitForFinish(Duration.ofMillis(1000)); assertFalse("Application finished before the timeout.", finished); } @@ -64,11 +80,14 @@ public class TestRemoteApplicationRunner { m.put(JobConfig.STREAM_JOB_FACTORY_CLASS(), MockStreamJobFactory.class.getName()); m.put(JobConfig.JOB_ID(), "newJob"); - RemoteApplicationRunner runner = new RemoteApplicationRunner(new MapConfig()); + + StreamApplication userApp = appDesc -> { }; + runner = spy(new RemoteApplicationRunner(userApp, new MapConfig(m))); + Assert.assertEquals(ApplicationStatus.New, runner.getApplicationStatus(new JobConfig(new MapConfig(m)))); m.put(JobConfig.JOB_ID(), "runningJob"); - runner = new RemoteApplicationRunner(new JobConfig(new MapConfig(m))); + runner = spy(new RemoteApplicationRunner(userApp, new MapConfig(m))); Assert.assertEquals(ApplicationStatus.Running, runner.getApplicationStatus(new JobConfig(new 
MapConfig(m)))); } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/task/MockAsyncStreamTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/MockAsyncStreamTask.java b/samza-core/src/test/java/org/apache/samza/task/MockAsyncStreamTask.java new file mode 100644 index 0000000..e449ecc --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/task/MockAsyncStreamTask.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.task; + +import org.apache.samza.system.IncomingMessageEnvelope; + +/** + * Test implementation class for {@link AsyncStreamTask} + */ +public class MockAsyncStreamTask implements AsyncStreamTask { + @Override + public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, TaskCallback callback) { + + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/task/MockStreamTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/MockStreamTask.java b/samza-core/src/test/java/org/apache/samza/task/MockStreamTask.java new file mode 100644 index 0000000..d089c4b --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/task/MockStreamTask.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.task; + +import org.apache.samza.system.IncomingMessageEnvelope; + +/** + * Test implementation class for {@link StreamTask} + */ +public class MockStreamTask implements StreamTask { + @Override + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { + + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java index 0b91315..f96ab19 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java @@ -21,24 +21,20 @@ package org.apache.samza.task; import java.lang.reflect.Field; import java.util.concurrent.ExecutorService; import org.apache.samza.SamzaException; -import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.ApplicationConfig; -import org.apache.samza.config.Config; +import org.apache.samza.application.ApplicationDescriptorImpl; +import org.apache.samza.application.StreamApplicationDescriptorImpl; +import org.apache.samza.application.TaskApplicationDescriptorImpl; import org.apache.samza.config.ConfigException; -import org.apache.samza.config.MapConfig; -import org.apache.samza.operators.StreamGraphSpec; -import org.apache.samza.testUtils.TestAsyncStreamTask; -import org.apache.samza.testUtils.TestStreamTask; +import org.apache.samza.operators.OperatorSpecGraph; import org.junit.Test; -import java.util.HashMap; - import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static 
org.junit.Assert.fail; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + /** * Test methods to create {@link StreamTaskFactory} or {@link AsyncStreamTaskFactory} based on task class configuration @@ -47,22 +43,12 @@ public class TestTaskFactoryUtil { @Test public void testStreamTaskClass() { - Config config = new MapConfig(new HashMap<String, String>() { - { - this.put("task.class", "org.apache.samza.testUtils.TestStreamTask"); - } - }); - Object retFactory = TaskFactoryUtil.createTaskFactory(config); + TaskFactory retFactory = TaskFactoryUtil.getTaskFactory(MockStreamTask.class.getName()); assertTrue(retFactory instanceof StreamTaskFactory); - assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof TestStreamTask); + assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof MockStreamTask); - config = new MapConfig(new HashMap<String, String>() { - { - this.put("task.class", "no.such.class"); - } - }); try { - TaskFactoryUtil.createTaskFactory(config); + TaskFactoryUtil.getTaskFactory("no.such.class"); fail("Should have failed w/ no.such.class"); } catch (ConfigException cfe) { // expected @@ -70,156 +56,13 @@ public class TestTaskFactoryUtil { } @Test - public void testCreateStreamApplication() throws Exception { - Config config = new MapConfig(new HashMap<String, String>() { - { - this.put(ApplicationConfig.APP_CLASS, "org.apache.samza.testUtils.TestStreamApplication"); - } - }); - StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config); - assertNotNull(streamApp); - StreamGraphSpec graph = new StreamGraphSpec(config); - streamApp.init(graph, config); - Object retFactory = TaskFactoryUtil.createTaskFactory(graph.getOperatorSpecGraph(), null); - assertTrue(retFactory instanceof StreamTaskFactory); - assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof StreamOperatorTask); - - config = new MapConfig(new HashMap<String, 
String>() { - { - this.put(ApplicationConfig.APP_CLASS, "org.apache.samza.testUtils.InvalidStreamApplication"); - } - }); - try { - TaskFactoryUtil.createStreamApplication(config); - fail("Should have failed w/ no.such.class"); - } catch (ConfigException ce) { - // expected - } - - config = new MapConfig(new HashMap<String, String>() { - { - this.put(ApplicationConfig.APP_CLASS, "no.such.class"); - } - }); - try { - TaskFactoryUtil.createStreamApplication(config); - fail("Should have failed w/ no.such.class"); - } catch (ConfigException ce) { - // expected - } - - config = new MapConfig(new HashMap<String, String>() { - { - this.put(ApplicationConfig.APP_CLASS, ""); - } - }); - streamApp = TaskFactoryUtil.createStreamApplication(config); - assertNull(streamApp); - - config = new MapConfig(new HashMap<>()); - streamApp = TaskFactoryUtil.createStreamApplication(config); - assertNull(streamApp); - } - - @Test - public void testCreateStreamApplicationWithTaskClass() throws Exception { - Config config = new MapConfig(new HashMap<String, String>() { - { - this.put(ApplicationConfig.APP_CLASS, "org.apache.samza.testUtils.TestStreamApplication"); - } - }); - StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config); - assertNotNull(streamApp); - - config = new MapConfig(new HashMap<String, String>() { - { - this.put("task.class", "org.apache.samza.testUtils.TestAsyncStreamTask"); - this.put(ApplicationConfig.APP_CLASS, "org.apache.samza.testUtils.TestStreamApplication"); - } - }); - try { - TaskFactoryUtil.createStreamApplication(config); - fail("should have failed with invalid config"); - } catch (ConfigException ce) { - // expected - } - - config = new MapConfig(new HashMap<String, String>() { - { - this.put("task.class", "no.such.class"); - this.put(ApplicationConfig.APP_CLASS, "org.apache.samza.testUtils.TestStreamApplication"); - } - }); - try { - TaskFactoryUtil.createStreamApplication(config); - fail("should have failed with invalid config"); - } 
catch (ConfigException ce) { - // expected - } - - - config = new MapConfig(new HashMap<String, String>() { - { - this.put("task.class", ""); - this.put(ApplicationConfig.APP_CLASS, "org.apache.samza.testUtils.TestStreamApplication"); - } - }); - streamApp = TaskFactoryUtil.createStreamApplication(config); - assertNotNull(streamApp); - - } - - @Test - public void testStreamTaskClassWithInvalidStreamApplication() throws Exception { - - Config config = new MapConfig(new HashMap<String, String>() { - { - this.put(ApplicationConfig.APP_CLASS, "org.apache.samza.testUtils.InvalidStreamApplication"); - } - }); - try { - TaskFactoryUtil.createStreamApplication(config); - fail("Should have failed w/ no.such.class"); - } catch (ConfigException ce) { - // expected - } - - } - - @Test public void testAsyncStreamTask() { - Config config = new MapConfig(new HashMap<String, String>() { - { - this.put("task.class", "org.apache.samza.testUtils.TestAsyncStreamTask"); - } - }); - Object retFactory = TaskFactoryUtil.createTaskFactory(config); + TaskFactory retFactory = TaskFactoryUtil.getTaskFactory(MockAsyncStreamTask.class.getName()); assertTrue(retFactory instanceof AsyncStreamTaskFactory); - assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask); + assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof MockAsyncStreamTask); - config = new MapConfig(new HashMap<String, String>() { - { - this.put("task.class", "no.such.class"); - } - }); try { - TaskFactoryUtil.createTaskFactory(config); - fail("Should have failed w/ no.such.class"); - } catch (ConfigException cfe) { - // expected - } - } - - @Test - public void testAsyncStreamTaskWithInvalidStreamGraphBuilder() throws Exception { - - Config config = new MapConfig(new HashMap<String, String>() { - { - this.put(ApplicationConfig.APP_CLASS, "org.apache.samza.testUtils.InvalidStreamApplication"); - } - }); - try { - TaskFactoryUtil.createStreamApplication(config); + 
TaskFactoryUtil.getTaskFactory("no.such.class"); fail("Should have failed w/ no.such.class"); } catch (ConfigException cfe) { // expected @@ -228,7 +71,7 @@ public class TestTaskFactoryUtil { @Test public void testFinalizeTaskFactory() throws NoSuchFieldException, IllegalAccessException { - Object mockFactory = mock(Object.class); + TaskFactory mockFactory = mock(TaskFactory.class); try { TaskFactoryUtil.finalizeTaskFactory(mockFactory, true, null); fail("Should have failed with validation"); @@ -260,4 +103,34 @@ public class TestTaskFactoryUtil { retFactory = TaskFactoryUtil.finalizeTaskFactory(mockAsyncStreamFactory, false, null); assertEquals(retFactory, mockAsyncStreamFactory); } + + // test getTaskFactory with StreamApplicationDescriptor + @Test + public void testGetTaskFactoryWithStreamAppDescriptor() { + StreamApplicationDescriptorImpl mockStreamApp = mock(StreamApplicationDescriptorImpl.class); + OperatorSpecGraph mockSpecGraph = mock(OperatorSpecGraph.class); + when(mockStreamApp.getOperatorSpecGraph()).thenReturn(mockSpecGraph); + TaskFactory streamTaskFactory = TaskFactoryUtil.getTaskFactory(mockStreamApp); + assertTrue(streamTaskFactory instanceof StreamTaskFactory); + StreamTask streamTask = ((StreamTaskFactory) streamTaskFactory).createInstance(); + assertTrue(streamTask instanceof StreamOperatorTask); + verify(mockSpecGraph).clone(); + } + + // test getTaskFactory with TaskApplicationDescriptor + @Test + public void testGetTaskFactoryWithTaskAppDescriptor() { + TaskApplicationDescriptorImpl mockTaskApp = mock(TaskApplicationDescriptorImpl.class); + TaskFactory mockTaskFactory = mock(TaskFactory.class); + when(mockTaskApp.getTaskFactory()).thenReturn(mockTaskFactory); + TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(mockTaskApp); + assertEquals(mockTaskFactory, taskFactory); + } + + // test getTaskFactory with invalid ApplicationDescriptorImpl + @Test(expected = IllegalArgumentException.class) + public void 
testGetTaskFactoryWithInvalidAddDescriptorImpl() { + ApplicationDescriptorImpl mockInvalidApp = mock(ApplicationDescriptorImpl.class); + TaskFactoryUtil.getTaskFactory(mockInvalidApp); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java deleted file mode 100644 index 81f3fd4..0000000 --- a/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.testUtils; - -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.task.AsyncStreamTask; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCallback; -import org.apache.samza.task.TaskCoordinator; - -/** - * Test implementation class for {@link AsyncStreamTask} - */ -public class TestAsyncStreamTask implements AsyncStreamTask { - @Override - public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, TaskCallback callback) { - - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamApplication.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamApplication.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamApplication.java deleted file mode 100644 index a1cba7d..0000000 --- a/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamApplication.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.testUtils; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.application.StreamApplication; - -/** - * Test implementation class for {@link StreamApplication} - */ -public class TestStreamApplication implements StreamApplication { - @Override - public void init(StreamGraph graph, Config config) { - - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java deleted file mode 100644 index ce0980a..0000000 --- a/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.testUtils; - -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskCoordinator; - -/** - * Test implementation class for {@link StreamTask} - */ -public class TestStreamTask implements StreamTask { - @Override - public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { - - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index 9aca45e..ff57047 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -187,6 +187,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { @volatile var onContainerStopCalled = false @volatile var onContainerStartCalled = false @volatile var onContainerFailedThrowable: Throwable = null + @volatile var onContainerBeforeStartCalled = false val container = new SamzaContainer( containerContext = containerContext, @@ -198,23 +199,29 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { metrics = new SamzaContainerMetrics) val containerListener = new SamzaContainerListener { - override def onContainerFailed(t: Throwable): Unit = { + override def afterFailure(t: Throwable): Unit = { onContainerFailedCalled = true onContainerFailedThrowable = t } - override def onContainerStop(): Unit = { + override def afterStop(): Unit = { onContainerStopCalled = true } - override def onContainerStart(): Unit = { + override def afterStart(): Unit = 
{ onContainerStartCalled = true } + + override def beforeStart(): Unit = { + onContainerBeforeStartCalled = true + } + } container.setContainerListener(containerListener) container.run assertTrue(task.wasShutdown) + assertTrue(onContainerBeforeStartCalled) assertFalse(onContainerStartCalled) assertFalse(onContainerStopCalled) @@ -266,6 +273,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { @volatile var onContainerStopCalled = false @volatile var onContainerStartCalled = false @volatile var onContainerFailedThrowable: Throwable = null + @volatile var onContainerBeforeStartCalled = false val mockRunLoop = mock[RunLoop] when(mockRunLoop.run).thenThrow(new RuntimeException("Trigger a shutdown, please.")) @@ -279,23 +287,31 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { producerMultiplexer = producerMultiplexer, metrics = new SamzaContainerMetrics) val containerListener = new SamzaContainerListener { - override def onContainerFailed(t: Throwable): Unit = { + override def afterFailure(t: Throwable): Unit = { onContainerFailedCalled = true onContainerFailedThrowable = t } - override def onContainerStop(): Unit = { + override def afterStop(): Unit = { onContainerStopCalled = true } - override def onContainerStart(): Unit = { + override def afterStart(): Unit = { onContainerStartCalled = true } + + /** + * Method invoked before the {@link org.apache.samza.container.SamzaContainer} is started + */ + override def beforeStart(): Unit = { + onContainerBeforeStartCalled = true + } } container.setContainerListener(containerListener) container.run assertTrue(task.wasShutdown) + assertTrue(onContainerBeforeStartCalled) assertTrue(onContainerStartCalled) assertFalse(onContainerStopCalled) @@ -352,6 +368,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { @volatile var onContainerStopCalled = false @volatile var onContainerStartCalled = false @volatile var onContainerFailedThrowable: Throwable = null + 
@volatile var onContainerBeforeStartCalled = false val container = new SamzaContainer( containerContext = containerContext, @@ -362,25 +379,32 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { producerMultiplexer = producerMultiplexer, metrics = new SamzaContainerMetrics) val containerListener = new SamzaContainerListener { - override def onContainerFailed(t: Throwable): Unit = { + override def afterFailure(t: Throwable): Unit = { onContainerFailedCalled = true onContainerFailedThrowable = t } - override def onContainerStop(): Unit = { + override def afterStop(): Unit = { onContainerStopCalled = true } - override def onContainerStart(): Unit = { + override def afterStart(): Unit = { onContainerStartCalled = true } + + /** + * Method invoked before the {@link org.apache.samza.container.SamzaContainer} is started + */ + override def beforeStart(): Unit = { + onContainerBeforeStartCalled = true + } } container.setContainerListener(containerListener) container.run assertTrue(task.wasShutdown) - + assertTrue(onContainerBeforeStartCalled) assertFalse(onContainerStopCalled) assertFalse(onContainerStartCalled) @@ -429,6 +453,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { @volatile var onContainerStopCalled = false @volatile var onContainerStartCalled = false @volatile var onContainerFailedThrowable: Throwable = null + @volatile var onContainerBeforeStartCalled = false val mockRunLoop = mock[RunLoop] when(mockRunLoop.run).thenAnswer(new Answer[Unit] { @@ -446,22 +471,30 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { producerMultiplexer = producerMultiplexer, metrics = new SamzaContainerMetrics) val containerListener = new SamzaContainerListener { - override def onContainerFailed(t: Throwable): Unit = { + override def afterFailure(t: Throwable): Unit = { onContainerFailedCalled = true onContainerFailedThrowable = t } - override def onContainerStop(): Unit = { + override def afterStop(): Unit = 
{ onContainerStopCalled = true } - override def onContainerStart(): Unit = { + override def afterStart(): Unit = { onContainerStartCalled = true } + + /** + * Method invoked before the {@link org.apache.samza.container.SamzaContainer} is started + */ + override def beforeStart(): Unit = { + onContainerBeforeStartCalled = true + } } container.setContainerListener(containerListener) container.run + assertTrue(onContainerBeforeStartCalled) assertFalse(onContainerFailedCalled) assertTrue(onContainerStartCalled) assertTrue(onContainerStopCalled) @@ -507,6 +540,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { @volatile var onContainerStopCalled = false @volatile var onContainerStartCalled = false @volatile var onContainerFailedThrowable: Throwable = null + @volatile var onContainerBeforeStartCalled = false val mockRunLoop = mock[RunLoop] when(mockRunLoop.run).thenAnswer(new Answer[Unit] { @@ -525,24 +559,34 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { metrics = new SamzaContainerMetrics) val containerListener = new SamzaContainerListener { - override def onContainerFailed(t: Throwable): Unit = { + override def afterFailure(t: Throwable): Unit = { onContainerFailedCalled = true onContainerFailedThrowable = t } - override def onContainerStop(): Unit = { + override def afterStop(): Unit = { onContainerStopCalled = true } - override def onContainerStart(): Unit = { + override def afterStart(): Unit = { onContainerStartCalled = true } + + /** + * Method invoked before the {@link org.apache.samza.container.SamzaContainer} is started + */ + override def beforeStart(): Unit = { + onContainerBeforeStartCalled = true } + } container.setContainerListener(containerListener) container.run + assertTrue(onContainerBeforeStartCalled) + assertTrue(onContainerStartCalled) assertTrue(onContainerFailedCalled) + assertFalse(onContainerStopCalled) } @Test 
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java index 181971a..9a871d7 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java @@ -20,13 +20,10 @@ package org.apache.samza.sql.runner; import java.util.List; - -import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.sql.translator.QueryTranslator; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.sql.testutil.SamzaSqlQueryParser; +import org.apache.samza.sql.translator.QueryTranslator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,14 +36,14 @@ public class SamzaSqlApplication implements StreamApplication { private static final Logger LOG = LoggerFactory.getLogger(SamzaSqlApplication.class); @Override - public void init(StreamGraph streamGraph, Config config) { + public void describe(StreamApplicationDescriptor appDesc) { try { - SamzaSqlApplicationConfig sqlConfig = new SamzaSqlApplicationConfig(config); + SamzaSqlApplicationConfig sqlConfig = new SamzaSqlApplicationConfig(appDesc.getConfig()); QueryTranslator queryTranslator = new QueryTranslator(sqlConfig); List<SamzaSqlQueryParser.QueryInfo> queries = sqlConfig.getQueryInfo(); for (SamzaSqlQueryParser.QueryInfo query : queries) { LOG.info("Translating the query {} to samza stream graph", query.getSelectQuery()); - queryTranslator.translate(query, streamGraph); + queryTranslator.translate(query, appDesc); } } catch 
(RuntimeException e) { LOG.error("SamzaSqlApplication threw exception.", e); http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java index 044c7cf..027fd23 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java @@ -19,20 +19,21 @@ package org.apache.samza.sql.runner; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.lang3.Validate; -import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.job.ApplicationStatus; -import org.apache.samza.runtime.AbstractApplicationRunner; +import org.apache.samza.metrics.MetricsReporter; import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunners; import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.runtime.RemoteApplicationRunner; -import org.apache.samza.sql.interfaces.SqlIOResolver; import org.apache.samza.sql.interfaces.SqlIOConfig; +import org.apache.samza.sql.interfaces.SqlIOResolver; import org.apache.samza.sql.testutil.SamzaSqlQueryParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,26 +46,18 @@ import org.slf4j.LoggerFactory; * This runner invokes the SamzaSqlConfig re-writer if it is invoked on a standalone mode (i.e. localRunner == true) * otherwise directly calls the RemoteApplicationRunner which automatically performs the config rewriting . 
*/ -public class SamzaSqlApplicationRunner extends AbstractApplicationRunner { +public class SamzaSqlApplicationRunner implements ApplicationRunner { private static final Logger LOG = LoggerFactory.getLogger(SamzaSqlApplicationRunner.class); - private final Config sqlConfig; - private final ApplicationRunner appRunner; - private final Boolean localRunner; + private final ApplicationRunner runner; public static final String RUNNER_CONFIG = "app.runner.class"; public static final String CFG_FMT_SAMZA_STREAM_SYSTEM = "streams.%s.samza.system"; - public SamzaSqlApplicationRunner(Config config) { - this(false, config); - } - public SamzaSqlApplicationRunner(Boolean localRunner, Config config) { - super(config); - this.localRunner = localRunner; - sqlConfig = computeSamzaConfigs(localRunner, config); - appRunner = ApplicationRunner.fromConfig(sqlConfig); + this.runner = ApplicationRunners.getApplicationRunner(new SamzaSqlApplication(), + computeSamzaConfigs(localRunner, config)); } public static Config computeSamzaConfigs(Boolean localRunner, Config config) { @@ -107,30 +100,34 @@ public class SamzaSqlApplicationRunner extends AbstractApplicationRunner { } public void runAndWaitForFinish() { - Validate.isTrue(localRunner, "This method can be called only in standalone mode."); - SamzaSqlApplication app = new SamzaSqlApplication(); - run(app); - appRunner.waitForFinish(); + Validate.isTrue(runner instanceof LocalApplicationRunner, "This method can be called only in standalone mode."); + run(); + waitForFinish(); } @Override - public void runTask() { - appRunner.runTask(); + public void run() { + runner.run(); } @Override - public void run(StreamApplication streamApp) { - Validate.isInstanceOf(SamzaSqlApplication.class, streamApp); - appRunner.run(streamApp); + public void kill() { + runner.kill(); } @Override - public void kill(StreamApplication streamApp) { - appRunner.kill(streamApp); + public ApplicationStatus status() { + return runner.status(); } @Override - public 
ApplicationStatus status(StreamApplication streamApp) { - return appRunner.status(streamApp); + public void waitForFinish() { + runner.waitForFinish(); } + + @Override + public boolean waitForFinish(Duration timeout) { + return runner.waitForFinish(timeout); + } + }