http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java index 0759aba..6fa9ed1 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.samza.Partition; import org.apache.samza.SamzaException; +import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.container.TaskContextImpl; @@ -72,8 +73,8 @@ public class TestJoinOperator { @Test public void join() throws Exception { - StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), streamAppDesc); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -93,25 +94,26 @@ public class TestJoinOperator { mapConfig.put("job.id", "jobId"); StreamTestUtils.addStreamConfigs(mapConfig, "inStream", "insystem", "instream"); Config config = new MapConfig(mapConfig); - StreamGraphSpec graphSpec = new StreamGraphSpec(config); - IntegerSerde integerSerde = new IntegerSerde(); - KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde); - GenericSystemDescriptor sd = new GenericSystemDescriptor("insystem", "mockFactoryClassName"); - GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("inStream", kvSerde); + StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { + IntegerSerde integerSerde = new IntegerSerde(); + KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde); + GenericSystemDescriptor sd = new GenericSystemDescriptor("insystem", "mockFactoryClassName"); + GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("inStream", kvSerde); - MessageStream<KV<Integer, Integer>> inStream = graphSpec.getInputStream(inputDescriptor); + MessageStream<KV<Integer, Integer>> inStream = appDesc.getInputStream(inputDescriptor); - inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde, kvSerde, JOIN_TTL, "join"); + inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde, kvSerde, JOIN_TTL, "join"); + }, config); - createStreamOperatorTask(new SystemClock(), graphSpec); // should throw an exception + createStreamOperatorTask(new SystemClock(), streamAppDesc); // should throw an exception } @Test public void joinFnInitAndClose() throws Exception { TestJoinFunction joinFn = new TestJoinFunction(); - StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(joinFn); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(joinFn); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), streamAppDesc); MessageCollector messageCollector = mock(MessageCollector.class); @@ -129,8 +131,8 @@ public class TestJoinOperator { @Test public void joinReverse() throws Exception { - StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), streamAppDesc); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -145,8 +147,8 @@ public class TestJoinOperator { @Test public void joinNoMatch() throws Exception { - StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), streamAppDesc); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -160,8 +162,8 @@ public class TestJoinOperator { @Test public void joinNoMatchReverse() throws Exception { - StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), streamAppDesc); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -175,8 +177,8 @@ public class TestJoinOperator { @Test public void joinRetainsLatestMessageForKey() throws Exception { - StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), streamAppDesc); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -193,8 +195,8 @@ public class TestJoinOperator { @Test public void joinRetainsLatestMessageForKeyReverse() throws Exception { - StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), streamAppDesc); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -211,8 +213,8 @@ public class TestJoinOperator { @Test public void joinRetainsMatchedMessages() throws Exception { - StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), streamAppDesc); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -234,8 +236,8 @@ public class TestJoinOperator { @Test public void joinRetainsMatchedMessagesReverse() throws Exception { - StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), streamAppDesc); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -258,8 +260,8 @@ public class TestJoinOperator { @Test public void joinRemovesExpiredMessages() throws Exception { TestClock testClock = new TestClock(); - StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(testClock, graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(testClock, streamAppDesc); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -278,8 +280,8 @@ public class TestJoinOperator { @Test public void joinRemovesExpiredMessagesReverse() throws Exception { TestClock testClock = new TestClock(); - StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); - StreamOperatorTask sot = createStreamOperatorTask(testClock, graphSpec); + StreamApplicationDescriptorImpl streamAppDesc = this.getTestJoinStreamGraph(new TestJoinFunction()); + StreamOperatorTask sot = createStreamOperatorTask(testClock, streamAppDesc); List<Integer> output = new ArrayList<>(); MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage()); @@ -295,7 +297,8 @@ public class TestJoinOperator { assertTrue(output.isEmpty()); } - private StreamOperatorTask createStreamOperatorTask(Clock clock, StreamGraphSpec graphSpec) throws Exception { + private StreamOperatorTask createStreamOperatorTask(Clock clock, StreamApplicationDescriptorImpl graphSpec) + throws Exception { Map<String, String> mapConfig = new HashMap<>(); mapConfig.put("job.name", "jobName"); mapConfig.put("job.id", "jobId"); @@ -320,31 +323,31 @@ public class TestJoinOperator { return sot; } - private StreamGraphSpec getTestJoinStreamGraph(TestJoinFunction joinFn) throws IOException { + private StreamApplicationDescriptorImpl getTestJoinStreamGraph(TestJoinFunction joinFn) throws IOException { Map<String, String> mapConfig = new HashMap<>(); mapConfig.put("job.name", "jobName"); mapConfig.put("job.id", "jobId"); StreamTestUtils.addStreamConfigs(mapConfig, "inStream", "insystem", "instream"); StreamTestUtils.addStreamConfigs(mapConfig, "inStream2", "insystem", "instream2"); Config config = new MapConfig(mapConfig); - StreamGraphSpec graphSpec = new StreamGraphSpec(config); - IntegerSerde integerSerde = new IntegerSerde(); - KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde); - GenericSystemDescriptor sd = new GenericSystemDescriptor("insystem", "mockFactoryClassName"); - GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor1 = sd.getInputDescriptor("inStream", kvSerde); - GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor2 = sd.getInputDescriptor("inStream2", kvSerde); - - MessageStream<KV<Integer, Integer>> inStream = graphSpec.getInputStream(inputDescriptor1); - MessageStream<KV<Integer, Integer>> inStream2 = graphSpec.getInputStream(inputDescriptor2); - - inStream - .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL, "j1") - .sink((message, messageCollector, taskCoordinator) -> { - SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); - messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); - }); - - return graphSpec; + + return new StreamApplicationDescriptorImpl(appDesc -> { + IntegerSerde integerSerde = new IntegerSerde(); + KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde); + GenericSystemDescriptor sd = new GenericSystemDescriptor("insystem", "mockFactoryClassName"); + GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor1 = sd.getInputDescriptor("inStream", kvSerde); + GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor2 = sd.getInputDescriptor("inStream2", kvSerde); + + MessageStream<KV<Integer, Integer>> inStream = appDesc.getInputStream(inputDescriptor1); + MessageStream<KV<Integer, Integer>> inStream2 = appDesc.getInputStream(inputDescriptor2); + + inStream + .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL, "j1") + .sink((message, messageCollector, taskCoordinator) -> { + SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); + messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); + }); + }, config); } private static class TestJoinFunction
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java index 001ffda..566079b 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java @@ -24,6 +24,7 @@ import java.time.Duration; import java.util.Collection; import java.util.Collections; +import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.operators.data.TestMessageEnvelope; import org.apache.samza.operators.data.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.FilterFunction; @@ -70,7 +71,7 @@ public class TestMessageStreamImpl { @Test public void testMap() { - StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); + StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); @@ -95,7 +96,7 @@ public class TestMessageStreamImpl { @Test public void testFlatMap() { - StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); + StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); @@ -112,7 +113,7 @@ public class TestMessageStreamImpl { @Test public void testFlatMapWithRelaxedTypes() { - StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); + StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); MessageStreamImpl<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); @@ -132,7 +133,7 @@ public class TestMessageStreamImpl { @Test public void testFilter() { - StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); + StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); @@ -157,7 +158,7 @@ public class TestMessageStreamImpl { @Test public void testSink() { - StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); + StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); @@ -174,7 +175,7 @@ public class TestMessageStreamImpl { @Test public void testSendTo() { - StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); + StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); OutputStreamImpl<TestMessageEnvelope> mockOutputStreamImpl = mock(OutputStreamImpl.class); @@ -200,7 +201,7 @@ public class TestMessageStreamImpl { @Test public void testPartitionBy() throws IOException { - StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); + StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); String mockOpName = "mockName"; when(mockGraph.getNextOpId(anyObject(), anyObject())).thenReturn(mockOpName); @@ -231,7 +232,7 @@ public class TestMessageStreamImpl { @Test public void testRepartitionWithoutSerde() { - StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); + StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); String mockOpName = "mockName"; when(mockGraph.getNextOpId(anyObject(), anyObject())).thenReturn(mockOpName); @@ -261,7 +262,7 @@ public class TestMessageStreamImpl { @Test public void testWindowWithRelaxedTypes() throws Exception { - StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); + StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); MessageStream<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); @@ -285,7 +286,7 @@ public class TestMessageStreamImpl { @Test public void testJoin() { - StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); + StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class); OperatorSpec leftInputOpSpec = mock(OperatorSpec.class); MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph, leftInputOpSpec); OperatorSpec rightInputOpSpec = mock(OperatorSpec.class); @@ -317,7 +318,7 @@ public class TestMessageStreamImpl { @Test public void testSendToTable() { - StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); + StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class); OperatorSpec inputOpSpec = mock(OperatorSpec.class); MessageStreamImpl<TestMessageEnvelope> source = new MessageStreamImpl<>(mockGraph, inputOpSpec); @@ -339,7 +340,7 @@ public class TestMessageStreamImpl { @Test public void testStreamTableJoin() { - StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); + StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class); OperatorSpec leftInputOpSpec = mock(OperatorSpec.class); MessageStreamImpl<KV<String, TestMessageEnvelope>> source1 = new MessageStreamImpl<>(mockGraph, leftInputOpSpec); OperatorSpec rightInputOpSpec = mock(OperatorSpec.class); @@ -367,7 +368,7 @@ public class TestMessageStreamImpl { @Test public void testMerge() { - StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); + StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class); OperatorSpec mockOpSpec1 = mock(OperatorSpec.class); MessageStream<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec1); @@ -407,7 +408,7 @@ public class TestMessageStreamImpl { @Test public void testMergeWithRelaxedTypes() { - StreamGraphSpec mockGraph = mock(StreamGraphSpec.class); + StreamApplicationDescriptorImpl mockGraph = mock(StreamApplicationDescriptorImpl.class); MessageStream<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mock(OperatorSpec.class)); // other streams have the same message type T as input stream message type M http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java index 6469326..a5b15b8 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java @@ -27,6 +27,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import org.apache.samza.SamzaException; +import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.spec.InputOperatorSpec; @@ -57,14 +58,14 @@ import static org.mockito.Mockito.*; @PrepareForTest(OperatorSpec.class) public class TestOperatorSpecGraph { - private StreamGraphSpec mockGraph; + private StreamApplicationDescriptorImpl mockAppDesc; private Map<String, InputOperatorSpec> inputOpSpecMap; private Map<String, OutputStreamImpl> outputStrmMap; private Set<OperatorSpec> allOpSpecs; @Before public void setUp() { - this.mockGraph = mock(StreamGraphSpec.class); + this.mockAppDesc = mock(StreamApplicationDescriptorImpl.class); /** * Setup two linear transformation pipelines: @@ -91,8 +92,8 @@ public class TestOperatorSpecGraph { inputOpSpecMap.put(streamId2, testInput2); this.outputStrmMap = new LinkedHashMap<>(); outputStrmMap.put(outputStreamId, outputStream1); - when(mockGraph.getInputOperators()).thenReturn(Collections.unmodifiableMap(inputOpSpecMap)); - when(mockGraph.getOutputStreams()).thenReturn(Collections.unmodifiableMap(outputStrmMap)); + when(mockAppDesc.getInputOperators()).thenReturn(Collections.unmodifiableMap(inputOpSpecMap)); + when(mockAppDesc.getOutputStreams()).thenReturn(Collections.unmodifiableMap(outputStrmMap)); this.allOpSpecs = new HashSet<OperatorSpec>() { { this.add(testInput); this.add(filterOp); @@ -105,7 +106,7 @@ public class TestOperatorSpecGraph { @After public void tearDown() { - this.mockGraph = null; + this.mockAppDesc = null; this.inputOpSpecMap = null; this.outputStrmMap = null; this.allOpSpecs = null; @@ -113,7 +114,7 @@ public class TestOperatorSpecGraph { @Test public void testConstructor() { - OperatorSpecGraph specGraph = new OperatorSpecGraph(mockGraph); + OperatorSpecGraph specGraph = new OperatorSpecGraph(mockAppDesc); assertEquals(specGraph.getInputOperators(), inputOpSpecMap); assertEquals(specGraph.getOutputStreams(), outputStrmMap); assertTrue(specGraph.getTables().isEmpty()); @@ -123,7 +124,7 @@ public class TestOperatorSpecGraph { @Test public void testClone() { - OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockGraph); + OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockAppDesc); OperatorSpecGraph clonedSpecGraph = operatorSpecGraph.clone(); OperatorSpecTestUtils.assertClonedGraph(operatorSpecGraph, clonedSpecGraph); } @@ -137,7 +138,7 @@ public class TestOperatorSpecGraph { //failed with serialization error try { - new OperatorSpecGraph(mockGraph); + new OperatorSpecGraph(mockAppDesc); fail("Should have failed with serialization error"); } catch (SamzaException se) { throw se.getCause(); @@ -150,7 +151,7 @@ public class TestOperatorSpecGraph { this.allOpSpecs.add(testOp); inputOpSpecMap.values().stream().findFirst().get().registerNextOperatorSpec(testOp); - OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockGraph); + OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockAppDesc); //failed with serialization error try { operatorSpecGraph.clone(); http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java deleted file mode 100644 index 9629efa..0000000 --- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java +++ /dev/null @@ -1,506 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for THE - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - -import com.google.common.collect.ImmutableList; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; -import org.apache.samza.operators.data.TestMessageEnvelope; -import org.apache.samza.operators.descriptors.GenericInputDescriptor; -import org.apache.samza.operators.descriptors.GenericOutputDescriptor; -import org.apache.samza.operators.descriptors.GenericSystemDescriptor; -import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; -import org.apache.samza.operators.descriptors.base.system.ExpandingInputDescriptorProvider; -import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; -import org.apache.samza.operators.descriptors.base.system.TransformingInputDescriptorProvider; -import org.apache.samza.operators.functions.InputTransformer; -import org.apache.samza.operators.functions.StreamExpander; -import org.apache.samza.operators.spec.InputOperatorSpec; -import org.apache.samza.operators.spec.OperatorSpec; -import org.apache.samza.operators.spec.OperatorSpec.OpCode; -import org.apache.samza.operators.spec.OutputStreamImpl; -import org.apache.samza.operators.stream.IntermediateMessageStreamImpl; -import org.apache.samza.serializers.IntegerSerde; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.NoOpSerde; -import org.apache.samza.serializers.Serde; -import org.apache.samza.table.TableSpec; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -@SuppressWarnings("unchecked") -public class TestStreamGraphSpec { - - @Test - public void testGetInputStreamWithValueSerde() { - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - - String streamId = "test-stream-1"; - Serde mockValueSerde = mock(Serde.class); - GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); - GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde); - MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream(isd); - - InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) inputStream).getOperatorSpec(); - assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); - assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec); - assertEquals(streamId, inputOpSpec.getStreamId()); - assertEquals(isd, graphSpec.getInputDescriptors().get(streamId)); - assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde); - assertEquals(mockValueSerde, inputOpSpec.getValueSerde()); - } - - @Test - public void testGetInputStreamWithKeyValueSerde() { - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - - String streamId = "test-stream-1"; - KVSerde mockKVSerde = mock(KVSerde.class); - Serde mockKeySerde = mock(Serde.class); - Serde mockValueSerde = mock(Serde.class); - doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); - doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); - GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); - GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockKVSerde); - MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream(isd); - - InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) inputStream).getOperatorSpec(); - assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); - assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec); - assertEquals(streamId, inputOpSpec.getStreamId()); - assertEquals(isd, graphSpec.getInputDescriptors().get(streamId)); - assertEquals(mockKeySerde, inputOpSpec.getKeySerde()); - assertEquals(mockValueSerde, inputOpSpec.getValueSerde()); - } - - @Test(expected = IllegalArgumentException.class) - public void testGetInputStreamWithNullSerde() { - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); - GenericInputDescriptor isd = sd.getInputDescriptor("mockStreamId", null); - graphSpec.getInputStream(isd); - } - - @Test - public void testGetInputStreamWithTransformFunction() { - String streamId = "test-stream-1"; - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - - Serde mockValueSerde = mock(Serde.class); - InputTransformer transformer = ime -> ime; - MockTransformingSystemDescriptor sd = new MockTransformingSystemDescriptor("mockSystem", transformer); - MockInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde); - MessageStream inputStream = graphSpec.getInputStream(isd); - - InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) inputStream).getOperatorSpec(); - assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); - assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec); - assertEquals(streamId, inputOpSpec.getStreamId()); - assertEquals(isd, graphSpec.getInputDescriptors().get(streamId)); - assertEquals(transformer, inputOpSpec.getTransformer()); - } - - @Test - public void testGetInputStreamWithExpandingSystem() { - String streamId = "test-stream-1"; - String expandedStreamId = "expanded-stream"; - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - AtomicInteger expandCallCount = new AtomicInteger(); - StreamExpander expander = (sg, isd) -> { - expandCallCount.incrementAndGet(); - InputDescriptor expandedISD = - new GenericSystemDescriptor("expanded-system", "mockFactoryClass") - .getInputDescriptor(expandedStreamId, new IntegerSerde()); - - return sg.getInputStream(expandedISD); - }; - MockExpandingSystemDescriptor sd = new MockExpandingSystemDescriptor("mock-system", expander); - MockInputDescriptor isd = sd.getInputDescriptor(streamId, new IntegerSerde()); - MessageStream inputStream = graphSpec.getInputStream(isd); - InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) inputStream).getOperatorSpec(); - assertEquals(1, expandCallCount.get()); - assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); - assertEquals(inputOpSpec, graphSpec.getInputOperators().get(expandedStreamId)); - assertFalse(graphSpec.getInputOperators().containsKey(streamId)); - assertFalse(graphSpec.getInputDescriptors().containsKey(streamId)); - assertTrue(graphSpec.getInputDescriptors().containsKey(expandedStreamId)); - assertEquals(expandedStreamId, inputOpSpec.getStreamId()); - } - - @Test - public void testMultipleGetInputStreams() { - String streamId1 = "test-stream-1"; - String streamId2 = "test-stream-2"; - - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); - GenericInputDescriptor isd1 = sd.getInputDescriptor(streamId1, mock(Serde.class)); - GenericInputDescriptor isd2 = sd.getInputDescriptor(streamId2, mock(Serde.class)); - MessageStream<Object> inputStream1 = graphSpec.getInputStream(isd1); - MessageStream<Object> inputStream2 = graphSpec.getInputStream(isd2); - - InputOperatorSpec inputOpSpec1 = - (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream1).getOperatorSpec(); - InputOperatorSpec inputOpSpec2 = - (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream2).getOperatorSpec(); - - assertEquals(graphSpec.getInputOperators().size(), 2); - assertEquals(graphSpec.getInputOperators().get(streamId1), inputOpSpec1); - assertEquals(graphSpec.getInputOperators().get(streamId2), inputOpSpec2); - } - - @Test(expected = IllegalStateException.class) - public void testGetSameInputStreamTwice() { - String streamId = "test-stream-1"; - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); - GenericInputDescriptor isd1 = sd.getInputDescriptor(streamId, mock(Serde.class)); - GenericInputDescriptor isd2 = sd.getInputDescriptor(streamId, mock(Serde.class)); - graphSpec.getInputStream(isd1); - // should throw exception - graphSpec.getInputStream(isd2); - } - - @Test - public void testMultipleSystemDescriptorForSameSystemName() { - String streamId = "test-stream-1"; - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - GenericSystemDescriptor sd1 = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); - GenericSystemDescriptor sd2 = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); - GenericInputDescriptor isd1 = sd1.getInputDescriptor(streamId, mock(Serde.class)); - GenericInputDescriptor isd2 = sd2.getInputDescriptor(streamId, mock(Serde.class)); - GenericOutputDescriptor osd1 = sd2.getOutputDescriptor(streamId, mock(Serde.class)); - - graphSpec.getInputStream(isd1); - boolean passed = false; - try { - graphSpec.getInputStream(isd2); - passed = true; - } catch (IllegalStateException e) { } - - try { - graphSpec.getOutputStream(osd1); - passed = true; - } catch (IllegalStateException e) { } - - try { - graphSpec.setDefaultSystem(sd2); - passed = true; - } catch (IllegalStateException e) { } - - assertFalse(passed); - } - - @Test - public void testGetOutputStreamWithValueSerde() { - String streamId = "test-stream-1"; - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - - Serde mockValueSerde = mock(Serde.class); - GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); - GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, mockValueSerde); - OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream(osd); - - OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream; - assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl); - assertEquals(streamId, outputStreamImpl.getStreamId()); - assertEquals(osd, graphSpec.getOutputDescriptors().get(streamId)); - assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde); - assertEquals(mockValueSerde, outputStreamImpl.getValueSerde()); - } - - @Test - public void testGetOutputStreamWithKeyValueSerde() { - String streamId = "test-stream-1"; - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - KVSerde mockKVSerde = mock(KVSerde.class); - Serde mockKeySerde = mock(Serde.class); - Serde mockValueSerde = mock(Serde.class); - doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); - doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); - GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); - GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, mockKVSerde); - OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream(osd); - - OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream; - assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl); - assertEquals(streamId, outputStreamImpl.getStreamId()); - assertEquals(osd, graphSpec.getOutputDescriptors().get(streamId)); - assertEquals(mockKeySerde, outputStreamImpl.getKeySerde()); - assertEquals(mockValueSerde, outputStreamImpl.getValueSerde()); - } - - @Test(expected = IllegalArgumentException.class) - public void testGetOutputStreamWithNullSerde() { - String streamId = "test-stream-1"; - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); - GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, null); - graphSpec.getOutputStream(osd); - } - - @Test(expected = IllegalStateException.class) - public void testSetDefaultSystemDescriptorAfterGettingInputStream() { - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - GenericInputDescriptor id = new GenericSystemDescriptor("system", "factory.class.name") - .getInputDescriptor("input-stream", mock(Serde.class)); - graphSpec.getInputStream(id); - graphSpec.setDefaultSystem(new GenericSystemDescriptor("mockSystem", "mockFactory")); // should throw exception - } - - @Test(expected = IllegalStateException.class) - public void testSetDefaultSystemDescriptorAfterGettingOutputStream() { - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - GenericOutputDescriptor od = new GenericSystemDescriptor("system", "factory.class.name") - .getOutputDescriptor("output-stream", mock(Serde.class)); - graphSpec.getOutputStream(od); - graphSpec.setDefaultSystem(new GenericSystemDescriptor("mockSystem", "mockFactory")); // should throw exception - } - - @Test(expected = IllegalStateException.class) - public void testSetDefaultSerdeAfterGettingIntermediateStream() { - String streamId = "test-stream-1"; - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - graphSpec.getIntermediateStream(streamId, mock(Serde.class), false); - graphSpec.setDefaultSystem(new GenericSystemDescriptor("mockSystem", "mockFactory")); // should throw exception - } - - @Test(expected = IllegalStateException.class) - public void testGetSameOutputStreamTwice() { - String streamId = "test-stream-1"; - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); - GenericOutputDescriptor osd1 = sd.getOutputDescriptor(streamId, mock(Serde.class)); - GenericOutputDescriptor osd2 = sd.getOutputDescriptor(streamId, mock(Serde.class)); - graphSpec.getOutputStream(osd1); - graphSpec.getOutputStream(osd2); // should throw exception - } - - @Test - public void testGetIntermediateStreamWithValueSerde() { - String streamId = "stream-1"; - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - - Serde mockValueSerde = mock(Serde.class); - IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = - graphSpec.getIntermediateStream(streamId, mockValueSerde, false); - - assertEquals(graphSpec.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec()); - assertEquals(graphSpec.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream()); - assertEquals(streamId, intermediateStreamImpl.getStreamId()); - assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde); - assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde()); - assertTrue(((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde); - assertEquals(mockValueSerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde()); - } - - @Test - public void testGetIntermediateStreamWithKeyValueSerde() { - String streamId = "streamId"; - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - - KVSerde mockKVSerde = mock(KVSerde.class); - Serde mockKeySerde = mock(Serde.class); - Serde mockValueSerde = mock(Serde.class); - doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); - doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); - IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = - graphSpec.getIntermediateStream(streamId, mockKVSerde, false); - - assertEquals(graphSpec.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec()); - assertEquals(graphSpec.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream()); - assertEquals(streamId, intermediateStreamImpl.getStreamId()); - assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde()); - assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde()); - assertEquals(mockKeySerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde()); - assertEquals(mockValueSerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde()); - } - - @Test - public void testGetIntermediateStreamWithNoSerde() { - Config mockConfig = mock(Config.class); - String streamId = "streamId"; - - StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig); - IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = - graphSpec.getIntermediateStream(streamId, null, false); - - assertEquals(graphSpec.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec()); - assertEquals(graphSpec.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream()); - assertEquals(streamId, intermediateStreamImpl.getStreamId()); - assertNull(intermediateStreamImpl.getOutputStream().getKeySerde()); - assertNull(intermediateStreamImpl.getOutputStream().getValueSerde()); - assertNull(((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde()); - assertNull(((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde()); - } - - @Test(expected = IllegalStateException.class) - public void testGetSameIntermediateStreamTwice() { - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class), false); - graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class), false); - } - - @Test - public void testGetNextOpIdIncrementsId() { - Config mockConfig = mock(Config.class); - when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName"); - when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234"); - - StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig); - assertEquals("jobName-1234-merge-0", graphSpec.getNextOpId(OpCode.MERGE, null)); - assertEquals("jobName-1234-join-customName", graphSpec.getNextOpId(OpCode.JOIN, "customName")); - assertEquals("jobName-1234-map-2", graphSpec.getNextOpId(OpCode.MAP, null)); - } - - @Test(expected = SamzaException.class) - public void testGetNextOpIdRejectsDuplicates() { - Config mockConfig = mock(Config.class); - when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName"); - when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234"); - - StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig); - assertEquals("jobName-1234-join-customName", graphSpec.getNextOpId(OpCode.JOIN, "customName")); - graphSpec.getNextOpId(OpCode.JOIN, "customName"); // should throw - } - - @Test - public void testIdValidation() { - Config mockConfig = mock(Config.class); - when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName"); - when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234"); - - StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig); - - // null and empty userDefinedIDs should fall back to autogenerated IDs. - try { - graphSpec.getNextOpId(OpCode.FILTER, null); - graphSpec.getNextOpId(OpCode.FILTER, ""); - graphSpec.getNextOpId(OpCode.FILTER, " "); - graphSpec.getNextOpId(OpCode.FILTER, "\t"); - } catch (SamzaException e) { - fail("Received an error with a null or empty operator ID instead of defaulting to auto-generated ID."); - } - - List<String> validOpIds = ImmutableList.of("op_id", "op-id", "1000", "op_1", "OP_ID"); - for (String validOpId: validOpIds) { - try { - graphSpec.getNextOpId(OpCode.FILTER, validOpId); - } catch (Exception e) { - fail("Received an exception with a valid operator ID: " + validOpId); - } - } - - List<String> invalidOpIds = ImmutableList.of("op id", "op#id"); - for (String invalidOpId: invalidOpIds) { - try { - graphSpec.getNextOpId(OpCode.FILTER, invalidOpId); - fail("Did not receive an exception with an invalid operator ID: " + invalidOpId); - } catch (SamzaException e) { } - } - } - - @Test - public void testGetInputStreamPreservesInsertionOrder() { - Config mockConfig = mock(Config.class); - - StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig); - - String testStreamId1 = "test-stream-1"; - String testStreamId2 = "test-stream-2"; - String testStreamId3 = "test-stream-3"; - GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); - graphSpec.getInputStream(sd.getInputDescriptor(testStreamId1, mock(Serde.class))); - graphSpec.getInputStream(sd.getInputDescriptor(testStreamId2, mock(Serde.class))); - graphSpec.getInputStream(sd.getInputDescriptor(testStreamId3, mock(Serde.class))); - - List<InputOperatorSpec> inputSpecs = new ArrayList<>(graphSpec.getInputOperators().values()); - assertEquals(inputSpecs.size(), 3); - assertEquals(inputSpecs.get(0).getStreamId(), testStreamId1); - assertEquals(inputSpecs.get(1).getStreamId(), testStreamId2); - assertEquals(inputSpecs.get(2).getStreamId(), testStreamId3); - } - - @Test - public void testGetTable() { - Config mockConfig = mock(Config.class); - StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig); - - BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class); - when(mockTableDescriptor.getTableId()).thenReturn("t1"); - when(mockTableDescriptor.getTableSpec()).thenReturn( - new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", new HashMap<>())); - assertNotNull(graphSpec.getTable(mockTableDescriptor)); - } - - @Test(expected = IllegalStateException.class) - public void testGetTableWithBadId() { - Config mockConfig = mock(Config.class); - StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig); - - BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class); - when(mockTableDescriptor.getTableId()).thenReturn("my.table"); - graphSpec.getTable(mockTableDescriptor); - } - - class MockExpandingSystemDescriptor extends SystemDescriptor<MockExpandingSystemDescriptor> implements ExpandingInputDescriptorProvider<Integer> { - public MockExpandingSystemDescriptor(String systemName, StreamExpander expander) { - super(systemName, "factory.class", null, expander); - } - - @Override - public MockInputDescriptor<Integer> getInputDescriptor(String streamId, Serde serde) { - return new MockInputDescriptor<>(streamId, this, serde); - } - } - - class MockTransformingSystemDescriptor extends SystemDescriptor<MockTransformingSystemDescriptor> implements TransformingInputDescriptorProvider<Integer> { - public MockTransformingSystemDescriptor(String systemName, InputTransformer transformer) { - super(systemName, "factory.class", transformer, null); - } - - @Override - public MockInputDescriptor<Integer> getInputDescriptor(String streamId, Serde serde) { - return new MockInputDescriptor<>(streamId, this, serde); - } - } - - public class MockInputDescriptor<StreamMessageType> extends InputDescriptor<StreamMessageType, MockInputDescriptor<StreamMessageType>> { - MockInputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde) { - super(streamId, serde, systemDescriptor, null); - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java index 604c72d..6f8a8bc 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java @@ -21,7 +21,6 @@ package org.apache.samza.operators.impl; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; - import java.io.Serializable; import java.time.Duration; import java.util.ArrayList; @@ -35,6 +34,7 @@ import java.util.Set; import java.util.function.BiFunction; import java.util.function.Function; import org.apache.samza.Partition; +import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; @@ -49,7 +49,6 @@ import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.descriptors.GenericOutputDescriptor; import org.apache.samza.operators.descriptors.GenericSystemDescriptor; @@ -58,7 +57,6 @@ import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.InitableFunction; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.util.TimestampedValue; import org.apache.samza.operators.spec.OperatorSpec.OpCode; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; @@ -74,6 +72,7 @@ import org.apache.samza.task.TaskCoordinator; import org.apache.samza.testUtils.StreamTestUtils; import org.apache.samza.util.Clock; import org.apache.samza.util.SystemClock; +import org.apache.samza.util.TimestampedValue; import org.junit.After; import org.junit.Test; @@ -220,7 +219,7 @@ public class TestOperatorImplGraph { @Test public void testEmptyChain() { - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); + StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class)); OperatorImplGraph opGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mock(TaskContextImpl.class), mock(Clock.class)); assertEquals(0, opGraph.getAllInputOperators().size()); @@ -244,17 +243,18 @@ public class TestOperatorImplGraph { StreamTestUtils.addStreamConfigs(configs, outputStreamId, outputSystem, outputPhysicalName); Config config = new MapConfig(configs); - StreamGraphSpec graphSpec = new StreamGraphSpec(config); - GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); - GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class)); - GenericOutputDescriptor outputDescriptor = sd.getOutputDescriptor(outputStreamId, mock(Serde.class)); - MessageStream<Object> inputStream = graphSpec.getInputStream(inputDescriptor); - OutputStream<Object> outputStream = graphSpec.getOutputStream(outputDescriptor); + StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { + GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); + GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class)); + GenericOutputDescriptor outputDescriptor = sd.getOutputDescriptor(outputStreamId, mock(Serde.class)); + MessageStream<Object> inputStream = appDesc.getInputStream(inputDescriptor); + OutputStream<Object> outputStream = appDesc.getOutputStream(outputDescriptor); - inputStream - .filter(mock(FilterFunction.class)) - .map(mock(MapFunction.class)) - .sendTo(outputStream); + inputStream + .filter(mock(FilterFunction.class)) + .map(mock(MapFunction.class)) + .sendTo(outputStream); + }, config); TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); @@ -297,19 +297,20 @@ public class TestOperatorImplGraph { StreamTestUtils.addStreamConfigs(configs, outputStreamId, outputSystem, outputPhysicalName); Config config = new MapConfig(configs); - StreamGraphSpec graphSpec = new StreamGraphSpec(config); - GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); - GenericSystemDescriptor osd = new GenericSystemDescriptor(outputSystem, "mockFactoryClass"); - GenericInputDescriptor inputDescriptor = isd.getInputDescriptor(inputStreamId, mock(Serde.class)); - GenericOutputDescriptor outputDescriptor = osd.getOutputDescriptor(outputStreamId, - KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class))); - MessageStream<Object> inputStream = graphSpec.getInputStream(inputDescriptor); - OutputStream<KV<Integer, String>> outputStream = graphSpec.getOutputStream(outputDescriptor); - - inputStream - .partitionBy(Object::hashCode, Object::toString, - KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)), "p1") - .sendTo(outputStream); + StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { + GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); + GenericSystemDescriptor osd = new GenericSystemDescriptor(outputSystem, "mockFactoryClass"); + GenericInputDescriptor inputDescriptor = isd.getInputDescriptor(inputStreamId, mock(Serde.class)); + GenericOutputDescriptor outputDescriptor = osd.getOutputDescriptor(outputStreamId, + KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class))); + MessageStream<Object> inputStream = appDesc.getInputStream(inputDescriptor); + OutputStream<KV<Integer, String>> outputStream = appDesc.getOutputStream(outputDescriptor); + + inputStream + .partitionBy(Object::hashCode, Object::toString, + KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)), "p1") + .sendTo(outputStream); + }, config); TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); @@ -351,13 +352,13 @@ public class TestOperatorImplGraph { HashMap<String, String> configMap = new HashMap<>(); StreamTestUtils.addStreamConfigs(configMap, inputStreamId, inputSystem, inputPhysicalName); Config config = new MapConfig(configMap); - StreamGraphSpec graphSpec = new StreamGraphSpec(config); - - GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); - GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class)); - MessageStream<Object> inputStream = graphSpec.getInputStream(inputDescriptor); - inputStream.filter(mock(FilterFunction.class)); - inputStream.map(mock(MapFunction.class)); + StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { + GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); + GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class)); + MessageStream<Object> inputStream = appDesc.getInputStream(inputDescriptor); + inputStream.filter(mock(FilterFunction.class)); + inputStream.map(mock(MapFunction.class)); + }, config); TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); @@ -380,31 +381,28 @@ public class TestOperatorImplGraph { HashMap<String, String> configs = new HashMap<>(); StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, inputPhysicalName); Config config = new MapConfig(configs); - StreamGraphSpec graphSpec = new StreamGraphSpec(config); - - GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); - GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class)); - MessageStream<Object> inputStream = graphSpec.getInputStream(inputDescriptor); - MessageStream<Object> stream1 = inputStream.filter(mock(FilterFunction.class)); - MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class)); - MessageStream<Object> mergedStream = stream1.merge(Collections.singleton(stream2)); - + StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { + GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); + GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class)); + MessageStream<Object> inputStream = appDesc.getInputStream(inputDescriptor); + MessageStream<Object> stream1 = inputStream.filter(mock(FilterFunction.class)); + MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class)); + stream1.merge(Collections.singleton(stream2)) + .map(new TestMapFunction<Object, Object>("test-map-1", (Function & Serializable) m -> m)); + }, mock(Config.class)); TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); TaskName mockTaskName = mock(TaskName.class); when(mockTaskContext.getTaskName()).thenReturn(mockTaskName); when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); - MapFunction testMapFunction = new TestMapFunction<Object, Object>("test-map-1", (Function & Serializable) m -> m); - mergedStream.map(testMapFunction); - OperatorImplGraph opImplGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mockTaskContext, mock(Clock.class)); Set<OperatorImpl> opSet = opImplGraph.getAllInputOperators().stream().collect(HashSet::new, (s, op) -> addOperatorRecursively(s, op), HashSet::addAll); Object[] mergeOps = opSet.stream().filter(op -> op.getOperatorSpec().getOpCode() == OpCode.MERGE).toArray(); - assertEquals(mergeOps.length, 1); - assertEquals(((OperatorImpl) mergeOps[0]).registeredOperators.size(), 1); + assertEquals(1, mergeOps.length); + assertEquals(1, ((OperatorImpl) mergeOps[0]).registeredOperators.size()); OperatorImpl mapOp = (OperatorImpl) ((OperatorImpl) mergeOps[0]).registeredOperators.iterator().next(); assertEquals(mapOp.getOperatorSpec().getOpCode(), OpCode.MAP); @@ -425,21 +423,22 @@ public class TestOperatorImplGraph { StreamTestUtils.addStreamConfigs(configs, inputStreamId1, inputSystem, inputPhysicalName1); StreamTestUtils.addStreamConfigs(configs, inputStreamId2, inputSystem, inputPhysicalName2); Config config = new MapConfig(configs); - StreamGraphSpec graphSpec = new StreamGraphSpec(config); Integer joinKey = new Integer(1); Function<Object, Integer> keyFn = (Function & Serializable) m -> joinKey; JoinFunction testJoinFunction = new TestJoinFunction("jobName-jobId-join-j1", (BiFunction & Serializable) (m1, m2) -> KV.of(m1, m2), keyFn, keyFn); - GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); - GenericInputDescriptor inputDescriptor1 = sd.getInputDescriptor(inputStreamId1, mock(Serde.class)); - GenericInputDescriptor inputDescriptor2 = sd.getInputDescriptor(inputStreamId2, mock(Serde.class)); - MessageStream<Object> inputStream1 = graphSpec.getInputStream(inputDescriptor1); - MessageStream<Object> inputStream2 = graphSpec.getInputStream(inputDescriptor2); + StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { + GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); + GenericInputDescriptor inputDescriptor1 = sd.getInputDescriptor(inputStreamId1, mock(Serde.class)); + GenericInputDescriptor inputDescriptor2 = sd.getInputDescriptor(inputStreamId2, mock(Serde.class)); + MessageStream<Object> inputStream1 = appDesc.getInputStream(inputDescriptor1); + MessageStream<Object> inputStream2 = appDesc.getInputStream(inputDescriptor2); - inputStream1.join(inputStream2, testJoinFunction, - mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j1"); + inputStream1.join(inputStream2, testJoinFunction, + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j1"); + }, config); TaskName mockTaskName = mock(TaskName.class); TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); @@ -496,20 +495,20 @@ public class TestOperatorImplGraph { TaskContextImpl mockContext = mock(TaskContextImpl.class); when(mockContext.getTaskName()).thenReturn(mockTaskName); when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); - StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig); - - GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); - GenericInputDescriptor inputDescriptor1 = sd.getInputDescriptor(inputStreamId1, mock(Serde.class)); - GenericInputDescriptor inputDescriptor2 = sd.getInputDescriptor(inputStreamId2, mock(Serde.class)); - MessageStream<Object> inputStream1 = graphSpec.getInputStream(inputDescriptor1); - MessageStream<Object> inputStream2 = graphSpec.getInputStream(inputDescriptor2); + StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { + GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); + GenericInputDescriptor inputDescriptor1 = sd.getInputDescriptor(inputStreamId1, mock(Serde.class)); + GenericInputDescriptor inputDescriptor2 = sd.getInputDescriptor(inputStreamId2, mock(Serde.class)); + MessageStream<Object> inputStream1 = appDesc.getInputStream(inputDescriptor1); + MessageStream<Object> inputStream2 = appDesc.getInputStream(inputDescriptor2); - Function mapFn = (Function & Serializable) m -> m; - inputStream1.map(new TestMapFunction<Object, Object>("1", mapFn)) - .map(new TestMapFunction<Object, Object>("2", mapFn)); + Function mapFn = (Function & Serializable) m -> m; + inputStream1.map(new TestMapFunction<Object, Object>("1", mapFn)) + .map(new TestMapFunction<Object, Object>("2", mapFn)); - inputStream2.map(new TestMapFunction<Object, Object>("3", mapFn)) - .map(new TestMapFunction<Object, Object>("4", mapFn)); + inputStream2.map(new TestMapFunction<Object, Object>("3", mapFn)) + .map(new TestMapFunction<Object, Object>("4", mapFn)); + }, mockConfig); OperatorImplGraph opImplGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mockConfig, mockContext, SystemClock.instance()); @@ -592,33 +591,34 @@ public class TestOperatorImplGraph { StreamTestUtils.addStreamConfigs(configs, outputStreamId2, outputSystem, outputStreamId2); Config config = new MapConfig(configs); - StreamGraphSpec graphSpec = new StreamGraphSpec(config); - GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); - GenericInputDescriptor inputDescriptor1 = isd.getInputDescriptor(inputStreamId1, mock(Serde.class)); - GenericInputDescriptor inputDescriptor2 = isd.getInputDescriptor(inputStreamId2, mock(Serde.class)); - GenericInputDescriptor inputDescriptor3 = isd.getInputDescriptor(inputStreamId3, mock(Serde.class)); - GenericSystemDescriptor osd = new GenericSystemDescriptor(outputSystem, "mockFactoryClass"); - GenericOutputDescriptor outputDescriptor1 = osd.getOutputDescriptor(outputStreamId1, mock(Serde.class)); - GenericOutputDescriptor outputDescriptor2 = osd.getOutputDescriptor(outputStreamId2, mock(Serde.class)); - MessageStream messageStream1 = graphSpec.getInputStream(inputDescriptor1).map(m -> m); - MessageStream messageStream2 = graphSpec.getInputStream(inputDescriptor2).filter(m -> true); - MessageStream messageStream3 = - graphSpec.getInputStream(inputDescriptor3) - .filter(m -> true) - .partitionBy(m -> "m", m -> m, "p1") - .map(m -> m); - OutputStream<Object> outputStream1 = graphSpec.getOutputStream(outputDescriptor1); - OutputStream<Object> outputStream2 = graphSpec.getOutputStream(outputDescriptor2); - - messageStream1 - .join(messageStream2, mock(JoinFunction.class), - mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1") - .partitionBy(m -> "m", m -> m, "p2") - .sendTo(outputStream1); - messageStream3 - .join(messageStream2, mock(JoinFunction.class), - mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2") - .sendTo(outputStream2); + StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { + GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); + GenericInputDescriptor inputDescriptor1 = isd.getInputDescriptor(inputStreamId1, mock(Serde.class)); + GenericInputDescriptor inputDescriptor2 = isd.getInputDescriptor(inputStreamId2, mock(Serde.class)); + GenericInputDescriptor inputDescriptor3 = isd.getInputDescriptor(inputStreamId3, mock(Serde.class)); + GenericSystemDescriptor osd = new GenericSystemDescriptor(outputSystem, "mockFactoryClass"); + GenericOutputDescriptor outputDescriptor1 = osd.getOutputDescriptor(outputStreamId1, mock(Serde.class)); + GenericOutputDescriptor outputDescriptor2 = osd.getOutputDescriptor(outputStreamId2, mock(Serde.class)); + MessageStream messageStream1 = appDesc.getInputStream(inputDescriptor1).map(m -> m); + MessageStream messageStream2 = appDesc.getInputStream(inputDescriptor2).filter(m -> true); + MessageStream messageStream3 = + appDesc.getInputStream(inputDescriptor3) + .filter(m -> true) + .partitionBy(m -> "m", m -> m, "p1") + .map(m -> m); + OutputStream<Object> outputStream1 = appDesc.getOutputStream(outputDescriptor1); + OutputStream<Object> outputStream2 = appDesc.getOutputStream(outputDescriptor2); + + messageStream1 + .join(messageStream2, mock(JoinFunction.class), + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1") + .partitionBy(m -> "m", m -> m, "p2") + .sendTo(outputStream1); + messageStream3 + .join(messageStream2, mock(JoinFunction.class), + mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2") + .sendTo(outputStream2); + }, config); Multimap<SystemStream, SystemStream> outputToInput = OperatorImplGraph.getIntermediateToInputStreamsMap(graphSpec.getOperatorSpecGraph(), new StreamConfig(config)); http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java index 6c34fcd..7d468c9 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java @@ -19,25 +19,33 @@ package org.apache.samza.operators.impl; - import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.samza.Partition; +import org.apache.samza.application.StreamApplicationDescriptorImpl; +import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; -import org.apache.samza.config.MapConfig; import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; import org.apache.samza.container.TaskContextImpl; import org.apache.samza.container.TaskName; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraphSpec; +import org.apache.samza.operators.OperatorSpecGraph; import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.descriptors.GenericSystemDescriptor; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.impl.store.TestInMemoryStore; import org.apache.samza.operators.impl.store.TimeSeriesKeySerde; -import org.apache.samza.operators.OperatorSpecGraph; import org.apache.samza.operators.triggers.FiringType; import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.triggers.Triggers; @@ -59,15 +67,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.io.IOException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Collections; - import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -546,72 +545,80 @@ public class TestWindowOperator { verify(taskCoordinator, times(1)).shutdown(TaskCoordinator.RequestScope.CURRENT_TASK); } - private StreamGraphSpec getKeyedTumblingWindowStreamGraph(AccumulationMode mode, + private StreamApplicationDescriptorImpl getKeyedTumblingWindowStreamGraph(AccumulationMode mode, Duration duration, Trigger<KV<Integer, Integer>> earlyTrigger) throws IOException { - StreamGraphSpec graph = new StreamGraphSpec(config); - KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); - GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass"); - GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde); - graph.getInputStream(inputDescriptor) - .window(Windows.keyedTumblingWindow(KV::getKey, duration, new IntegerSerde(), kvSerde) - .setEarlyTrigger(earlyTrigger).setAccumulationMode(mode), "w1") - .sink((message, messageCollector, taskCoordinator) -> { - SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); - messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); - }); - - return graph; + + StreamApplication userApp = appDesc -> { + KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); + GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass"); + GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde); + appDesc.getInputStream(inputDescriptor) + .window(Windows.keyedTumblingWindow(KV::getKey, duration, new IntegerSerde(), kvSerde) + .setEarlyTrigger(earlyTrigger).setAccumulationMode(mode), "w1") + .sink((message, messageCollector, taskCoordinator) -> { + SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); + messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); + }); + }; + + return new StreamApplicationDescriptorImpl(userApp, config); } - private StreamGraphSpec getTumblingWindowStreamGraph(AccumulationMode mode, + private StreamApplicationDescriptorImpl getTumblingWindowStreamGraph(AccumulationMode mode, Duration duration, Trigger<KV<Integer, Integer>> earlyTrigger) throws IOException { - StreamGraphSpec graph = new StreamGraphSpec(config); - KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); - GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass"); - GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde); - graph.getInputStream(inputDescriptor) - .window(Windows.tumblingWindow(duration, kvSerde).setEarlyTrigger(earlyTrigger) - .setAccumulationMode(mode), "w1") - .sink((message, messageCollector, taskCoordinator) -> { - SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); - messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); - }); - return graph; + StreamApplication userApp = appDesc -> { + KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); + GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass"); + GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde); + appDesc.getInputStream(inputDescriptor) + .window(Windows.tumblingWindow(duration, kvSerde).setEarlyTrigger(earlyTrigger) + .setAccumulationMode(mode), "w1") + .sink((message, messageCollector, taskCoordinator) -> { + SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); + messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); + }); + }; + + return new StreamApplicationDescriptorImpl(userApp, config); } - private StreamGraphSpec getKeyedSessionWindowStreamGraph(AccumulationMode mode, Duration duration) throws IOException { - StreamGraphSpec graph = new StreamGraphSpec(config); - KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); - GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass"); - GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde); - graph.getInputStream(inputDescriptor) - .window(Windows.keyedSessionWindow(KV::getKey, duration, new IntegerSerde(), kvSerde) - .setAccumulationMode(mode), "w1") - .sink((message, messageCollector, taskCoordinator) -> { - SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); - messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); - }); - return graph; + private StreamApplicationDescriptorImpl getKeyedSessionWindowStreamGraph(AccumulationMode mode, Duration duration) throws IOException { + StreamApplication userApp = appDesc -> { + KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); + GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass"); + GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde); + appDesc.getInputStream(inputDescriptor) + .window(Windows.keyedSessionWindow(KV::getKey, duration, new IntegerSerde(), kvSerde) + .setAccumulationMode(mode), "w1") + .sink((message, messageCollector, taskCoordinator) -> { + SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); + messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); + }); + }; + + return new StreamApplicationDescriptorImpl(userApp, config); } - private StreamGraphSpec getAggregateTumblingWindowStreamGraph(AccumulationMode mode, Duration timeDuration, + private StreamApplicationDescriptorImpl getAggregateTumblingWindowStreamGraph(AccumulationMode mode, Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) throws IOException { - StreamGraphSpec graph = new StreamGraphSpec(config); - KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); - GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass"); - GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde); - MessageStream<KV<Integer, Integer>> integers = graph.getInputStream(inputDescriptor); - - integers - .map(new KVMapFunction()) - .window(Windows.<IntegerEnvelope, Integer>tumblingWindow(timeDuration, () -> 0, (m, c) -> c + 1, new IntegerSerde()) - .setEarlyTrigger(earlyTrigger) - .setAccumulationMode(mode), "w1") - .sink((message, messageCollector, taskCoordinator) -> { - SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); - messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); - }); - return graph; + StreamApplication userApp = appDesc -> { + KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); + GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass"); + GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde); + MessageStream<KV<Integer, Integer>> integers = appDesc.getInputStream(inputDescriptor); + + integers + .map(new KVMapFunction()) + .window(Windows.<IntegerEnvelope, Integer>tumblingWindow(timeDuration, () -> 0, (m, c) -> c + 1, new IntegerSerde()) + .setEarlyTrigger(earlyTrigger) + .setAccumulationMode(mode), "w1") + .sink((message, messageCollector, taskCoordinator) -> { + SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); + messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message)); + }); + }; + + return new StreamApplicationDescriptorImpl(userApp, config); } private static class IntegerEnvelope extends IncomingMessageEnvelope {