http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java index dac4e94..602b595 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java @@ -21,9 +21,8 @@ package org.apache.samza.operators; import com.google.common.collect.ImmutableSet; import org.apache.samza.Partition; import org.apache.samza.SamzaException; -import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; import org.apache.samza.container.TaskContextImpl; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.functions.JoinFunction; @@ -44,18 +43,24 @@ import org.apache.samza.task.TaskCoordinator; import org.apache.samza.testUtils.TestClock; import org.apache.samza.util.Clock; import org.apache.samza.util.SystemClock; +import org.junit.Before; import org.junit.Test; +import java.io.IOException; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyString; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class TestJoinOperator { @@ -64,10 +69,22 @@ public class TestJoinOperator { private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class); private final Set<Integer> numbers = ImmutableSet.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + private Config config; + + @Before + public void setUp() { + Map<String, String> mapConfig = new HashMap<>(); + mapConfig.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner"); + mapConfig.put("job.default.system", "insystem"); + mapConfig.put("job.name", "jobName"); + mapConfig.put("job.id", "jobId"); + config = new MapConfig(mapConfig); + } + @Test public void join() throws Exception { - TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app); + StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -82,43 +99,42 @@ public class TestJoinOperator { @Test(expected = SamzaException.class) public void joinWithSelfThrowsException() throws Exception { - StreamApplication app = new StreamApplication() { - @Override - public void init(StreamGraph graph, Config config) { - IntegerSerde integerSerde = new IntegerSerde(); - KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde); - MessageStream<KV<Integer, Integer>> inStream = graph.getInputStream("instream", kvSerde); - - inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde, kvSerde, JOIN_TTL, "join"); - } - }; - - createStreamOperatorTask(new SystemClock(), app); // should throw an exception + config.put("streams.instream.system", "insystem"); + + StreamGraphSpec graphSpec = new StreamGraphSpec(mock(ApplicationRunner.class), config); + IntegerSerde integerSerde = new IntegerSerde(); + KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde); + MessageStream<KV<Integer, Integer>> inStream = graphSpec.getInputStream("instream", kvSerde); + + inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde, kvSerde, JOIN_TTL, "join"); + + createStreamOperatorTask(new SystemClock(), graphSpec); // should throw an exception } @Test public void joinFnInitAndClose() throws Exception { TestJoinFunction joinFn = new TestJoinFunction(); - TestJoinStreamApplication app = new TestJoinStreamApplication(joinFn); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app); - assertEquals(1, joinFn.getNumInitCalls()); + StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(joinFn); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec); + MessageCollector messageCollector = mock(MessageCollector.class); // push messages to first stream numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator)); // close should not be called till now - assertEquals(0, joinFn.getNumCloseCalls()); sot.close(); - // close should be called from sot.close() - assertEquals(1, joinFn.getNumCloseCalls()); + verify(messageCollector, times(0)).send(any(OutgoingMessageEnvelope.class)); + // Make sure the joinFn has been copied instead of directly referred by the task instance + assertEquals(0, joinFn.getNumInitCalls()); + assertEquals(0, joinFn.getNumCloseCalls()); } @Test public void joinReverse() throws Exception { - TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app); + StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -133,8 +149,8 @@ public class TestJoinOperator { @Test public void joinNoMatch() throws Exception { - TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app); + StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -148,8 +164,8 @@ public class TestJoinOperator { @Test public void joinNoMatchReverse() throws Exception { - TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app); + StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -163,8 +179,8 @@ public class TestJoinOperator { @Test public void joinRetainsLatestMessageForKey() throws Exception { - TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app); + StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -181,8 +197,8 @@ public class TestJoinOperator { @Test public void joinRetainsLatestMessageForKeyReverse() throws Exception { - TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app); + StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -199,8 +215,8 @@ public class TestJoinOperator { @Test public void joinRetainsMatchedMessages() throws Exception { - TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app); + StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -222,8 +238,8 @@ public class TestJoinOperator { @Test public void joinRetainsMatchedMessagesReverse() throws Exception { - TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app); + StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -246,8 +262,8 @@ public class TestJoinOperator { @Test public void joinRemovesExpiredMessages() throws Exception { TestClock testClock = new TestClock(); - TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(testClock, app); + StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(testClock, graphSpec); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -266,8 +282,8 @@ public class TestJoinOperator { @Test public void joinRemovesExpiredMessagesReverse() throws Exception { TestClock testClock = new TestClock(); - TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(testClock, app); + StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(testClock, graphSpec); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -283,15 +299,12 @@ public class TestJoinOperator { assertTrue(output.isEmpty()); } - private StreamOperatorTask createStreamOperatorTask(Clock clock, StreamApplication app) throws Exception { - ApplicationRunner runner = mock(ApplicationRunner.class); - when(runner.getStreamSpec("instream")).thenReturn(new StreamSpec("instream", "instream", "insystem")); - when(runner.getStreamSpec("instream2")).thenReturn(new StreamSpec("instream2", "instream2", "insystem2")); + private StreamOperatorTask createStreamOperatorTask(Clock clock, StreamGraphSpec graphSpec) throws Exception { TaskContextImpl taskContext = mock(TaskContextImpl.class); when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet .of(new SystemStreamPartition("insystem", "instream", new Partition(0)), - new SystemStreamPartition("insystem2", "instream2", new Partition(0)))); + new SystemStreamPartition("insystem", "instream2", new Partition(0)))); when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); // need to return different stores for left and right side IntegerSerde integerSerde = new IntegerSerde(); @@ -301,35 +314,30 @@ public class TestJoinOperator { when(taskContext.getStore(eq("jobName-jobId-join-j1-R"))) .thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde)); - Config config = mock(Config.class); - when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName"); - when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId"); - - StreamOperatorTask sot = new StreamOperatorTask(app, runner, clock); + StreamOperatorTask sot = new StreamOperatorTask(graphSpec.getOperatorSpecGraph(), graphSpec.getContextManager(), clock); sot.init(config, taskContext); return sot; } - private static class TestJoinStreamApplication implements StreamApplication { - - private final TestJoinFunction joinFn; - - TestJoinStreamApplication(TestJoinFunction joinFn) { - this.joinFn = joinFn; - } + private StreamGraphSpec getTestJoinStreamGraph(TestJoinFunction joinFn) throws IOException { + ApplicationRunner runner = mock(ApplicationRunner.class); + when(runner.getStreamSpec("instream")).thenReturn(new StreamSpec("instream", "instream", "insystem")); + when(runner.getStreamSpec("instream2")).thenReturn(new StreamSpec("instream2", "instream2", "insystem")); - @Override - public void init(StreamGraph graph, Config config) { - IntegerSerde integerSerde = new IntegerSerde(); - KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde); - MessageStream<KV<Integer, Integer>> inStream = graph.getInputStream("instream", kvSerde); - MessageStream<KV<Integer, Integer>> inStream2 = graph.getInputStream("instream2", kvSerde); - - SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); - inStream - .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL, "j1") - .sink((m, mc, tc) -> mc.send(new OutgoingMessageEnvelope(outputSystemStream, m))); - } + StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config); + IntegerSerde integerSerde = new IntegerSerde(); + KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde); + MessageStream<KV<Integer, Integer>> inStream = graphSpec.getInputStream("instream", kvSerde); + MessageStream<KV<Integer, Integer>> inStream2 = graphSpec.getInputStream("instream2", kvSerde); + + inStream + .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL, "j1") + .sink((message, messageCollector, taskCoordinator) -> { + SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); + messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); + }); + + return graphSpec; } private static class TestJoinFunction @@ -380,7 +388,7 @@ public class TestJoinOperator { private static class SecondStreamIME extends IncomingMessageEnvelope { SecondStreamIME(Integer key, Integer value) { - super(new SystemStreamPartition("insystem2", "instream2", new Partition(0)), "1", key, value); + super(new SystemStreamPartition("insystem", "instream2", new Partition(0)), "1", key, value); } } }
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java index 96e234e..fff85e8 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java @@ -18,11 +18,11 @@ */ package org.apache.samza.operators; +import com.google.common.collect.ImmutableList; +import java.io.IOException; import java.time.Duration; import java.util.Collection; import java.util.Collections; -import java.util.function.Function; -import java.util.function.Supplier; import org.apache.samza.operators.data.TestMessageEnvelope; import org.apache.samza.operators.data.TestOutputMessageEnvelope; @@ -32,6 +32,7 @@ import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.operators.functions.SupplierFunction; import org.apache.samza.operators.functions.StreamTableJoinFunction; import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; @@ -54,8 +55,6 @@ import org.apache.samza.table.TableSpec; import org.junit.Test; import org.mockito.ArgumentCaptor; -import com.google.common.collect.ImmutableList; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -71,7 +70,7 @@ public class TestMessageStreamImpl { @Test public void testMap() { - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); @@ -96,7 +95,7 @@ public class TestMessageStreamImpl { @Test public void testFlatMap() { - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); @@ -113,7 +112,7 @@ public class TestMessageStreamImpl { @Test public void testFlatMapWithRelaxedTypes() { - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); MessageStreamImpl<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); @@ -133,7 +132,7 @@ public class TestMessageStreamImpl { @Test public void testFilter() { - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); @@ -158,7 +157,7 @@ public class TestMessageStreamImpl { @Test public void testSink() { - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); @@ -175,7 +174,7 @@ public class TestMessageStreamImpl { @Test public void testSendTo() { - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); OutputStreamImpl<TestMessageEnvelope> mockOutputStreamImpl = mock(OutputStreamImpl.class); @@ -200,8 +199,8 @@ public class TestMessageStreamImpl { } @Test - public void testRepartition() { - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + public void testPartitionBy() throws IOException { + StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); String mockOpName = "mockName"; when(mockGraph.getNextOpId(anyObject(), anyObject())).thenReturn(mockOpName); @@ -215,8 +214,8 @@ public class TestMessageStreamImpl { when(mockIntermediateStream.isKeyed()).thenReturn(true); MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); - Function mockKeyFunction = mock(Function.class); - Function mockValueFunction = mock(Function.class); + MapFunction mockKeyFunction = mock(MapFunction.class); + MapFunction mockValueFunction = mock(MapFunction.class); inputStream.partitionBy(mockKeyFunction, mockValueFunction, mockKVSerde, "p1"); ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class); @@ -232,7 +231,7 @@ public class TestMessageStreamImpl { @Test public void testRepartitionWithoutSerde() { - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); String mockOpName = "mockName"; when(mockGraph.getNextOpId(anyObject(), anyObject())).thenReturn(mockOpName); @@ -245,8 +244,8 @@ public class TestMessageStreamImpl { when(mockIntermediateStream.isKeyed()).thenReturn(true); MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); - Function mockKeyFunction = mock(Function.class); - Function mockValueFunction = mock(Function.class); + MapFunction mockKeyFunction = mock(MapFunction.class); + MapFunction mockValueFunction = mock(MapFunction.class); inputStream.partitionBy(mockKeyFunction, mockValueFunction, "p1"); ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class); @@ -262,18 +261,17 @@ public class TestMessageStreamImpl { @Test public void testWindowWithRelaxedTypes() throws Exception { - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); MessageStream<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); - Function<TestMessageEnvelope, String> keyExtractor = m -> m.getKey(); + MapFunction<TestMessageEnvelope, String> keyExtractor = m -> m.getKey(); FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1; - Supplier<Integer> initialValue = () -> 0; + SupplierFunction<Integer> initialValue = () -> 0; // should compile since TestMessageEnvelope (input for functions) is base class of TestInputMessageEnvelope (M) - Window<TestInputMessageEnvelope, String, Integer> window = - Windows.keyedTumblingWindow(keyExtractor, Duration.ofHours(1), initialValue, aggregator, - null, mock(Serde.class)); + Window<TestInputMessageEnvelope, String, Integer> window = Windows + .keyedTumblingWindow(keyExtractor, Duration.ofHours(1), initialValue, aggregator, null, mock(Serde.class)); MessageStream<WindowPane<String, Integer>> windowedStream = inputStream.window(window, "w1"); ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class); @@ -287,7 +285,7 @@ public class TestMessageStreamImpl { @Test public void testJoin() { - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); OperatorSpec leftInputOpSpec = mock(OperatorSpec.class); MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph, leftInputOpSpec); OperatorSpec rightInputOpSpec = mock(OperatorSpec.class); @@ -319,7 +317,7 @@ public class TestMessageStreamImpl { @Test public void testSendToTable() { - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); OperatorSpec inputOpSpec = mock(OperatorSpec.class); MessageStreamImpl<TestMessageEnvelope> source = new MessageStreamImpl<>(mockGraph, inputOpSpec); @@ -336,13 +334,12 @@ public class TestMessageStreamImpl { SendToTableOperatorSpec sendToTableOperatorSpec = (SendToTableOperatorSpec) registeredOpSpec; assertEquals(OpCode.SEND_TO, sendToTableOperatorSpec.getOpCode()); - assertEquals(inputOpSpec, sendToTableOperatorSpec.getInputOpSpec()); assertEquals(tableSpec, sendToTableOperatorSpec.getTableSpec()); } @Test public void testStreamTableJoin() { - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); OperatorSpec leftInputOpSpec = mock(OperatorSpec.class); MessageStreamImpl<KV<String, TestMessageEnvelope>> source1 = new MessageStreamImpl<>(mockGraph, leftInputOpSpec); OperatorSpec rightInputOpSpec = mock(OperatorSpec.class); @@ -370,7 +367,7 @@ public class TestMessageStreamImpl { @Test public void testMerge() { - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); OperatorSpec mockOpSpec1 = mock(OperatorSpec.class); MessageStream<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec1); @@ -410,7 +407,7 @@ public class TestMessageStreamImpl { @Test public void testMergeWithRelaxedTypes() { - StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); MessageStream<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mock(OperatorSpec.class)); // other streams have the same message type T as input stream message type M http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java new file mode 100644 index 0000000..2be88ca --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for THE + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import java.io.IOException; +import java.io.NotSerializableException; +import java.io.ObjectInputStream; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import org.apache.samza.SamzaException; +import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.WatermarkFunction; +import org.apache.samza.operators.spec.InputOperatorSpec; +import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.operators.spec.OperatorSpecTestUtils; +import org.apache.samza.operators.spec.OperatorSpecs; +import org.apache.samza.operators.spec.OutputOperatorSpec; +import org.apache.samza.operators.spec.OutputStreamImpl; +import org.apache.samza.operators.spec.SinkOperatorSpec; +import org.apache.samza.operators.spec.StreamOperatorSpec; +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.system.StreamSpec; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + + +/** + * Unit tests for {@link OperatorSpecGraph} + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(OperatorSpec.class) +public class TestOperatorSpecGraph { + + private StreamGraphSpec mockGraph; + private Map<StreamSpec, InputOperatorSpec> inputOpSpecMap; + private Map<StreamSpec, OutputStreamImpl> outputStrmMap; + private Set<OperatorSpec> allOpSpecs; + + @Before + public void setUp() { + this.mockGraph = mock(StreamGraphSpec.class); + + /** + * Setup two linear transformation pipelines: + * 1) input1 --> filter --> sendTo + * 2) input2 --> map --> sink + */ + StreamSpec testInputSpec = new StreamSpec("test-input-1", "test-input-1", "kafka"); + InputOperatorSpec testInput = new InputOperatorSpec(testInputSpec, new NoOpSerde(), new NoOpSerde(), true, "test-input-1"); + StreamOperatorSpec filterOp = OperatorSpecs.createFilterOperatorSpec(m -> true, "test-filter-2"); + StreamSpec testOutputSpec = new StreamSpec("test-output-1", "test-output-1", "kafka"); + OutputStreamImpl outputStream1 = new OutputStreamImpl(testOutputSpec, null, null, true); + OutputOperatorSpec outputSpec = OperatorSpecs.createSendToOperatorSpec(outputStream1, "test-output-3"); + testInput.registerNextOperatorSpec(filterOp); + filterOp.registerNextOperatorSpec(outputSpec); + StreamSpec testInputSpec2 = new StreamSpec("test-input-2", "test-input-2", "kafka"); + InputOperatorSpec testInput2 = new InputOperatorSpec(testInputSpec2, new NoOpSerde(), new NoOpSerde(), true, "test-input-4"); + StreamOperatorSpec testMap = OperatorSpecs.createMapOperatorSpec(m -> m, "test-map-5"); + SinkOperatorSpec testSink = OperatorSpecs.createSinkOperatorSpec((m, mc, tc) -> { }, "test-sink-6"); + testInput2.registerNextOperatorSpec(testMap); + testMap.registerNextOperatorSpec(testSink); + + this.inputOpSpecMap = new LinkedHashMap<>(); + inputOpSpecMap.put(testInputSpec, testInput); + inputOpSpecMap.put(testInputSpec2, testInput2); + this.outputStrmMap = new LinkedHashMap<>(); + outputStrmMap.put(testOutputSpec, outputStream1); + when(mockGraph.getInputOperators()).thenReturn(Collections.unmodifiableMap(inputOpSpecMap)); + when(mockGraph.getOutputStreams()).thenReturn(Collections.unmodifiableMap(outputStrmMap)); + this.allOpSpecs = new HashSet<OperatorSpec>() { { + this.add(testInput); + this.add(filterOp); + this.add(outputSpec); + this.add(testInput2); + this.add(testMap); + this.add(testSink); + } }; + } + + @After + public void tearDown() { + this.mockGraph = null; + this.inputOpSpecMap = null; + this.outputStrmMap = null; + this.allOpSpecs = null; + } + + @Test + public void testConstructor() { + OperatorSpecGraph specGraph = new OperatorSpecGraph(mockGraph); + assertEquals(specGraph.getInputOperators(), inputOpSpecMap); + assertEquals(specGraph.getOutputStreams(), outputStrmMap); + assertTrue(specGraph.getTables().isEmpty()); + assertTrue(!specGraph.hasWindowOrJoins()); + assertEquals(specGraph.getAllOperatorSpecs(), this.allOpSpecs); + } + + @Test + public void testClone() { + OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockGraph); + OperatorSpecGraph clonedSpecGraph = operatorSpecGraph.clone(); + OperatorSpecTestUtils.assertClonedGraph(operatorSpecGraph, clonedSpecGraph); + } + + @Test(expected = NotSerializableException.class) + public void testCloneWithSerializationError() throws Throwable { + OperatorSpec mockFailedOpSpec = PowerMockito.mock(OperatorSpec.class); + when(mockFailedOpSpec.getOpId()).thenReturn("test-failed-op-4"); + allOpSpecs.add(mockFailedOpSpec); + inputOpSpecMap.values().stream().findFirst().get().registerNextOperatorSpec(mockFailedOpSpec); + + //failed with serialization error + try { + new OperatorSpecGraph(mockGraph); + fail("Should have failed with serialization error"); + } catch (SamzaException se) { + throw se.getCause(); + } + } + + @Test(expected = IOException.class) + public void testCloneWithDeserializationError() throws Throwable { + TestDeserializeOperatorSpec testOp = new TestDeserializeOperatorSpec(OperatorSpec.OpCode.MAP, "test-failed-op-4"); + this.allOpSpecs.add(testOp); + inputOpSpecMap.values().stream().findFirst().get().registerNextOperatorSpec(testOp); + + OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockGraph); + //failed with serialization error + try { + operatorSpecGraph.clone(); + fail("Should have failed with serialization error"); + } catch (SamzaException se) { + throw se.getCause(); + } + } + + private static class TestDeserializeOperatorSpec extends OperatorSpec { + + public TestDeserializeOperatorSpec(OpCode opCode, String opId) { + super(opCode, opId); + } + + private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { + throw new IOException("Raise IOException to cause deserialization failure"); + } + + @Override + public WatermarkFunction getWatermarkFn() { + return null; + } + + @Override + public TimerFunction getTimerFn() { + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java deleted file mode 100644 index 3bb44b5..0000000 --- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java +++ /dev/null @@ -1,601 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for THE - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; -import org.apache.samza.operators.data.TestMessageEnvelope; -import org.apache.samza.operators.spec.InputOperatorSpec; -import org.apache.samza.operators.spec.OperatorSpec.OpCode; -import org.apache.samza.operators.spec.OutputStreamImpl; -import org.apache.samza.operators.stream.IntermediateMessageStreamImpl; -import org.apache.samza.runtime.ApplicationRunner; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.NoOpSerde; -import org.apache.samza.serializers.Serde; -import org.apache.samza.system.StreamSpec; -import org.apache.samza.table.TableSpec; -import org.junit.Test; - -import com.google.common.collect.ImmutableList; -import junit.framework.Assert; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class TestStreamGraphImpl { - - @Test - public void testGetInputStreamWithValueSerde() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - StreamSpec mockStreamSpec = mock(StreamSpec.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - - Serde mockValueSerde = mock(Serde.class); - MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockValueSerde); - - InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec = - (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec(); - assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); - assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec); - assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec()); - assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde); - assertEquals(mockValueSerde, inputOpSpec.getValueSerde()); - } - - @Test - public void testGetInputStreamWithKeyValueSerde() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - StreamSpec mockStreamSpec = mock(StreamSpec.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - - KVSerde mockKVSerde = mock(KVSerde.class); - Serde mockKeySerde = mock(Serde.class); - Serde mockValueSerde = mock(Serde.class); - doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); - doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); - MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockKVSerde); - - InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec = - (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec(); - assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); - assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec); - assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec()); - assertEquals(mockKeySerde, inputOpSpec.getKeySerde()); - assertEquals(mockValueSerde, inputOpSpec.getValueSerde()); - } - - @Test(expected = NullPointerException.class) - public void testGetInputStreamWithNullSerde() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - StreamSpec mockStreamSpec = mock(StreamSpec.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - - graph.getInputStream("test-stream-1", null); - } - - @Test - public void testGetInputStreamWithDefaultValueSerde() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - StreamSpec mockStreamSpec = mock(StreamSpec.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - - Serde mockValueSerde = mock(Serde.class); - graph.setDefaultSerde(mockValueSerde); - MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1"); - - InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec = - (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec(); - assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); - assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec); - assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec()); - assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde); - assertEquals(mockValueSerde, inputOpSpec.getValueSerde()); - } - - @Test - public void testGetInputStreamWithDefaultKeyValueSerde() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - StreamSpec mockStreamSpec = mock(StreamSpec.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - - KVSerde mockKVSerde = mock(KVSerde.class); - Serde mockKeySerde = mock(Serde.class); - Serde mockValueSerde = mock(Serde.class); - doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); - doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); - graph.setDefaultSerde(mockKVSerde); - MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1"); - - InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec = - (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec(); - assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); - assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec); - assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec()); - assertEquals(mockKeySerde, inputOpSpec.getKeySerde()); - assertEquals(mockValueSerde, inputOpSpec.getValueSerde()); - } - - @Test - public void testGetInputStreamWithDefaultDefaultSerde() { - // default default serde == user hasn't provided a default serde - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - StreamSpec mockStreamSpec = mock(StreamSpec.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - - MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1"); - - InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec = - (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec(); - assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); - assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec); - assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec()); - assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde); - assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde); - } - - @Test - public void testGetInputStreamWithRelaxedTypes() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - StreamSpec mockStreamSpec = mock(StreamSpec.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - - MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1"); - - InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec = - (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec(); - assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); - assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec); - assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec()); - } - - @Test - public void testMultipleGetInputStreams() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - StreamSpec mockStreamSpec1 = mock(StreamSpec.class); - StreamSpec mockStreamSpec2 = mock(StreamSpec.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec1); - when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(mockStreamSpec2); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1"); - MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2"); - - InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec1 = - (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream1).getOperatorSpec(); - InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec2 = - (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream2).getOperatorSpec(); - - assertEquals(graph.getInputOperators().size(), 2); - assertEquals(graph.getInputOperators().get(mockStreamSpec1), inputOpSpec1); - assertEquals(graph.getInputOperators().get(mockStreamSpec2), inputOpSpec2); - } - - @Test(expected = IllegalStateException.class) - public void testGetSameInputStreamTwice() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class)); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - graph.getInputStream("test-stream-1"); - // should throw exception - graph.getInputStream("test-stream-1"); - } - - @Test - public void testGetOutputStreamWithValueSerde() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - StreamSpec mockStreamSpec = mock(StreamSpec.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - - Serde mockValueSerde = mock(Serde.class); - OutputStream<TestMessageEnvelope> outputStream = - graph.getOutputStream("test-stream-1", mockValueSerde); - - OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream; - assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl); - assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec()); - assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde); - assertEquals(mockValueSerde, outputStreamImpl.getValueSerde()); - } - - @Test - public void testGetOutputStreamWithKeyValueSerde() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - StreamSpec mockStreamSpec = mock(StreamSpec.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - KVSerde mockKVSerde = mock(KVSerde.class); - Serde mockKeySerde = mock(Serde.class); - Serde mockValueSerde = mock(Serde.class); - doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); - doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); - graph.setDefaultSerde(mockKVSerde); - OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1", mockKVSerde); - - OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream; - assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl); - assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec()); - assertEquals(mockKeySerde, outputStreamImpl.getKeySerde()); - assertEquals(mockValueSerde, outputStreamImpl.getValueSerde()); - } - - @Test(expected = NullPointerException.class) - public void testGetOutputStreamWithNullSerde() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - StreamSpec mockStreamSpec = mock(StreamSpec.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - - graph.getOutputStream("test-stream-1", null); - } - - @Test - public void testGetOutputStreamWithDefaultValueSerde() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - StreamSpec mockStreamSpec = mock(StreamSpec.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); - - Serde mockValueSerde = mock(Serde.class); - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - graph.setDefaultSerde(mockValueSerde); - OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1"); - - OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream; - assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl); - assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec()); - assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde); - assertEquals(mockValueSerde, outputStreamImpl.getValueSerde()); - } - - @Test - public void testGetOutputStreamWithDefaultKeyValueSerde() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - StreamSpec mockStreamSpec = mock(StreamSpec.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - KVSerde mockKVSerde = mock(KVSerde.class); - Serde mockKeySerde = mock(Serde.class); - Serde mockValueSerde = mock(Serde.class); - doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); - doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); - graph.setDefaultSerde(mockKVSerde); - - OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1"); - - OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream; - assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl); - assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec()); - assertEquals(mockKeySerde, outputStreamImpl.getKeySerde()); - assertEquals(mockValueSerde, outputStreamImpl.getValueSerde()); - } - - @Test - public void testGetOutputStreamWithDefaultDefaultSerde() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - StreamSpec mockStreamSpec = mock(StreamSpec.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - - OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1"); - - OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream; - assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl); - assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec()); - assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde); - assertTrue(outputStreamImpl.getValueSerde() instanceof NoOpSerde); - } - - @Test(expected = IllegalStateException.class) - public void testSetDefaultSerdeAfterGettingStreams() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class)); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - graph.getInputStream("test-stream-1"); - graph.setDefaultSerde(mock(Serde.class)); // should throw exception - } - - @Test(expected = IllegalStateException.class) - public void testSetDefaultSerdeAfterGettingOutputStream() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class)); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - graph.getOutputStream("test-stream-1"); - graph.setDefaultSerde(mock(Serde.class)); // should throw exception - } - - @Test(expected = IllegalStateException.class) - public void testSetDefaultSerdeAfterGettingIntermediateStream() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class)); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - graph.getIntermediateStream("test-stream-1", null); - graph.setDefaultSerde(mock(Serde.class)); // should throw exception - } - - @Test(expected = IllegalStateException.class) - public void testGetSameOutputStreamTwice() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class)); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - graph.getOutputStream("test-stream-1"); - graph.getOutputStream("test-stream-1"); // should throw exception - } - - @Test - public void testGetIntermediateStreamWithValueSerde() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - Config mockConfig = mock(Config.class); - StreamSpec mockStreamSpec = mock(StreamSpec.class); - String mockStreamName = "mockStreamName"; - when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); - - Serde mockValueSerde = mock(Serde.class); - IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = - graph.getIntermediateStream(mockStreamName, mockValueSerde); - - assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec()); - assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream()); - assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec()); - assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde); - assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde()); - assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde); - assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde()); - } - - @Test - public void testGetIntermediateStreamWithKeyValueSerde() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - Config mockConfig = mock(Config.class); - StreamSpec mockStreamSpec = mock(StreamSpec.class); - String mockStreamName = "mockStreamName"; - when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); - - KVSerde mockKVSerde = mock(KVSerde.class); - Serde mockKeySerde = mock(Serde.class); - Serde mockValueSerde = mock(Serde.class); - doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); - doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); - IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = - graph.getIntermediateStream(mockStreamName, mockKVSerde); - - assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec()); - assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream()); - assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec()); - assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde()); - assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde()); - assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde()); - assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde()); - } - - @Test - public void testGetIntermediateStreamWithDefaultValueSerde() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - Config mockConfig = mock(Config.class); - StreamSpec mockStreamSpec = mock(StreamSpec.class); - String mockStreamName = "mockStreamName"; - when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); - - Serde mockValueSerde = mock(Serde.class); - graph.setDefaultSerde(mockValueSerde); - IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = - graph.getIntermediateStream(mockStreamName, null); - - assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec()); - assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream()); - assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec()); - assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde); - assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde()); - assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde); - assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde()); - } - - @Test - public void testGetIntermediateStreamWithDefaultKeyValueSerde() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - Config mockConfig = mock(Config.class); - StreamSpec mockStreamSpec = mock(StreamSpec.class); - String mockStreamName = "mockStreamName"; - when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); - - KVSerde mockKVSerde = mock(KVSerde.class); - Serde mockKeySerde = mock(Serde.class); - Serde mockValueSerde = mock(Serde.class); - doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); - doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); - graph.setDefaultSerde(mockKVSerde); - IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = - graph.getIntermediateStream(mockStreamName, null); - - assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec()); - assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream()); - assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec()); - assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde()); - assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde()); - assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde()); - assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde()); - } - - @Test - public void testGetIntermediateStreamWithDefaultDefaultSerde() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - Config mockConfig = mock(Config.class); - StreamSpec mockStreamSpec = mock(StreamSpec.class); - String mockStreamName = "mockStreamName"; - when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); - IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = - graph.getIntermediateStream(mockStreamName, null); - - assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec()); - assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream()); - assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec()); - assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde); - assertTrue(intermediateStreamImpl.getOutputStream().getValueSerde() instanceof NoOpSerde); - assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde); - assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde() instanceof NoOpSerde); - } - - @Test(expected = IllegalStateException.class) - public void testGetSameIntermediateStreamTwice() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class)); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - graph.getIntermediateStream("test-stream-1", mock(Serde.class)); - graph.getIntermediateStream("test-stream-1", mock(Serde.class)); - } - - @Test - public void testGetNextOpIdIncrementsId() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - Config mockConfig = mock(Config.class); - when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName"); - when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234"); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); - assertEquals("jobName-1234-merge-0", graph.getNextOpId(OpCode.MERGE, null)); - assertEquals("jobName-1234-join-customName", graph.getNextOpId(OpCode.JOIN, "customName")); - assertEquals("jobName-1234-map-2", graph.getNextOpId(OpCode.MAP, null)); - } - - @Test(expected = SamzaException.class) - public void testGetNextOpIdRejectsDuplicates() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - Config mockConfig = mock(Config.class); - when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName"); - when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234"); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); - assertEquals("jobName-1234-join-customName", graph.getNextOpId(OpCode.JOIN, "customName")); - graph.getNextOpId(OpCode.JOIN, "customName"); // should throw - } - - @Test - public void testUserDefinedIdValidation() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - Config mockConfig = mock(Config.class); - when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName"); - when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234"); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); - - // null and empty userDefinedIDs should fall back to autogenerated IDs. - try { - graph.getNextOpId(OpCode.FILTER, null); - graph.getNextOpId(OpCode.FILTER, ""); - graph.getNextOpId(OpCode.FILTER, " "); - graph.getNextOpId(OpCode.FILTER, "\t"); - } catch (SamzaException e) { - Assert.fail("Received an error with a null or empty operator ID instead of defaulting to auto-generated ID."); - } - - List<String> validOpIds = ImmutableList.of("op.id", "op_id", "op-id", "1000", "op_1", "OP_ID"); - for (String validOpId: validOpIds) { - try { - graph.getNextOpId(OpCode.FILTER, validOpId); - } catch (Exception e) { - Assert.fail("Received an exception with a valid operator ID: " + validOpId); - } - } - - List<String> invalidOpIds = ImmutableList.of("op id", "op#id"); - for (String invalidOpId: invalidOpIds) { - try { - graph.getNextOpId(OpCode.FILTER, invalidOpId); - Assert.fail("Did not receive an exception with an invalid operator ID: " + invalidOpId); - } catch (SamzaException e) { } - } - } - - @Test - public void testGetInputStreamPreservesInsertionOrder() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - Config mockConfig = mock(Config.class); - - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); - - StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system"); - when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1); - - StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system"); - when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2); - - StreamSpec testStreamSpec3 = new StreamSpec("test-stream-3", "physical-stream-3", "test-system"); - when(mockRunner.getStreamSpec("test-stream-3")).thenReturn(testStreamSpec3); - - graph.getInputStream("test-stream-1"); - graph.getInputStream("test-stream-2"); - graph.getInputStream("test-stream-3"); - - List<InputOperatorSpec> inputSpecs = new ArrayList<>(graph.getInputOperators().values()); - Assert.assertEquals(inputSpecs.size(), 3); - Assert.assertEquals(inputSpecs.get(0).getStreamSpec(), testStreamSpec1); - Assert.assertEquals(inputSpecs.get(1).getStreamSpec(), testStreamSpec2); - Assert.assertEquals(inputSpecs.get(2).getStreamSpec(), testStreamSpec3); - } - - @Test - public void testGetTable() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - Config mockConfig = mock(Config.class); - StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); - - BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class); - when(mockTableDescriptor.getTableSpec()).thenReturn( - new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", new HashMap<>())); - Assert.assertNotNull(graph.getTable(mockTableDescriptor)); - } -}