http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java index cefe128..864c3fc 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java @@ -19,14 +19,22 @@ package org.apache.samza.execution; +import java.time.Duration; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.SerializerConfig; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraphSpec; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; +import org.apache.samza.operators.descriptors.GenericOutputDescriptor; +import org.apache.samza.operators.descriptors.GenericSystemDescriptor; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.impl.store.TimestampedValueSerde; import org.apache.samza.serializers.JsonSerdeV2; @@ -37,12 +45,6 @@ import org.apache.samza.serializers.StringSerde; import org.apache.samza.system.StreamSpec; import org.junit.Test; -import java.time.Duration; -import java.util.Base64; -import java.util.HashMap; -import java.util.Map; -import java.util.stream.Collectors; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyString; @@ -65,13 +67,18 @@ public class TestJobNode { when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId"); StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig); - graphSpec.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>())); - MessageStream<KV<String, Object>> input1 = graphSpec.getInputStream("input1"); - MessageStream<KV<String, Object>> input2 = graphSpec.getInputStream("input2"); - OutputStream<KV<String, Object>> output = graphSpec.getOutputStream("output"); + KVSerde<String, Object> serde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>()); + GenericSystemDescriptor sd = new GenericSystemDescriptor("system1", "mockSystemFactoryClass"); + GenericInputDescriptor<KV<String, Object>> inputDescriptor1 = sd.getInputDescriptor("input1", serde); + GenericInputDescriptor<KV<String, Object>> inputDescriptor2 = sd.getInputDescriptor("input2", serde); + GenericOutputDescriptor<KV<String, Object>> outputDescriptor = sd.getOutputDescriptor("output", serde); + MessageStream<KV<String, Object>> input1 = graphSpec.getInputStream(inputDescriptor1); + MessageStream<KV<String, Object>> input2 = graphSpec.getInputStream(inputDescriptor2); + OutputStream<KV<String, Object>> output = graphSpec.getOutputStream(outputDescriptor); JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = mock(JoinFunction.class); input1 - .partitionBy(KV::getKey, KV::getValue, "p1").map(kv -> kv.value) + .partitionBy(KV::getKey, KV::getValue, serde, "p1") + .map(kv -> kv.value) .join(input2.map(kv -> kv.value), mockJoinFn, new StringSerde(), new JsonSerdeV2<>(Object.class), new JsonSerdeV2<>(Object.class), Duration.ofHours(1), "j1") @@ -133,7 +140,7 @@ public class TestJobNode { outputKeySerde.startsWith(StringSerde.class.getSimpleName())); assertTrue("Serialized serdes should contain output msg serde", deserializedSerdes.containsKey(outputMsgSerde)); - assertTrue("Serialized output msg serde should be a StringSerde", + assertTrue("Serialized output msg serde should be a JsonSerdeV2", outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); String partitionByKeySerde = mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.key.serde"); @@ -145,7 +152,7 @@ public class TestJobNode { assertTrue("Serialized serdes should contain intermediate stream msg serde", deserializedSerdes.containsKey(partitionByMsgSerde)); assertTrue( - "Serialized intermediate stream msg serde should be a StringSerde", + "Serialized intermediate stream msg serde should be a JsonSerdeV2", partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); String leftJoinStoreKeySerde = mapConfig.get("stores.jobName-jobId-join-j1-L.key.serde"); @@ -171,4 +178,50 @@ public class TestJobNode { rightJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName())); } + @Test + public void testAddSerdeConfigsForRepartitionWithNoDefaultSystem() { + StreamSpec inputSpec = new StreamSpec("input", "input", "input-system"); + StreamSpec partitionBySpec = + new StreamSpec("jobName-jobId-partition_by-p1", "partition_by-p1", "intermediate-system"); + + Config mockConfig = mock(Config.class); + when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName"); + when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId"); + + StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig); + GenericSystemDescriptor sd = new GenericSystemDescriptor("system1", "mockSystemFactoryClassName"); + GenericInputDescriptor<KV<String, Object>> inputDescriptor1 = + sd.getInputDescriptor("input", KVSerde.of(new StringSerde(), new JsonSerdeV2<>())); + MessageStream<KV<String, Object>> input = graphSpec.getInputStream(inputDescriptor1); + input.partitionBy(KV::getKey, KV::getValue, "p1"); + + JobNode jobNode = new JobNode("jobName", "jobId", graphSpec.getOperatorSpecGraph(), mockConfig); + Config config = new MapConfig(); + StreamEdge input1Edge = new StreamEdge(inputSpec, false, false, config); + StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true, false, config); + jobNode.addInEdge(input1Edge); + jobNode.addInEdge(repartitionEdge); + jobNode.addOutEdge(repartitionEdge); + + Map<String, String> configs = new HashMap<>(); + jobNode.addSerdeConfigs(configs); + + MapConfig mapConfig = new MapConfig(configs); + Config serializers = mapConfig.subset("serializers.registry.", true); + + // make sure that the serializers deserialize correctly + SerializableSerde<Serde> serializableSerde = new SerializableSerde<>(); + Map<String, Serde> deserializedSerdes = serializers.entrySet().stream().collect(Collectors.toMap( + e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), ""), + e -> serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes())) + )); + assertEquals(2, serializers.size()); // 2 input stream + + String partitionByKeySerde = mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.key.serde"); + String partitionByMsgSerde = mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.msg.serde"); + assertTrue("Serialized serdes should not contain intermediate stream key serde", + !deserializedSerdes.containsKey(partitionByKeySerde)); + assertTrue("Serialized serdes should not contain intermediate stream msg serde", + !deserializedSerdes.containsKey(partitionByMsgSerde)); + } }
http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java index 7054727..0759aba 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java @@ -19,12 +19,15 @@ package org.apache.samza.operators; import com.google.common.collect.ImmutableSet; + import org.apache.samza.Partition; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.container.TaskContextImpl; import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; +import org.apache.samza.operators.descriptors.GenericSystemDescriptor; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.impl.store.TestInMemoryStore; import org.apache.samza.operators.impl.store.TimestampedValueSerde; @@ -38,10 +41,10 @@ import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamOperatorTask; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.testUtils.StreamTestUtils; import org.apache.samza.testUtils.TestClock; import org.apache.samza.util.Clock; import org.apache.samza.util.SystemClock; -import org.junit.Before; import org.junit.Test; import java.io.IOException; @@ -67,17 +70,6 @@ public class TestJoinOperator { private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class); private final Set<Integer> numbers = ImmutableSet.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); - private Config config; - - @Before - public void setUp() { - Map<String, String> mapConfig = new HashMap<>(); - mapConfig.put("job.default.system", "insystem"); - mapConfig.put("job.name", "jobName"); - mapConfig.put("job.id", "jobId"); - config = new MapConfig(mapConfig); - } - @Test public void join() throws Exception { StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new TestJoinFunction()); @@ -96,12 +88,19 @@ public class TestJoinOperator { @Test(expected = SamzaException.class) public void joinWithSelfThrowsException() throws Exception { - config.put("streams.instream.system", "insystem"); - + Map<String, String> mapConfig = new HashMap<>(); + mapConfig.put("job.name", "jobName"); + mapConfig.put("job.id", "jobId"); + StreamTestUtils.addStreamConfigs(mapConfig, "inStream", "insystem", "instream"); + Config config = new MapConfig(mapConfig); StreamGraphSpec graphSpec = new StreamGraphSpec(config); + IntegerSerde integerSerde = new IntegerSerde(); KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde); - MessageStream<KV<Integer, Integer>> inStream = graphSpec.getInputStream("instream", kvSerde); + GenericSystemDescriptor sd = new GenericSystemDescriptor("insystem", "mockFactoryClassName"); + GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("inStream", kvSerde); + + MessageStream<KV<Integer, Integer>> inStream = graphSpec.getInputStream(inputDescriptor); inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde, kvSerde, JOIN_TTL, "join"); @@ -297,7 +296,12 @@ public class TestJoinOperator { } private StreamOperatorTask createStreamOperatorTask(Clock clock, StreamGraphSpec graphSpec) throws Exception { - + Map<String, String> mapConfig = new HashMap<>(); + mapConfig.put("job.name", "jobName"); + mapConfig.put("job.id", "jobId"); + StreamTestUtils.addStreamConfigs(mapConfig, "inStream", "insystem", "instream"); + StreamTestUtils.addStreamConfigs(mapConfig, "inStream2", "insystem", "instream2"); + Config config = new MapConfig(mapConfig); TaskContextImpl taskContext = mock(TaskContextImpl.class); when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet .of(new SystemStreamPartition("insystem", "instream", new Partition(0)), @@ -317,11 +321,21 @@ public class TestJoinOperator { } private StreamGraphSpec getTestJoinStreamGraph(TestJoinFunction joinFn) throws IOException { + Map<String, String> mapConfig = new HashMap<>(); + mapConfig.put("job.name", "jobName"); + mapConfig.put("job.id", "jobId"); + StreamTestUtils.addStreamConfigs(mapConfig, "inStream", "insystem", "instream"); + StreamTestUtils.addStreamConfigs(mapConfig, "inStream2", "insystem", "instream2"); + Config config = new MapConfig(mapConfig); StreamGraphSpec graphSpec = new StreamGraphSpec(config); IntegerSerde integerSerde = new IntegerSerde(); KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde); - MessageStream<KV<Integer, Integer>> inStream = graphSpec.getInputStream("instream", kvSerde); - MessageStream<KV<Integer, Integer>> inStream2 = graphSpec.getInputStream("instream2", kvSerde); + GenericSystemDescriptor sd = new GenericSystemDescriptor("insystem", "mockFactoryClassName"); + GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor1 = sd.getInputDescriptor("inStream", kvSerde); + GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor2 = sd.getInputDescriptor("inStream2", kvSerde); + + MessageStream<KV<Integer, Integer>> inStream = graphSpec.getInputStream(inputDescriptor1); + MessageStream<KV<Integer, Integer>> inStream2 = graphSpec.getInputStream(inputDescriptor2); inStream .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL, "j1") http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java index fff85e8..001ffda 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java @@ -207,7 +207,7 @@ public class TestMessageStreamImpl { OutputStreamImpl mockOutputStreamImpl = mock(OutputStreamImpl.class); KVSerde mockKVSerde = mock(KVSerde.class); IntermediateMessageStreamImpl mockIntermediateStream = mock(IntermediateMessageStreamImpl.class); - when(mockGraph.getIntermediateStream(eq(mockOpName), eq(mockKVSerde))) + when(mockGraph.getIntermediateStream(eq(mockOpName), eq(mockKVSerde), eq(false))) .thenReturn(mockIntermediateStream); when(mockIntermediateStream.getOutputStream()) .thenReturn(mockOutputStreamImpl); @@ -237,7 +237,7 @@ public class TestMessageStreamImpl { when(mockGraph.getNextOpId(anyObject(), anyObject())).thenReturn(mockOpName); OutputStreamImpl mockOutputStreamImpl = mock(OutputStreamImpl.class); IntermediateMessageStreamImpl mockIntermediateStream = mock(IntermediateMessageStreamImpl.class); - when(mockGraph.getIntermediateStream(eq(mockOpName), eq(null))) + when(mockGraph.getIntermediateStream(eq(mockOpName), eq(null), eq(false))) .thenReturn(mockIntermediateStream); when(mockIntermediateStream.getOutputStream()) .thenReturn(mockOutputStreamImpl); http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java index 4cfc66a..6469326 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java @@ -73,14 +73,14 @@ public class TestOperatorSpecGraph { */ String inputStreamId1 = "test-input-1"; String outputStreamId = "test-output-1"; - InputOperatorSpec testInput = new InputOperatorSpec(inputStreamId1, new NoOpSerde(), new NoOpSerde(), true, inputStreamId1); + InputOperatorSpec testInput = new InputOperatorSpec(inputStreamId1, new NoOpSerde(), new NoOpSerde(), null, true, inputStreamId1); StreamOperatorSpec filterOp = OperatorSpecs.createFilterOperatorSpec(m -> true, "test-filter-2"); OutputStreamImpl outputStream1 = new OutputStreamImpl(outputStreamId, null, null, true); OutputOperatorSpec outputSpec = OperatorSpecs.createSendToOperatorSpec(outputStream1, "test-output-3"); testInput.registerNextOperatorSpec(filterOp); filterOp.registerNextOperatorSpec(outputSpec); String streamId2 = "test-input-2"; - InputOperatorSpec testInput2 = new InputOperatorSpec(streamId2, new NoOpSerde(), new NoOpSerde(), true, "test-input-4"); + InputOperatorSpec testInput2 = new InputOperatorSpec(streamId2, new NoOpSerde(), new NoOpSerde(), null, true, "test-input-4"); StreamOperatorSpec testMap = OperatorSpecs.createMapOperatorSpec(m -> m, "test-map-5"); SinkOperatorSpec testSink = OperatorSpecs.createSinkOperatorSpec((m, mc, tc) -> { }, "test-sink-6"); testInput2.registerNextOperatorSpec(testMap); http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java index 109c138..9629efa 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java @@ -19,18 +19,30 @@ package org.apache.samza.operators; import com.google.common.collect.ImmutableList; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; - +import java.util.concurrent.atomic.AtomicInteger; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.operators.data.TestMessageEnvelope; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; +import org.apache.samza.operators.descriptors.GenericOutputDescriptor; +import org.apache.samza.operators.descriptors.GenericSystemDescriptor; +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.operators.descriptors.base.system.ExpandingInputDescriptorProvider; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.operators.descriptors.base.system.TransformingInputDescriptorProvider; +import org.apache.samza.operators.functions.InputTransformer; +import org.apache.samza.operators.functions.StreamExpander; import org.apache.samza.operators.spec.InputOperatorSpec; +import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.OperatorSpec.OpCode; import org.apache.samza.operators.spec.OutputStreamImpl; import org.apache.samza.operators.stream.IntermediateMessageStreamImpl; +import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.Serde; @@ -38,7 +50,9 @@ import org.apache.samza.table.TableSpec; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.anyString; @@ -47,6 +61,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +@SuppressWarnings("unchecked") public class TestStreamGraphSpec { @Test @@ -55,13 +70,15 @@ public class TestStreamGraphSpec { String streamId = "test-stream-1"; Serde mockValueSerde = mock(Serde.class); - MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream(streamId, mockValueSerde); + GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); + GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde); + MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream(isd); - InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec = - (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec(); + InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) inputStream).getOperatorSpec(); assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec); assertEquals(streamId, inputOpSpec.getStreamId()); + assertEquals(isd, graphSpec.getInputDescriptors().get(streamId)); assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde); assertEquals(mockValueSerde, inputOpSpec.getValueSerde()); } @@ -76,93 +93,71 @@ public class TestStreamGraphSpec { Serde mockValueSerde = mock(Serde.class); doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); - MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream(streamId, mockKVSerde); + GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); + GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockKVSerde); + MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream(isd); - InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec = - (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec(); + InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) inputStream).getOperatorSpec(); assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec); assertEquals(streamId, inputOpSpec.getStreamId()); + assertEquals(isd, graphSpec.getInputDescriptors().get(streamId)); assertEquals(mockKeySerde, inputOpSpec.getKeySerde()); assertEquals(mockValueSerde, inputOpSpec.getValueSerde()); } - @Test(expected = NullPointerException.class) + @Test(expected = IllegalArgumentException.class) public void testGetInputStreamWithNullSerde() { StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - - graphSpec.getInputStream("test-stream-1", null); + GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); + GenericInputDescriptor isd = sd.getInputDescriptor("mockStreamId", null); + graphSpec.getInputStream(isd); } @Test - public void testGetInputStreamWithDefaultValueSerde() { + public void testGetInputStreamWithTransformFunction() { String streamId = "test-stream-1"; StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); Serde mockValueSerde = mock(Serde.class); - graphSpec.setDefaultSerde(mockValueSerde); - MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream(streamId); - - InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec = - (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec(); - assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); - assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec); - assertEquals(streamId, inputOpSpec.getStreamId()); - assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde); - assertEquals(mockValueSerde, inputOpSpec.getValueSerde()); - } - - @Test - public void testGetInputStreamWithDefaultKeyValueSerde() { - String streamId = "test-stream-1"; - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - - KVSerde mockKVSerde = mock(KVSerde.class); - Serde mockKeySerde = mock(Serde.class); - Serde mockValueSerde = mock(Serde.class); - doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); - doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); - graphSpec.setDefaultSerde(mockKVSerde); - MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream(streamId); + InputTransformer transformer = ime -> ime; + MockTransformingSystemDescriptor sd = new MockTransformingSystemDescriptor("mockSystem", transformer); + MockInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde); + MessageStream inputStream = graphSpec.getInputStream(isd); - InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec = - (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec(); + InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) inputStream).getOperatorSpec(); assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec); assertEquals(streamId, inputOpSpec.getStreamId()); - assertEquals(mockKeySerde, inputOpSpec.getKeySerde()); - assertEquals(mockValueSerde, inputOpSpec.getValueSerde()); + assertEquals(isd, graphSpec.getInputDescriptors().get(streamId)); + assertEquals(transformer, inputOpSpec.getTransformer()); } @Test - public void testGetInputStreamWithDefaultDefaultSerde() { + public void testGetInputStreamWithExpandingSystem() { String streamId = "test-stream-1"; - - // default default serde == user hasn't provided a default serde + String expandedStreamId = "expanded-stream"; StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream(streamId); - - InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec = - (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec(); + AtomicInteger expandCallCount = new AtomicInteger(); + StreamExpander expander = (sg, isd) -> { + expandCallCount.incrementAndGet(); + InputDescriptor expandedISD = + new GenericSystemDescriptor("expanded-system", "mockFactoryClass") + .getInputDescriptor(expandedStreamId, new IntegerSerde()); + + return sg.getInputStream(expandedISD); + }; + MockExpandingSystemDescriptor sd = new MockExpandingSystemDescriptor("mock-system", expander); + MockInputDescriptor isd = sd.getInputDescriptor(streamId, new IntegerSerde()); + MessageStream inputStream = graphSpec.getInputStream(isd); + InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) inputStream).getOperatorSpec(); + assertEquals(1, expandCallCount.get()); assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); - assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec); - assertEquals(streamId, inputOpSpec.getStreamId()); - assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde); - assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde); - } - - @Test - public void testGetInputStreamWithRelaxedTypes() { - String streamId = "test-stream-1"; - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - - MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream(streamId); - - InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec = - (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec(); - assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); - assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec); - assertEquals(streamId, inputOpSpec.getStreamId()); + assertEquals(inputOpSpec, graphSpec.getInputOperators().get(expandedStreamId)); + assertFalse(graphSpec.getInputOperators().containsKey(streamId)); + assertFalse(graphSpec.getInputDescriptors().containsKey(streamId)); + assertTrue(graphSpec.getInputDescriptors().containsKey(expandedStreamId)); + assertEquals(expandedStreamId, inputOpSpec.getStreamId()); } @Test @@ -171,12 +166,15 @@ public class TestStreamGraphSpec { String streamId2 = "test-stream-2"; StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - MessageStream<Object> inputStream1 = graphSpec.getInputStream(streamId1); - MessageStream<Object> inputStream2 = graphSpec.getInputStream(streamId2); + GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); + GenericInputDescriptor isd1 = sd.getInputDescriptor(streamId1, mock(Serde.class)); + GenericInputDescriptor isd2 = sd.getInputDescriptor(streamId2, mock(Serde.class)); + MessageStream<Object> inputStream1 = graphSpec.getInputStream(isd1); + MessageStream<Object> inputStream2 = graphSpec.getInputStream(isd2); - InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec1 = + InputOperatorSpec inputOpSpec1 = (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream1).getOperatorSpec(); - InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec2 = + InputOperatorSpec inputOpSpec2 = (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream2).getOperatorSpec(); assertEquals(graphSpec.getInputOperators().size(), 2); @@ -188,137 +186,127 @@ public class TestStreamGraphSpec { public void testGetSameInputStreamTwice() { String streamId = "test-stream-1"; StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - graphSpec.getInputStream(streamId); + GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); + GenericInputDescriptor isd1 = sd.getInputDescriptor(streamId, mock(Serde.class)); + GenericInputDescriptor isd2 = sd.getInputDescriptor(streamId, mock(Serde.class)); + graphSpec.getInputStream(isd1); // should throw exception - graphSpec.getInputStream(streamId); + graphSpec.getInputStream(isd2); } @Test - public void testGetOutputStreamWithValueSerde() { + public void testMultipleSystemDescriptorForSameSystemName() { String streamId = "test-stream-1"; StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); + GenericSystemDescriptor sd1 = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); + GenericSystemDescriptor sd2 = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); + GenericInputDescriptor isd1 = sd1.getInputDescriptor(streamId, mock(Serde.class)); + GenericInputDescriptor isd2 = sd2.getInputDescriptor(streamId, mock(Serde.class)); + GenericOutputDescriptor osd1 = sd2.getOutputDescriptor(streamId, mock(Serde.class)); + + graphSpec.getInputStream(isd1); + boolean passed = false; + try { + graphSpec.getInputStream(isd2); + passed = true; + } catch (IllegalStateException e) { } - Serde mockValueSerde = mock(Serde.class); - OutputStream<TestMessageEnvelope> outputStream = - graphSpec.getOutputStream(streamId, mockValueSerde); - - OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream; - assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl); - assertEquals(streamId, outputStreamImpl.getStreamId()); - assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde); - assertEquals(mockValueSerde, outputStreamImpl.getValueSerde()); - } - - @Test - public void testGetOutputStreamWithKeyValueSerde() { - String streamId = "test-stream-1"; - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - KVSerde mockKVSerde = mock(KVSerde.class); - Serde mockKeySerde = mock(Serde.class); - Serde mockValueSerde = mock(Serde.class); - doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); - doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); - graphSpec.setDefaultSerde(mockKVSerde); - OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream(streamId, mockKVSerde); - - OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream; - assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl); - assertEquals(streamId, outputStreamImpl.getStreamId()); - assertEquals(mockKeySerde, outputStreamImpl.getKeySerde()); - assertEquals(mockValueSerde, outputStreamImpl.getValueSerde()); - } + try { + graphSpec.getOutputStream(osd1); + passed = true; + } catch (IllegalStateException e) { } - @Test(expected = NullPointerException.class) - public void testGetOutputStreamWithNullSerde() { - String streamId = "test-stream-1"; - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); + try { + graphSpec.setDefaultSystem(sd2); + passed = true; + } catch (IllegalStateException e) { } - graphSpec.getOutputStream(streamId, null); + assertFalse(passed); } @Test - public void testGetOutputStreamWithDefaultValueSerde() { + public void testGetOutputStreamWithValueSerde() { String streamId = "test-stream-1"; + StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); Serde mockValueSerde = mock(Serde.class); - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - graphSpec.setDefaultSerde(mockValueSerde); - OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream(streamId); + GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); + GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, mockValueSerde); + OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream(osd); OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream; assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl); assertEquals(streamId, outputStreamImpl.getStreamId()); + assertEquals(osd, graphSpec.getOutputDescriptors().get(streamId)); assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde); assertEquals(mockValueSerde, outputStreamImpl.getValueSerde()); } @Test - public void testGetOutputStreamWithDefaultKeyValueSerde() { + public void testGetOutputStreamWithKeyValueSerde() { String streamId = "test-stream-1"; - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); KVSerde mockKVSerde = mock(KVSerde.class); Serde mockKeySerde = mock(Serde.class); Serde mockValueSerde = mock(Serde.class); doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); - graphSpec.setDefaultSerde(mockKVSerde); - - OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream(streamId); + GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); + GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, mockKVSerde); + OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream(osd); OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream; assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl); assertEquals(streamId, outputStreamImpl.getStreamId()); + assertEquals(osd, graphSpec.getOutputDescriptors().get(streamId)); assertEquals(mockKeySerde, outputStreamImpl.getKeySerde()); assertEquals(mockValueSerde, outputStreamImpl.getValueSerde()); } - @Test - public void testGetOutputStreamWithDefaultDefaultSerde() { + @Test(expected = IllegalArgumentException.class) + public void testGetOutputStreamWithNullSerde() { String streamId = "test-stream-1"; - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - - OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream(streamId); - - OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream; - assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl); - assertEquals(streamId, outputStreamImpl.getStreamId()); - assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde); - assertTrue(outputStreamImpl.getValueSerde() instanceof NoOpSerde); + GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); + GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, null); + graphSpec.getOutputStream(osd); } @Test(expected = IllegalStateException.class) - public void testSetDefaultSerdeAfterGettingStreams() { - String streamId = "test-stream-1"; - + public void testSetDefaultSystemDescriptorAfterGettingInputStream() { StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - graphSpec.getInputStream(streamId); - graphSpec.setDefaultSerde(mock(Serde.class)); // should throw exception + GenericInputDescriptor id = new GenericSystemDescriptor("system", "factory.class.name") + .getInputDescriptor("input-stream", mock(Serde.class)); + graphSpec.getInputStream(id); + graphSpec.setDefaultSystem(new GenericSystemDescriptor("mockSystem", "mockFactory")); // should throw exception } @Test(expected = IllegalStateException.class) - public void testSetDefaultSerdeAfterGettingOutputStream() { - String streamId = "test-stream-1"; + public void testSetDefaultSystemDescriptorAfterGettingOutputStream() { StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - graphSpec.getOutputStream(streamId); - graphSpec.setDefaultSerde(mock(Serde.class)); // should throw exception + GenericOutputDescriptor od = new GenericSystemDescriptor("system", "factory.class.name") + .getOutputDescriptor("output-stream", mock(Serde.class)); + graphSpec.getOutputStream(od); + graphSpec.setDefaultSystem(new GenericSystemDescriptor("mockSystem", "mockFactory")); // should throw exception } @Test(expected = IllegalStateException.class) public void testSetDefaultSerdeAfterGettingIntermediateStream() { String streamId = "test-stream-1"; StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - graphSpec.getIntermediateStream(streamId, null); - graphSpec.setDefaultSerde(mock(Serde.class)); // should throw exception + graphSpec.getIntermediateStream(streamId, mock(Serde.class), false); + graphSpec.setDefaultSystem(new GenericSystemDescriptor("mockSystem", "mockFactory")); // should throw exception } @Test(expected = IllegalStateException.class) public void testGetSameOutputStreamTwice() { String streamId = "test-stream-1"; StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - graphSpec.getOutputStream(streamId); - graphSpec.getOutputStream(streamId); // should throw exception + GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); + GenericOutputDescriptor osd1 = sd.getOutputDescriptor(streamId, mock(Serde.class)); + GenericOutputDescriptor osd2 = sd.getOutputDescriptor(streamId, mock(Serde.class)); + graphSpec.getOutputStream(osd1); + graphSpec.getOutputStream(osd2); // should throw exception } @Test @@ -328,15 +316,15 @@ public class TestStreamGraphSpec { Serde mockValueSerde = mock(Serde.class); IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = - graphSpec.getIntermediateStream(streamId, mockValueSerde); + graphSpec.getIntermediateStream(streamId, mockValueSerde, false); assertEquals(graphSpec.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec()); assertEquals(graphSpec.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream()); assertEquals(streamId, intermediateStreamImpl.getStreamId()); assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde); assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde()); - assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde); - assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde()); + assertTrue(((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde); + assertEquals(mockValueSerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde()); } @Test @@ -350,84 +338,40 @@ public class TestStreamGraphSpec { doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = - graphSpec.getIntermediateStream(streamId, mockKVSerde); - - assertEquals(graphSpec.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec()); - assertEquals(graphSpec.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream()); - assertEquals(streamId, intermediateStreamImpl.getStreamId()); - assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde()); - assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde()); - assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde()); - assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde()); - } - - @Test - public void testGetIntermediateStreamWithDefaultValueSerde() { - String streamId = "streamId"; - StreamGraphSpec graph = new StreamGraphSpec(mock(Config.class)); - - Serde mockValueSerde = mock(Serde.class); - graph.setDefaultSerde(mockValueSerde); - IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = - graph.getIntermediateStream(streamId, null); - - assertEquals(graph.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec()); - assertEquals(graph.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream()); - assertEquals(streamId, intermediateStreamImpl.getStreamId()); - assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde); - assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde()); - assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde); - assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde()); - } - - @Test - public void testGetIntermediateStreamWithDefaultKeyValueSerde() { - Config mockConfig = mock(Config.class); - String streamId = "streamId"; - - StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig); - - KVSerde mockKVSerde = mock(KVSerde.class); - Serde mockKeySerde = mock(Serde.class); - Serde mockValueSerde = mock(Serde.class); - doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); - doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); - graphSpec.setDefaultSerde(mockKVSerde); - IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = - graphSpec.getIntermediateStream(streamId, null); + graphSpec.getIntermediateStream(streamId, mockKVSerde, false); assertEquals(graphSpec.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec()); assertEquals(graphSpec.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream()); assertEquals(streamId, intermediateStreamImpl.getStreamId()); assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde()); assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde()); - assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde()); - assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde()); + assertEquals(mockKeySerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde()); + assertEquals(mockValueSerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde()); } @Test - public void testGetIntermediateStreamWithDefaultDefaultSerde() { + public void testGetIntermediateStreamWithNoSerde() { Config mockConfig = mock(Config.class); String streamId = "streamId"; StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig); IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = - graphSpec.getIntermediateStream(streamId, null); + graphSpec.getIntermediateStream(streamId, null, false); assertEquals(graphSpec.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec()); assertEquals(graphSpec.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream()); assertEquals(streamId, intermediateStreamImpl.getStreamId()); - assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde); - assertTrue(intermediateStreamImpl.getOutputStream().getValueSerde() instanceof NoOpSerde); - assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde); - assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde() instanceof NoOpSerde); + assertNull(intermediateStreamImpl.getOutputStream().getKeySerde()); + assertNull(intermediateStreamImpl.getOutputStream().getValueSerde()); + assertNull(((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde()); + assertNull(((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde()); } @Test(expected = IllegalStateException.class) public void testGetSameIntermediateStreamTwice() { StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class)); - graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class)); + graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class), false); + graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class), false); } @Test @@ -454,7 +398,7 @@ public class TestStreamGraphSpec { } @Test - public void testUserDefinedIdValidation() { + public void testIdValidation() { Config mockConfig = mock(Config.class); when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName"); when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234"); @@ -471,7 +415,7 @@ public class TestStreamGraphSpec { fail("Received an error with a null or empty operator ID instead of defaulting to auto-generated ID."); } - List<String> validOpIds = ImmutableList.of("op.id", "op_id", "op-id", "1000", "op_1", "OP_ID"); + List<String> validOpIds = ImmutableList.of("op_id", "op-id", "1000", "op_1", "OP_ID"); for (String validOpId: validOpIds) { try { graphSpec.getNextOpId(OpCode.FILTER, validOpId); @@ -498,10 +442,10 @@ public class TestStreamGraphSpec { String testStreamId1 = "test-stream-1"; String testStreamId2 = "test-stream-2"; String testStreamId3 = "test-stream-3"; - - graphSpec.getInputStream("test-stream-1"); - graphSpec.getInputStream("test-stream-2"); - graphSpec.getInputStream("test-stream-3"); + GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass"); + graphSpec.getInputStream(sd.getInputDescriptor(testStreamId1, mock(Serde.class))); + graphSpec.getInputStream(sd.getInputDescriptor(testStreamId2, mock(Serde.class))); + graphSpec.getInputStream(sd.getInputDescriptor(testStreamId3, mock(Serde.class))); List<InputOperatorSpec> inputSpecs = new ArrayList<>(graphSpec.getInputOperators().values()); assertEquals(inputSpecs.size(), 3); @@ -531,4 +475,32 @@ public class TestStreamGraphSpec { when(mockTableDescriptor.getTableId()).thenReturn("my.table"); graphSpec.getTable(mockTableDescriptor); } + + class MockExpandingSystemDescriptor extends SystemDescriptor<MockExpandingSystemDescriptor> implements ExpandingInputDescriptorProvider<Integer> { + public MockExpandingSystemDescriptor(String systemName, StreamExpander expander) { + super(systemName, "factory.class", null, expander); + } + + @Override + public MockInputDescriptor<Integer> getInputDescriptor(String streamId, Serde serde) { + return new MockInputDescriptor<>(streamId, this, serde); + } + } + + class MockTransformingSystemDescriptor extends SystemDescriptor<MockTransformingSystemDescriptor> implements TransformingInputDescriptorProvider<Integer> { + public MockTransformingSystemDescriptor(String systemName, InputTransformer transformer) { + super(systemName, "factory.class", transformer, null); + } + + @Override + public MockInputDescriptor<Integer> getInputDescriptor(String streamId, Serde serde) { + return new MockInputDescriptor<>(streamId, this, serde); + } + } + + public class MockInputDescriptor<StreamMessageType> extends InputDescriptor<StreamMessageType, MockInputDescriptor<StreamMessageType>> { + MockInputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde) { + super(streamId, serde, systemDescriptor, null); + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java new file mode 100644 index 0000000..55a708b --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import java.util.Collection; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.spec.InputOperatorSpec; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +public class TestInputOperatorImpl { + @Test + public void testWithKeyedInput() { + InputOperatorImpl inputOperator = + new InputOperatorImpl(new InputOperatorSpec("stream-id", null, null, null, true, "input-op-id")); + + IncomingMessageEnvelope ime = + new IncomingMessageEnvelope(mock(SystemStreamPartition.class), "123", "key", "msg"); + + Collection<Object> results = + inputOperator.handleMessage(ime, mock(MessageCollector.class), mock(TaskCoordinator.class)); + + Object result = results.iterator().next(); + assertEquals("key", ((KV) result).getKey()); + assertEquals("msg", ((KV) result).getValue()); + } + + @Test + public void testWithUnkeyedInput() { + InputOperatorImpl inputOperator = + new InputOperatorImpl(new InputOperatorSpec("stream-id", null, null, null, false, "input-op-id")); + + IncomingMessageEnvelope ime = + new IncomingMessageEnvelope(mock(SystemStreamPartition.class), "123", "key", "msg"); + + Collection<Object> results = + inputOperator.handleMessage(ime, mock(MessageCollector.class), mock(TaskCoordinator.class)); + + Object result = results.iterator().next(); + assertEquals("msg", result); + } + + @Test + public void testWithInputTransformer() { + InputOperatorSpec inputOpSpec = + new InputOperatorSpec("stream-id", null, null, IncomingMessageEnvelope::getOffset, true, "input-op-id"); + InputOperatorImpl inputOperator = new InputOperatorImpl(inputOpSpec); + + IncomingMessageEnvelope ime = + new IncomingMessageEnvelope(mock(SystemStreamPartition.class), "123", "key", "msg"); + + Collection<Object> results = + inputOperator.handleMessage(ime, mock(MessageCollector.class), mock(TaskCoordinator.class)); + + Object result = results.iterator().next(); + assertEquals("123", result); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java index 4e77fae..604c72d 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java @@ -50,6 +50,9 @@ import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraphSpec; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; +import org.apache.samza.operators.descriptors.GenericOutputDescriptor; +import org.apache.samza.operators.descriptors.GenericSystemDescriptor; import org.apache.samza.operators.functions.ClosableFunction; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.InitableFunction; @@ -59,10 +62,10 @@ import org.apache.samza.util.TimestampedValue; import org.apache.samza.operators.spec.OperatorSpec.OpCode; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.task.MessageCollector; @@ -242,9 +245,11 @@ public class TestOperatorImplGraph { Config config = new MapConfig(configs); StreamGraphSpec graphSpec = new StreamGraphSpec(config); - - MessageStream<Object> inputStream = graphSpec.getInputStream(inputStreamId); - OutputStream<Object> outputStream = graphSpec.getOutputStream(outputStreamId); + GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); + GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class)); + GenericOutputDescriptor outputDescriptor = sd.getOutputDescriptor(outputStreamId, mock(Serde.class)); + MessageStream<Object> inputStream = graphSpec.getInputStream(inputDescriptor); + OutputStream<Object> outputStream = graphSpec.getOutputStream(outputDescriptor); inputStream .filter(mock(FilterFunction.class)) @@ -293,9 +298,13 @@ public class TestOperatorImplGraph { Config config = new MapConfig(configs); StreamGraphSpec graphSpec = new StreamGraphSpec(config); - MessageStream<Object> inputStream = graphSpec.getInputStream(inputStreamId); - OutputStream<KV<Integer, String>> outputStream = graphSpec - .getOutputStream(outputStreamId, KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class))); + GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); + GenericSystemDescriptor osd = new GenericSystemDescriptor(outputSystem, "mockFactoryClass"); + GenericInputDescriptor inputDescriptor = isd.getInputDescriptor(inputStreamId, mock(Serde.class)); + GenericOutputDescriptor outputDescriptor = osd.getOutputDescriptor(outputStreamId, + KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class))); + MessageStream<Object> inputStream = graphSpec.getInputStream(inputDescriptor); + OutputStream<KV<Integer, String>> outputStream = graphSpec.getOutputStream(outputDescriptor); inputStream .partitionBy(Object::hashCode, Object::toString, @@ -337,12 +346,16 @@ public class TestOperatorImplGraph { @Test public void testBroadcastChain() { String inputStreamId = "input"; + String inputSystem = "input-system"; + String inputPhysicalName = "input-stream"; HashMap<String, String> configMap = new HashMap<>(); - StreamTestUtils.addStreamConfigs(configMap, "input", "input-system", "input-stream"); + StreamTestUtils.addStreamConfigs(configMap, inputStreamId, inputSystem, inputPhysicalName); Config config = new MapConfig(configMap); StreamGraphSpec graphSpec = new StreamGraphSpec(config); - MessageStream<Object> inputStream = graphSpec.getInputStream(inputStreamId); + GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); + GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class)); + MessageStream<Object> inputStream = graphSpec.getInputStream(inputDescriptor); inputStream.filter(mock(FilterFunction.class)); inputStream.map(mock(MapFunction.class)); @@ -351,7 +364,7 @@ public class TestOperatorImplGraph { OperatorImplGraph opImplGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class)); - InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream")); + InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream(inputSystem, inputPhysicalName)); assertEquals(2, inputOpImpl.registeredOperators.size()); assertTrue(inputOpImpl.registeredOperators.stream() .anyMatch(opImpl -> ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == OpCode.FILTER)); @@ -362,9 +375,16 @@ public class TestOperatorImplGraph { @Test public void testMergeChain() { String inputStreamId = "input"; - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); + String inputSystem = "input-system"; + String inputPhysicalName = "input-stream"; + HashMap<String, String> configs = new HashMap<>(); + StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, inputPhysicalName); + Config config = new MapConfig(configs); + StreamGraphSpec graphSpec = new StreamGraphSpec(config); - MessageStream<Object> inputStream = graphSpec.getInputStream(inputStreamId); + GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); + GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class)); + MessageStream<Object> inputStream = graphSpec.getInputStream(inputDescriptor); MessageStream<Object> stream1 = inputStream.filter(mock(FilterFunction.class)); MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class)); MessageStream<Object> mergedStream = stream1.merge(Collections.singleton(stream2)); @@ -396,12 +416,14 @@ public class TestOperatorImplGraph { public void testJoinChain() { String inputStreamId1 = "input1"; String inputStreamId2 = "input2"; - + String inputSystem = "input-system"; + String inputPhysicalName1 = "input-stream1"; + String inputPhysicalName2 = "input-stream2"; HashMap<String, String> configs = new HashMap<>(); configs.put(JobConfig.JOB_NAME(), "jobName"); configs.put(JobConfig.JOB_ID(), "jobId"); - StreamTestUtils.addStreamConfigs(configs, "input1", "input-system", "input-stream1"); - StreamTestUtils.addStreamConfigs(configs, "input2", "input-system", "input-stream2"); + StreamTestUtils.addStreamConfigs(configs, inputStreamId1, inputSystem, inputPhysicalName1); + StreamTestUtils.addStreamConfigs(configs, inputStreamId2, inputSystem, inputPhysicalName2); Config config = new MapConfig(configs); StreamGraphSpec graphSpec = new StreamGraphSpec(config); @@ -409,8 +431,13 @@ public class TestOperatorImplGraph { Function<Object, Integer> keyFn = (Function & Serializable) m -> joinKey; JoinFunction testJoinFunction = new TestJoinFunction("jobName-jobId-join-j1", (BiFunction & Serializable) (m1, m2) -> KV.of(m1, m2), keyFn, keyFn); - MessageStream<Object> inputStream1 = graphSpec.getInputStream(inputStreamId1, new NoOpSerde<>()); - MessageStream<Object> inputStream2 = graphSpec.getInputStream(inputStreamId2, new NoOpSerde<>()); + + GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); + GenericInputDescriptor inputDescriptor1 = sd.getInputDescriptor(inputStreamId1, mock(Serde.class)); + GenericInputDescriptor inputDescriptor2 = sd.getInputDescriptor(inputStreamId2, mock(Serde.class)); + MessageStream<Object> inputStream1 = graphSpec.getInputStream(inputDescriptor1); + MessageStream<Object> inputStream2 = graphSpec.getInputStream(inputDescriptor2); + inputStream1.join(inputStream2, testJoinFunction, mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j1"); @@ -428,8 +455,8 @@ public class TestOperatorImplGraph { // verify that join function is initialized once. assertEquals(TestJoinFunction.getInstanceByTaskName(mockTaskName, "jobName-jobId-join-j1").numInitCalled, 1); - InputOperatorImpl inputOpImpl1 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream1")); - InputOperatorImpl inputOpImpl2 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream2")); + InputOperatorImpl inputOpImpl1 = opImplGraph.getInputOperator(new SystemStream(inputSystem, inputPhysicalName1)); + InputOperatorImpl inputOpImpl2 = opImplGraph.getInputOperator(new SystemStream(inputSystem, inputPhysicalName2)); PartialJoinOperatorImpl leftPartialJoinOpImpl = (PartialJoinOperatorImpl) inputOpImpl1.registeredOperators.iterator().next(); PartialJoinOperatorImpl rightPartialJoinOpImpl = @@ -442,12 +469,14 @@ public class TestOperatorImplGraph { Object mockLeftMessage = mock(Object.class); long currentTimeMillis = System.currentTimeMillis(); when(mockLeftStore.get(eq(joinKey))).thenReturn(new TimestampedValue<>(mockLeftMessage, currentTimeMillis)); - inputOpImpl1.onMessage(KV.of("", mockLeftMessage), mock(MessageCollector.class), mock(TaskCoordinator.class)); + IncomingMessageEnvelope leftMessage = new IncomingMessageEnvelope(mock(SystemStreamPartition.class), "", "", mockLeftMessage); + inputOpImpl1.onMessage(leftMessage, mock(MessageCollector.class), mock(TaskCoordinator.class)); // verify that right partial join operator calls getSecondKey Object mockRightMessage = mock(Object.class); when(mockRightStore.get(eq(joinKey))).thenReturn(new TimestampedValue<>(mockRightMessage, currentTimeMillis)); - inputOpImpl2.onMessage(KV.of("", mockRightMessage), mock(MessageCollector.class), mock(TaskCoordinator.class)); + IncomingMessageEnvelope rightMessage = new IncomingMessageEnvelope(mock(SystemStreamPartition.class), "", "", mockRightMessage); + inputOpImpl2.onMessage(rightMessage, mock(MessageCollector.class), mock(TaskCoordinator.class)); // verify that the join function apply is called with the correct messages on match @@ -461,6 +490,7 @@ public class TestOperatorImplGraph { public void testOperatorGraphInitAndClose() { String inputStreamId1 = "input1"; String inputStreamId2 = "input2"; + String inputSystem = "input-system"; Config mockConfig = mock(Config.class); TaskName mockTaskName = mock(TaskName.class); TaskContextImpl mockContext = mock(TaskContextImpl.class); @@ -468,8 +498,11 @@ public class TestOperatorImplGraph { when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig); - MessageStream<Object> inputStream1 = graphSpec.getInputStream(inputStreamId1); - MessageStream<Object> inputStream2 = graphSpec.getInputStream(inputStreamId2); + GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); + GenericInputDescriptor inputDescriptor1 = sd.getInputDescriptor(inputStreamId1, mock(Serde.class)); + GenericInputDescriptor inputDescriptor2 = sd.getInputDescriptor(inputStreamId2, mock(Serde.class)); + MessageStream<Object> inputStream1 = graphSpec.getInputStream(inputDescriptor1); + MessageStream<Object> inputStream2 = graphSpec.getInputStream(inputDescriptor2); Function mapFn = (Function & Serializable) m -> m; inputStream1.map(new TestMapFunction<Object, Object>("1", mapFn)) @@ -560,15 +593,22 @@ public class TestOperatorImplGraph { Config config = new MapConfig(configs); StreamGraphSpec graphSpec = new StreamGraphSpec(config); - MessageStream messageStream1 = graphSpec.getInputStream(inputStreamId1).map(m -> m); - MessageStream messageStream2 = graphSpec.getInputStream(inputStreamId2).filter(m -> true); + GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass"); + GenericInputDescriptor inputDescriptor1 = isd.getInputDescriptor(inputStreamId1, mock(Serde.class)); + GenericInputDescriptor inputDescriptor2 = isd.getInputDescriptor(inputStreamId2, mock(Serde.class)); + GenericInputDescriptor inputDescriptor3 = isd.getInputDescriptor(inputStreamId3, mock(Serde.class)); + GenericSystemDescriptor osd = new GenericSystemDescriptor(outputSystem, "mockFactoryClass"); + GenericOutputDescriptor outputDescriptor1 = osd.getOutputDescriptor(outputStreamId1, mock(Serde.class)); + GenericOutputDescriptor outputDescriptor2 = osd.getOutputDescriptor(outputStreamId2, mock(Serde.class)); + MessageStream messageStream1 = graphSpec.getInputStream(inputDescriptor1).map(m -> m); + MessageStream messageStream2 = graphSpec.getInputStream(inputDescriptor2).filter(m -> true); MessageStream messageStream3 = - graphSpec.getInputStream(inputStreamId3) + graphSpec.getInputStream(inputDescriptor3) .filter(m -> true) .partitionBy(m -> "m", m -> m, "p1") .map(m -> m); - OutputStream<Object> outputStream1 = graphSpec.getOutputStream(outputStreamId1); - OutputStream<Object> outputStream2 = graphSpec.getOutputStream(outputStreamId2); + OutputStream<Object> outputStream1 = graphSpec.getOutputStream(outputDescriptor1); + OutputStream<Object> outputStream2 = graphSpec.getOutputStream(outputDescriptor2); messageStream1 .join(messageStream2, mock(JoinFunction.class), http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java index 0ef6680..6c34fcd 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java @@ -32,6 +32,8 @@ import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.StreamGraphSpec; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; +import org.apache.samza.operators.descriptors.GenericSystemDescriptor; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.impl.store.TestInMemoryStore; import org.apache.samza.operators.impl.store.TimeSeriesKeySerde; @@ -547,9 +549,10 @@ public class TestWindowOperator { private StreamGraphSpec getKeyedTumblingWindowStreamGraph(AccumulationMode mode, Duration duration, Trigger<KV<Integer, Integer>> earlyTrigger) throws IOException { StreamGraphSpec graph = new StreamGraphSpec(config); - KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); - graph.getInputStream("integers", kvSerde) + GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass"); + GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde); + graph.getInputStream(inputDescriptor) .window(Windows.keyedTumblingWindow(KV::getKey, duration, new IntegerSerde(), kvSerde) .setEarlyTrigger(earlyTrigger).setAccumulationMode(mode), "w1") .sink((message, messageCollector, taskCoordinator) -> { @@ -563,9 +566,10 @@ public class TestWindowOperator { private StreamGraphSpec getTumblingWindowStreamGraph(AccumulationMode mode, Duration duration, Trigger<KV<Integer, Integer>> earlyTrigger) throws IOException { StreamGraphSpec graph = new StreamGraphSpec(config); - KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); - graph.getInputStream("integers", kvSerde) + GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass"); + GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde); + graph.getInputStream(inputDescriptor) .window(Windows.tumblingWindow(duration, kvSerde).setEarlyTrigger(earlyTrigger) .setAccumulationMode(mode), "w1") .sink((message, messageCollector, taskCoordinator) -> { @@ -577,9 +581,10 @@ public class TestWindowOperator { private StreamGraphSpec getKeyedSessionWindowStreamGraph(AccumulationMode mode, Duration duration) throws IOException { StreamGraphSpec graph = new StreamGraphSpec(config); - KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); - graph.getInputStream("integers", kvSerde) + GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass"); + GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde); + graph.getInputStream(inputDescriptor) .window(Windows.keyedSessionWindow(KV::getKey, duration, new IntegerSerde(), kvSerde) .setAccumulationMode(mode), "w1") .sink((message, messageCollector, taskCoordinator) -> { @@ -592,9 +597,10 @@ public class TestWindowOperator { private StreamGraphSpec getAggregateTumblingWindowStreamGraph(AccumulationMode mode, Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) throws IOException { StreamGraphSpec graph = new StreamGraphSpec(config); - - MessageStream<KV<Integer, Integer>> integers = graph.getInputStream("integers", - KVSerde.of(new IntegerSerde(), new IntegerSerde())); + KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); + GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass"); + GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde); + MessageStream<KV<Integer, Integer>> integers = graph.getInputStream(inputDescriptor); integers .map(new KVMapFunction()) http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java index b27c944..a9ccd12 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java @@ -228,9 +228,9 @@ public class TestOperatorSpec { } }; - InputOperatorSpec<String, Object> inputOperatorSpec = new InputOperatorSpec<>( - "mockStreamId", new StringSerde("UTF-8"), objSerde, true, "op0"); - InputOperatorSpec<String, Object> inputOpCopy = (InputOperatorSpec<String, Object>) OperatorSpecTestUtils.copyOpSpec(inputOperatorSpec); + InputOperatorSpec inputOperatorSpec = new InputOperatorSpec( + "mockStreamId", new StringSerde("UTF-8"), objSerde, null, true, "op0"); + InputOperatorSpec inputOpCopy = (InputOperatorSpec) OperatorSpecTestUtils.copyOpSpec(inputOperatorSpec); assertNotEquals("Expected deserialized copy of operator spec should not be the same as the original operator spec", inputOperatorSpec, inputOpCopy); assertTrue(inputOperatorSpec.isClone(inputOpCopy)); @@ -271,12 +271,10 @@ public class TestOperatorSpec { @Test public void testJoinOperatorSpec() { - InputOperatorSpec<TestMessageEnvelope, Object> leftOpSpec = new InputOperatorSpec<>( - "test-input-1", new NoOpSerde<>(), - new NoOpSerde<>(), false, "op0"); - InputOperatorSpec<TestMessageEnvelope, Object> rightOpSpec = new InputOperatorSpec<>( - "test-input-2", new NoOpSerde<>(), - new NoOpSerde<>(), false, "op1"); + InputOperatorSpec leftOpSpec = new InputOperatorSpec( + "test-input-1", new NoOpSerde<>(), new NoOpSerde<>(), null, false, "op0"); + InputOperatorSpec rightOpSpec = new InputOperatorSpec( + "test-input-2", new NoOpSerde<>(), new NoOpSerde<>(), null, false, "op1"); Serde<Object> objSerde = new Serde<Object>() { http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java index 9bbcbfa..86a553f 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java @@ -26,16 +26,23 @@ import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OperatorSpecGraph; import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.operators.TimerRegistry; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; +import org.apache.samza.operators.descriptors.GenericSystemDescriptor; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; +import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.serializers.Serde; import org.junit.Before; import org.junit.Test; import org.mockito.internal.util.reflection.Whitebox; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** @@ -44,10 +51,12 @@ import static org.mockito.Mockito.*; public class TestPartitionByOperatorSpec { private final Config mockConfig = mock(Config.class); - private final String testInputId = "test-input-1"; + private final GenericInputDescriptor testinputDescriptor = + new GenericSystemDescriptor("mockSystem", "mockFactoryClassName") + .getInputDescriptor("test-input-1", mock(Serde.class)); private final String testJobName = "testJob"; private final String testJobId = "1"; - private final String testReparStreamName = "parByKey"; + private final String testRepartitionedStreamName = "parByKey"; private StreamGraphSpec graphSpec = null; class TimerMapFn implements MapFunction<Object, String>, TimerFunction<String, Object> { @@ -95,13 +104,14 @@ public class TestPartitionByOperatorSpec { @Test public void testPartitionBy() { - MessageStream inputStream = graphSpec.getInputStream(testInputId); + MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor); MapFunction<Object, String> keyFn = m -> m.toString(); MapFunction<Object, Object> valueFn = m -> m; + KVSerde<Object, Object> partitionBySerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()); MessageStream<KV<String, Object>> - reparStream = inputStream.partitionBy(keyFn, valueFn, testReparStreamName); + reparStream = inputStream.partitionBy(keyFn, valueFn, partitionBySerde, testRepartitionedStreamName); InputOperatorSpec inputOpSpec = (InputOperatorSpec) Whitebox.getInternalState(reparStream, "operatorSpec"); - assertEquals(inputOpSpec.getStreamId(), String.format("%s-%s-partition_by-%s", testJobName, testJobId, testReparStreamName)); + assertEquals(inputOpSpec.getStreamId(), String.format("%s-%s-partition_by-%s", testJobName, testJobId, testRepartitionedStreamName)); assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde); assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde); assertTrue(inputOpSpec.isKeyed()); @@ -110,7 +120,32 @@ public class TestPartitionByOperatorSpec { InputOperatorSpec originInputSpec = (InputOperatorSpec) Whitebox.getInternalState(inputStream, "operatorSpec"); assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] instanceof PartitionByOperatorSpec); PartitionByOperatorSpec reparOpSpec = (PartitionByOperatorSpec) originInputSpec.getRegisteredOperatorSpecs().toArray()[0]; - assertEquals(reparOpSpec.getOpId(), String.format("%s-%s-partition_by-%s", testJobName, testJobId, testReparStreamName)); + assertEquals(reparOpSpec.getOpId(), String.format("%s-%s-partition_by-%s", testJobName, testJobId, testRepartitionedStreamName)); + assertEquals(reparOpSpec.getKeyFunction(), keyFn); + assertEquals(reparOpSpec.getValueFunction(), valueFn); + assertEquals(reparOpSpec.getOutputStream().getStreamId(), reparOpSpec.getOpId()); + assertNull(reparOpSpec.getTimerFn()); + assertNull(reparOpSpec.getWatermarkFn()); + } + + @Test + public void testPartitionByWithNoSerde() { + MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor); + MapFunction<Object, String> keyFn = m -> m.toString(); + MapFunction<Object, Object> valueFn = m -> m; + MessageStream<KV<String, Object>> + reparStream = inputStream.partitionBy(keyFn, valueFn, testRepartitionedStreamName); + InputOperatorSpec inputOpSpec = (InputOperatorSpec) Whitebox.getInternalState(reparStream, "operatorSpec"); + assertEquals(inputOpSpec.getStreamId(), String.format("%s-%s-partition_by-%s", testJobName, testJobId, testRepartitionedStreamName)); + assertNull(inputOpSpec.getKeySerde()); + assertNull(inputOpSpec.getValueSerde()); + assertTrue(inputOpSpec.isKeyed()); + assertNull(inputOpSpec.getTimerFn()); + assertNull(inputOpSpec.getWatermarkFn()); + InputOperatorSpec originInputSpec = (InputOperatorSpec) Whitebox.getInternalState(inputStream, "operatorSpec"); + assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] instanceof PartitionByOperatorSpec); + PartitionByOperatorSpec reparOpSpec = (PartitionByOperatorSpec) originInputSpec.getRegisteredOperatorSpecs().toArray()[0]; + assertEquals(reparOpSpec.getOpId(), String.format("%s-%s-partition_by-%s", testJobName, testJobId, testRepartitionedStreamName)); assertEquals(reparOpSpec.getKeyFunction(), keyFn); assertEquals(reparOpSpec.getValueFunction(), valueFn); assertEquals(reparOpSpec.getOutputStream().getStreamId(), reparOpSpec.getOpId()); @@ -120,8 +155,8 @@ public class TestPartitionByOperatorSpec { @Test public void testCopy() { - MessageStream inputStream = graphSpec.getInputStream(testInputId); - inputStream.partitionBy(m -> m.toString(), m -> m, testReparStreamName); + MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor); + inputStream.partitionBy(m -> m.toString(), m -> m, testRepartitionedStreamName); OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); OperatorSpecGraph clonedGraph = specGraph.clone(); OperatorSpecTestUtils.assertClonedGraph(specGraph, clonedGraph); @@ -130,28 +165,28 @@ public class TestPartitionByOperatorSpec { @Test(expected = IllegalArgumentException.class) public void testTimerFunctionAsKeyFn() { TimerMapFn keyFn = new TimerMapFn(); - MessageStream<Object> inputStream = graphSpec.getInputStream(testInputId); + MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor); inputStream.partitionBy(keyFn, m -> m, "parByKey"); } @Test(expected = IllegalArgumentException.class) public void testWatermarkFunctionAsKeyFn() { WatermarkMapFn keyFn = new WatermarkMapFn(); - MessageStream<Object> inputStream = graphSpec.getInputStream(testInputId); + MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor); inputStream.partitionBy(keyFn, m -> m, "parByKey"); } @Test(expected = IllegalArgumentException.class) public void testTimerFunctionAsValueFn() { TimerMapFn valueFn = new TimerMapFn(); - MessageStream<Object> inputStream = graphSpec.getInputStream(testInputId); + MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor); inputStream.partitionBy(m -> m.toString(), valueFn, "parByKey"); } @Test(expected = IllegalArgumentException.class) public void testWatermarkFunctionAsValueFn() { WatermarkMapFn valueFn = new WatermarkMapFn(); - MessageStream<Object> inputStream = graphSpec.getInputStream(testInputId); + MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor); inputStream.partitionBy(m -> m.toString(), valueFn, "parByKey"); } } http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala index f0b8a17..ba3b5df 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala @@ -22,8 +22,6 @@ package org.apache.samza.util import org.junit.Assert._ import org.junit.Test import org.apache.samza.config.MapConfig -import org.apache.samza.serializers._ -import org.apache.samza.SamzaException class TestUtil { @Test