http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java index 2c8f682..8c75bca 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java @@ -24,6 +24,7 @@ import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfig; +import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraphImpl; @@ -45,8 +46,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.BiFunction; -import java.util.function.Function; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -122,11 +121,11 @@ public class TestExecutionPlanner { * */ StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); - Function mockFn = mock(Function.class); - OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", mockFn, mockFn); - BiFunction mockBuilder = mock(BiFunction.class); - streamGraph.getInputStream("input1", mockBuilder) - .partitionBy(m -> "yes!!!").map(m -> m) + MessageStream<KV<Object, Object>> input1 = streamGraph.getInputStream("input1"); + OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1"); + input1 + .partitionBy(m -> m.key, m -> m.value) + .map(kv -> kv) .sendTo(output1); return streamGraph; } @@ -145,13 +144,20 @@ public class TestExecutionPlanner { */ StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); - BiFunction msgBuilder = mock(BiFunction.class); - MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m); - MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).partitionBy(m -> "haha").filter(m -> true); - MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m); - Function mockFn = mock(Function.class); - OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", mockFn, mockFn); - OutputStream<Object, Object, Object> output2 = streamGraph.getOutputStream("output2", mockFn, mockFn); + MessageStream<KV<Object, Object>> m1 = + streamGraph.<KV<Object, Object>>getInputStream("input1") + .map(m -> m); + MessageStream<KV<Object, Object>> m2 = + streamGraph.<KV<Object, Object>>getInputStream("input2") + .partitionBy(m -> m.key, m -> m.value) + .filter(m -> true); + MessageStream<KV<Object, Object>> m3 = + streamGraph.<KV<Object, Object>>getInputStream("input3") + .filter(m -> true) + .partitionBy(m -> m.key, m -> m.value) + .map(m -> m); + OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1"); + OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2"); m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(output1); m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(output2); @@ -162,21 +168,28 @@ public class TestExecutionPlanner { private StreamGraphImpl createStreamGraphWithJoinAndWindow() { StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); - BiFunction msgBuilder = mock(BiFunction.class); - MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m); - MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).partitionBy(m -> "haha").filter(m -> true); - MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m); - Function mockFn = mock(Function.class); - OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", mockFn, mockFn); - OutputStream<Object, Object, Object> output2 = streamGraph.getOutputStream("output2", mockFn, mockFn); + MessageStream<KV<Object, Object>> m1 = + streamGraph.<KV<Object, Object>>getInputStream("input1") + .map(m -> m); + MessageStream<KV<Object, Object>> m2 = + streamGraph.<KV<Object, Object>>getInputStream("input2") + .partitionBy(m -> m.key, m -> m.value) + .filter(m -> true); + MessageStream<KV<Object, Object>> m3 = + streamGraph.<KV<Object, Object>>getInputStream("input3") + .filter(m -> true) + .partitionBy(m -> m.key, m -> m.value) + .map(m -> m); + OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1"); + OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2"); m1.map(m -> m) .filter(m->true) - .window(Windows.<Object, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(8))); + .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8))); m2.map(m -> m) .filter(m->true) - .window(Windows.<Object, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(16))); + .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16))); m1.join(m2, mock(JoinFunction.class), Duration.ofMillis(1600)).sendTo(output1); m3.join(m2, mock(JoinFunction.class), Duration.ofMillis(100)).sendTo(output2);
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java index 4bda86b..095e407 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java @@ -19,25 +19,26 @@ package org.apache.samza.execution; -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; -import java.util.function.BiFunction; -import java.util.function.Function; - import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; +import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemAdmin; import org.codehaus.jackson.map.ObjectMapper; import org.junit.Test; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + import static org.apache.samza.execution.TestExecutionPlanner.createSystemAdmin; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; @@ -105,13 +106,21 @@ public class TestJobGraphJsonGenerator { StreamManager streamManager = new StreamManager(systemAdmins); StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); - BiFunction mockBuilder = mock(BiFunction.class); - MessageStream m1 = streamGraph.getInputStream("input1", mockBuilder).map(m -> m); - MessageStream m2 = streamGraph.getInputStream("input2", mockBuilder).partitionBy(m -> "haha").filter(m -> true); - MessageStream m3 = streamGraph.getInputStream("input3", mockBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m); - Function mockFn = mock(Function.class); - OutputStream<Object, Object, Object> outputStream1 = streamGraph.getOutputStream("output1", mockFn, mockFn); - OutputStream<Object, Object, Object> outputStream2 = streamGraph.getOutputStream("output2", mockFn, mockFn); + streamGraph.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>())); + MessageStream<KV<Object, Object>> m1 = + streamGraph.<KV<Object, Object>>getInputStream("input1") + .map(m -> m); + MessageStream<KV<Object, Object>> m2 = + streamGraph.<KV<Object, Object>>getInputStream("input2") + .partitionBy(m -> m.key, m -> m.value) + .filter(m -> true); + MessageStream<KV<Object, Object>> m3 = + streamGraph.<KV<Object, Object>>getInputStream("input3") + .filter(m -> true) + .partitionBy(m -> m.key, m -> m.value) + .map(m -> m); + OutputStream<KV<Object, Object>> outputStream1 = streamGraph.getOutputStream("output1"); + OutputStream<KV<Object, Object>> outputStream2 = streamGraph.getOutputStream("output2"); m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(outputStream1); m2.sink((message, collector, coordinator) -> { }); http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java new file mode 100644 index 0000000..c59c0cc --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.execution; + +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.SerializerConfig; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraphImpl; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.SerializableSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.StreamSpec; +import org.junit.Test; + +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +public class TestJobNode { + + @Test + public void testAddSerdeConfigs() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + StreamSpec inputSpec = new StreamSpec("input", "input", "input-system"); + StreamSpec outputSpec = new StreamSpec("output", "output", "output-system"); + StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-1", "partition_by-1", "intermediate-system"); + doReturn(inputSpec).when(mockRunner).getStreamSpec("input"); + doReturn(outputSpec).when(mockRunner).getStreamSpec("output"); + doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-1"); + + StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class)); + streamGraph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>())); + MessageStream<KV<String, Object>> input = streamGraph.getInputStream("input"); + OutputStream<KV<String, Object>> output = streamGraph.getOutputStream("output"); + input.partitionBy(KV::getKey, KV::getValue).sendTo(output); + + JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, mock(Config.class)); + StreamEdge inputEdge = new StreamEdge(inputSpec); + StreamEdge outputEdge = new StreamEdge(outputSpec); + StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true); + jobNode.addInEdge(inputEdge); + jobNode.addOutEdge(outputEdge); + jobNode.addInEdge(repartitionEdge); + jobNode.addOutEdge(repartitionEdge); + + Map<String, String> configs = new HashMap<>(); + jobNode.addSerdeConfigs(configs); + + MapConfig mapConfig = new MapConfig(configs); + Config serializers = mapConfig.subset("serializers.registry.", true); + + // make sure that the serializers deserialize correctly + SerializableSerde<Serde> serializableSerde = new SerializableSerde<>(); + Map<String, Serde> deserializedSerdes = serializers.entrySet().stream().collect(Collectors.toMap( + e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), ""), + e -> serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes())) + )); + assertEquals(2, serializers.size()); + + String inputKeySerde = mapConfig.get("streams.input.samza.key.serde"); + String inputMsgSerde = mapConfig.get("streams.input.samza.msg.serde"); + assertTrue(deserializedSerdes.containsKey(inputKeySerde)); + assertTrue(inputKeySerde.startsWith(StringSerde.class.getSimpleName())); + assertTrue(deserializedSerdes.containsKey(inputMsgSerde)); + assertTrue(inputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); + + String outputKeySerde = mapConfig.get("streams.output.samza.key.serde"); + String outputMsgSerde = mapConfig.get("streams.output.samza.msg.serde"); + assertTrue(deserializedSerdes.containsKey(outputKeySerde)); + assertTrue(outputKeySerde.startsWith(StringSerde.class.getSimpleName())); + assertTrue(deserializedSerdes.containsKey(outputMsgSerde)); + assertTrue(outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); + + String partitionByKeySerde = mapConfig.get("streams.null-null-partition_by-1.samza.key.serde"); + String partitionByMsgSerde = mapConfig.get("streams.null-null-partition_by-1.samza.msg.serde"); + assertTrue(deserializedSerdes.containsKey(partitionByKeySerde)); + assertTrue(partitionByKeySerde.startsWith(StringSerde.class.getSimpleName())); + assertTrue(deserializedSerdes.containsKey(partitionByMsgSerde)); + assertTrue(partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java index c51b1ea..004c5cf 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java @@ -25,6 +25,8 @@ import org.apache.samza.config.Config; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.serializers.KVSerde; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.StreamSpec; @@ -276,10 +278,11 @@ public class TestJoinOperator { @Override public void init(StreamGraph graph, Config config) { + KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); MessageStream<FirstStreamIME> inStream = - graph.getInputStream("instream", FirstStreamIME::new); + graph.getInputStream("instream", kvSerde).map(FirstStreamIME::new); MessageStream<SecondStreamIME> inStream2 = - graph.getInputStream("instream2", SecondStreamIME::new); + graph.getInputStream("instream2", kvSerde).map(SecondStreamIME::new); SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream"); inStream @@ -330,14 +333,24 @@ public class TestJoinOperator { } private static class FirstStreamIME extends IncomingMessageEnvelope { + FirstStreamIME(KV<Integer, Integer> message) { + super(new SystemStreamPartition( + "insystem", "instream", new Partition(0)), "1", message.getKey(), message.getValue()); + } + FirstStreamIME(Integer key, Integer message) { - super(new SystemStreamPartition("insystem", "instream", new Partition(0)), "1", key, message); + this(KV.of(key, message)); } } private static class SecondStreamIME extends IncomingMessageEnvelope { + SecondStreamIME(KV<Integer, Integer> message) { + super(new SystemStreamPartition( + "insystem2", "instream2", new Partition(0)), "1", message.getKey(), message.getValue()); + } + SecondStreamIME(Integer key, Integer message) { - super(new SystemStreamPartition("insystem2", "instream2", new Partition(0)), "1", key, message); + this(KV.of(key, message)); } } } http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java index 61224f2..c6554bc 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java @@ -32,6 +32,7 @@ import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.OperatorSpec.OpCode; import org.apache.samza.operators.spec.OutputOperatorSpec; import org.apache.samza.operators.spec.OutputStreamImpl; +import org.apache.samza.operators.spec.PartitionByOperatorSpec; import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.operators.spec.WindowOperatorSpec; @@ -39,20 +40,19 @@ import org.apache.samza.operators.stream.IntermediateMessageStreamImpl; import org.apache.samza.operators.windows.Window; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; +import org.apache.samza.serializers.KVSerde; import org.junit.Test; import org.mockito.ArgumentCaptor; import java.time.Duration; import java.util.Collection; import java.util.Collections; -import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -172,9 +172,8 @@ public class TestMessageStreamImpl { StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); - - OutputStreamImpl mockOutputOpSpec = mock(OutputStreamImpl.class); - inputStream.sendTo(mockOutputOpSpec); + OutputStreamImpl<TestMessageEnvelope> mockOutputStreamImpl = mock(OutputStreamImpl.class); + inputStream.sendTo(mockOutputStreamImpl); ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class); verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture()); @@ -182,33 +181,75 @@ public class TestMessageStreamImpl { assertTrue(registeredOpSpec instanceof OutputOperatorSpec); assertEquals(OpCode.SEND_TO, registeredOpSpec.getOpCode()); - assertEquals(mockOutputOpSpec, ((OutputOperatorSpec) registeredOpSpec).getOutputStream()); + assertEquals(mockOutputStreamImpl, ((OutputOperatorSpec) registeredOpSpec).getOutputStream()); + + // same behavior as above so nothing new to assert. but ensures that this variant compiles. + MessageStreamImpl<KV<String, TestMessageEnvelope>> keyedInputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); + OutputStreamImpl<KV<String, TestMessageEnvelope>> mockKeyedOutputStreamImpl = mock(OutputStreamImpl.class); + keyedInputStream.sendTo(mockKeyedOutputStreamImpl); + + // can't unit test it, but the following variants should not compile +// inputStream.sendTo(mockKeyedOutputStreamImpl); +// keyedInputStream.sendTo(mockOutputStreamImpl); } @Test - public void testPartitionBy() { + public void testRepartition() { StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); OperatorSpec mockOpSpec = mock(OperatorSpec.class); String streamName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), 0); - Function<TestMessageEnvelope, String> mockKeyFn = mock(Function.class); - OutputStreamImpl mockOutputOpSpec = mock(OutputStreamImpl.class); + OutputStreamImpl mockOutputStreamImpl = mock(OutputStreamImpl.class); + KVSerde mockKVSerde = mock(KVSerde.class); IntermediateMessageStreamImpl mockIntermediateStream = mock(IntermediateMessageStreamImpl.class); - when(mockGraph.getIntermediateStream(eq(streamName), eq(mockKeyFn), any(Function.class), any(BiFunction.class))) + when(mockGraph.getIntermediateStream(eq(streamName), eq(mockKVSerde))) .thenReturn(mockIntermediateStream); when(mockIntermediateStream.getOutputStream()) - .thenReturn(mockOutputOpSpec); + .thenReturn(mockOutputStreamImpl); MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); - inputStream.partitionBy(mockKeyFn); + Function mockKeyFunction = mock(Function.class); + Function mockValueFunction = mock(Function.class); + inputStream.partitionBy(mockKeyFunction, mockValueFunction, mockKVSerde); ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class); verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture()); OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue(); - assertTrue(registeredOpSpec instanceof OutputOperatorSpec); + assertTrue(registeredOpSpec instanceof PartitionByOperatorSpec); + assertEquals(OpCode.PARTITION_BY, registeredOpSpec.getOpCode()); + assertEquals(mockOutputStreamImpl, ((PartitionByOperatorSpec) registeredOpSpec).getOutputStream()); + assertEquals(mockKeyFunction, ((PartitionByOperatorSpec) registeredOpSpec).getKeyFunction()); + assertEquals(mockValueFunction, ((PartitionByOperatorSpec) registeredOpSpec).getValueFunction()); + } + + @Test + public void testRepartitionWithoutSerde() { + StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); + OperatorSpec mockOpSpec = mock(OperatorSpec.class); + + String streamName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), 0); + OutputStreamImpl mockOutputStreamImpl = mock(OutputStreamImpl.class); + IntermediateMessageStreamImpl mockIntermediateStream = mock(IntermediateMessageStreamImpl.class); + when(mockGraph.getIntermediateStream(eq(streamName), eq(null))) + .thenReturn(mockIntermediateStream); + when(mockIntermediateStream.getOutputStream()) + .thenReturn(mockOutputStreamImpl); + + MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec); + Function mockKeyFunction = mock(Function.class); + Function mockValueFunction = mock(Function.class); + inputStream.partitionBy(mockKeyFunction, mockValueFunction); + + ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class); + verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture()); + OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue(); + + assertTrue(registeredOpSpec instanceof PartitionByOperatorSpec); assertEquals(OpCode.PARTITION_BY, registeredOpSpec.getOpCode()); - assertEquals(mockOutputOpSpec, ((OutputOperatorSpec) registeredOpSpec).getOutputStream()); + assertEquals(mockOutputStreamImpl, ((PartitionByOperatorSpec) registeredOpSpec).getOutputStream()); + assertEquals(mockKeyFunction, ((PartitionByOperatorSpec) registeredOpSpec).getKeyFunction()); + assertEquals(mockValueFunction, ((PartitionByOperatorSpec) registeredOpSpec).getValueFunction()); } @Test http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java index 1fc60bd..45583c2 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java @@ -27,36 +27,136 @@ import org.apache.samza.operators.spec.OperatorSpec.OpCode; import org.apache.samza.operators.spec.OutputStreamImpl; import org.apache.samza.operators.stream.IntermediateMessageStreamImpl; import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.serializers.Serde; import org.apache.samza.system.StreamSpec; import org.junit.Test; import java.util.ArrayList; import java.util.List; -import java.util.function.BiFunction; -import java.util.function.Function; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TestStreamGraphImpl { @Test - public void testGetInputStream() { + public void testGetInputStreamWithValueSerde() { ApplicationRunner mockRunner = mock(ApplicationRunner.class); StreamSpec mockStreamSpec = mock(StreamSpec.class); when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class); - MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockMsgBuilder); + Serde mockValueSerde = mock(Serde.class); + MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockValueSerde); - InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec = + InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec(); assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec); - assertEquals(mockMsgBuilder, inputOpSpec.getMsgBuilder()); assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec()); + assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde); + assertEquals(mockValueSerde, inputOpSpec.getValueSerde()); + } + + @Test + public void testGetInputStreamWithKeyValueSerde() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + StreamSpec mockStreamSpec = mock(StreamSpec.class); + when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); + + KVSerde mockKVSerde = mock(KVSerde.class); + Serde mockKeySerde = mock(Serde.class); + Serde mockValueSerde = mock(Serde.class); + doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); + doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); + MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockKVSerde); + + InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec = + (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec(); + assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); + assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec); + assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec()); + assertEquals(mockKeySerde, inputOpSpec.getKeySerde()); + assertEquals(mockValueSerde, inputOpSpec.getValueSerde()); + } + + @Test(expected = NullPointerException.class) + public void testGetInputStreamWithNullSerde() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + StreamSpec mockStreamSpec = mock(StreamSpec.class); + when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); + + graph.getInputStream("test-stream-1", null); + } + + @Test + public void testGetInputStreamWithDefaultValueSerde() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + StreamSpec mockStreamSpec = mock(StreamSpec.class); + when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); + + Serde mockValueSerde = mock(Serde.class); + graph.setDefaultSerde(mockValueSerde); + MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1"); + + InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec = + (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec(); + assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); + assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec); + assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec()); + assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde); + assertEquals(mockValueSerde, inputOpSpec.getValueSerde()); + } + + @Test + public void testGetInputStreamWithDefaultKeyValueSerde() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + StreamSpec mockStreamSpec = mock(StreamSpec.class); + when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); + + KVSerde mockKVSerde = mock(KVSerde.class); + Serde mockKeySerde = mock(Serde.class); + Serde mockValueSerde = mock(Serde.class); + doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); + doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); + graph.setDefaultSerde(mockKVSerde); + MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1"); + + InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec = + (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec(); + assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); + assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec); + assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec()); + assertEquals(mockKeySerde, inputOpSpec.getKeySerde()); + assertEquals(mockValueSerde, inputOpSpec.getValueSerde()); + } + + @Test + public void testGetInputStreamWithDefaultDefaultSerde() { + // default default serde == user hasn't provided a default serde + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + StreamSpec mockStreamSpec = mock(StreamSpec.class); + when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); + + MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1"); + + InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec = + (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec(); + assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); + assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec); + assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec()); + assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde); + assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde); } @Test @@ -65,15 +165,13 @@ public class TestStreamGraphImpl { StreamSpec mockStreamSpec = mock(StreamSpec.class); when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class); - MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockMsgBuilder); + MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1"); - InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec = + InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec(); assertEquals(OpCode.INPUT, inputOpSpec.getOpCode()); assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec); - assertEquals(mockMsgBuilder, inputOpSpec.getMsgBuilder()); assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec()); } @@ -86,12 +184,12 @@ public class TestStreamGraphImpl { when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(mockStreamSpec2); StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1", mock(BiFunction.class)); - MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2", mock(BiFunction.class)); + MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1"); + MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2"); - InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec1 = + InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec1 = (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream1).getOperatorSpec(); - InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec2 = + InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec2 = (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream2).getOperatorSpec(); assertEquals(graph.getInputOperators().size(), 2); @@ -105,29 +203,149 @@ public class TestStreamGraphImpl { when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class)); StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - graph.getInputStream("test-stream-1", mock(BiFunction.class)); - graph.getInputStream("test-stream-1", mock(BiFunction.class)); // should throw exception + graph.getInputStream("test-stream-1"); + // should throw exception + graph.getInputStream("test-stream-1"); + } + + @Test + public void testGetOutputStreamWithValueSerde() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + StreamSpec mockStreamSpec = mock(StreamSpec.class); + when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); + + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); + + Serde mockValueSerde = mock(Serde.class); + OutputStream<TestMessageEnvelope> outputStream = + graph.getOutputStream("test-stream-1", mockValueSerde); + + OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream; + assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl); + assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec()); + assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde); + assertEquals(mockValueSerde, outputStreamImpl.getValueSerde()); + } + + @Test + public void testGetOutputStreamWithKeyValueSerde() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + StreamSpec mockStreamSpec = mock(StreamSpec.class); + when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); + + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); + KVSerde mockKVSerde = mock(KVSerde.class); + Serde mockKeySerde = mock(Serde.class); + Serde mockValueSerde = mock(Serde.class); + doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); + doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); + graph.setDefaultSerde(mockKVSerde); + OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1", mockKVSerde); + + OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream; + assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl); + assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec()); + assertEquals(mockKeySerde, outputStreamImpl.getKeySerde()); + assertEquals(mockValueSerde, outputStreamImpl.getValueSerde()); + } + + @Test(expected = NullPointerException.class) + public void testGetOutputStreamWithNullSerde() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + StreamSpec mockStreamSpec = mock(StreamSpec.class); + when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); + + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); + + graph.getOutputStream("test-stream-1", null); + } + + @Test + public void testGetOutputStreamWithDefaultValueSerde() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + StreamSpec mockStreamSpec = mock(StreamSpec.class); + when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); + + Serde mockValueSerde = mock(Serde.class); + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); + graph.setDefaultSerde(mockValueSerde); + OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1"); + + OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream; + assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl); + assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec()); + assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde); + assertEquals(mockValueSerde, outputStreamImpl.getValueSerde()); + } + + @Test + public void testGetOutputStreamWithDefaultKeyValueSerde() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + StreamSpec mockStreamSpec = mock(StreamSpec.class); + when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); + + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); + KVSerde mockKVSerde = mock(KVSerde.class); + Serde mockKeySerde = mock(Serde.class); + Serde mockValueSerde = mock(Serde.class); + doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); + doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); + graph.setDefaultSerde(mockKVSerde); + + OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1"); + + OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream; + assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl); + assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec()); + assertEquals(mockKeySerde, outputStreamImpl.getKeySerde()); + assertEquals(mockValueSerde, outputStreamImpl.getValueSerde()); } @Test - public void testGetOutputStream() { + public void testGetOutputStreamWithDefaultDefaultSerde() { ApplicationRunner mockRunner = mock(ApplicationRunner.class); StreamSpec mockStreamSpec = mock(StreamSpec.class); when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec); + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); + + OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1"); + + OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream; + assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl); + assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec()); + assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde); + assertTrue(outputStreamImpl.getValueSerde() instanceof NoOpSerde); + } + + @Test(expected = IllegalStateException.class) + public void testSetDefaultSerdeAfterGettingStreams() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class)); + + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); + graph.getInputStream("test-stream-1"); + graph.setDefaultSerde(mock(Serde.class)); // should throw exception + } + + @Test(expected = IllegalStateException.class) + public void testSetDefaultSerdeAfterGettingOutputStream() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class)); StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - Function<TestMessageEnvelope, String> mockKeyExtractor = mock(Function.class); - Function<TestMessageEnvelope, String> mockMsgExtractor = mock(Function.class); + graph.getOutputStream("test-stream-1"); + graph.setDefaultSerde(mock(Serde.class)); // should throw exception + } - OutputStream<String, String, TestMessageEnvelope> outputStream = - graph.getOutputStream("test-stream-1", mockKeyExtractor, mockMsgExtractor); + @Test(expected = IllegalStateException.class) + public void testSetDefaultSerdeAfterGettingIntermediateStream() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class)); - OutputStreamImpl<String, String, TestMessageEnvelope> outputOpSpec = (OutputStreamImpl) outputStream; - assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputOpSpec); - assertEquals(mockKeyExtractor, outputOpSpec.getKeyExtractor()); - assertEquals(mockMsgExtractor, outputOpSpec.getMsgExtractor()); - assertEquals(mockStreamSpec, outputOpSpec.getStreamSpec()); + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); + graph.getIntermediateStream("test-stream-1", null); + graph.setDefaultSerde(mock(Serde.class)); // should throw exception } @Test(expected = IllegalStateException.class) @@ -136,12 +354,89 @@ public class TestStreamGraphImpl { when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class)); StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - graph.getOutputStream("test-stream-1", mock(Function.class), mock(Function.class)); - graph.getOutputStream("test-stream-1", mock(Function.class), mock(Function.class)); // should throw exception + graph.getOutputStream("test-stream-1"); + graph.getOutputStream("test-stream-1"); // should throw exception + } + + @Test + public void testGetIntermediateStreamWithValueSerde() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + Config mockConfig = mock(Config.class); + StreamSpec mockStreamSpec = mock(StreamSpec.class); + when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob"); + when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001"); + when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec); + + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); + + Serde mockValueSerde = mock(Serde.class); + IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = + graph.getIntermediateStream("test-stream-1", mockValueSerde); + + assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec()); + assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream()); + assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec()); + assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde); + assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde()); + assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde); + assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde()); + } + + @Test + public void testGetIntermediateStreamWithKeyValueSerde() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + Config mockConfig = mock(Config.class); + StreamSpec mockStreamSpec = mock(StreamSpec.class); + when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob"); + when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001"); + when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec); + + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); + + KVSerde mockKVSerde = mock(KVSerde.class); + Serde mockKeySerde = mock(Serde.class); + Serde mockValueSerde = mock(Serde.class); + doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); + doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); + IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = + graph.getIntermediateStream("test-stream-1", mockKVSerde); + + assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec()); + assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream()); + assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec()); + assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde()); + assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde()); + assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde()); + assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde()); + } + + @Test + public void testGetIntermediateStreamWithDefaultValueSerde() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + Config mockConfig = mock(Config.class); + StreamSpec mockStreamSpec = mock(StreamSpec.class); + when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob"); + when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001"); + when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec); + + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); + + Serde mockValueSerde = mock(Serde.class); + graph.setDefaultSerde(mockValueSerde); + IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = + graph.getIntermediateStream("test-stream-1", null); + + assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec()); + assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream()); + assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec()); + assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde); + assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde()); + assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde); + assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde()); } @Test - public void testGetIntermediateStream() { + public void testGetIntermediateStreamWithDefaultKeyValueSerde() { ApplicationRunner mockRunner = mock(ApplicationRunner.class); Config mockConfig = mock(Config.class); StreamSpec mockStreamSpec = mock(StreamSpec.class); @@ -150,19 +445,45 @@ public class TestStreamGraphImpl { when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec); StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); - Function<TestMessageEnvelope, String> mockKeyExtractor = mock(Function.class); - Function<TestMessageEnvelope, String> mockMsgExtractor = mock(Function.class); - BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class); - IntermediateMessageStreamImpl<?, ?, TestMessageEnvelope> intermediateStreamImpl = - graph.getIntermediateStream("test-stream-1", mockKeyExtractor, mockMsgExtractor, mockMsgBuilder); + KVSerde mockKVSerde = mock(KVSerde.class); + Serde mockKeySerde = mock(Serde.class); + Serde mockValueSerde = mock(Serde.class); + doReturn(mockKeySerde).when(mockKVSerde).getKeySerde(); + doReturn(mockValueSerde).when(mockKVSerde).getValueSerde(); + graph.setDefaultSerde(mockKVSerde); + IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = + graph.getIntermediateStream("test-stream-1", null); + + assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec()); + assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream()); + assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec()); + assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde()); + assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde()); + assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde()); + assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde()); + } + + @Test + public void testGetIntermediateStreamWithDefaultDefaultSerde() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + Config mockConfig = mock(Config.class); + StreamSpec mockStreamSpec = mock(StreamSpec.class); + when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob"); + when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001"); + when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec); + + StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig); + IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl = + graph.getIntermediateStream("test-stream-1", null); assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec()); assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream()); assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec()); - assertEquals(mockKeyExtractor, intermediateStreamImpl.getOutputStream().getKeyExtractor()); - assertEquals(mockMsgExtractor, intermediateStreamImpl.getOutputStream().getMsgExtractor()); - assertEquals(mockMsgBuilder, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getMsgBuilder()); + assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde); + assertTrue(intermediateStreamImpl.getOutputStream().getValueSerde() instanceof NoOpSerde); + assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde); + assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde() instanceof NoOpSerde); } @Test(expected = IllegalStateException.class) @@ -171,8 +492,8 @@ public class TestStreamGraphImpl { when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class)); StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class)); - graph.getIntermediateStream("test-stream-1", mock(Function.class), mock(Function.class), mock(BiFunction.class)); - graph.getIntermediateStream("test-stream-1", mock(Function.class), mock(Function.class), mock(BiFunction.class)); + graph.getIntermediateStream("test-stream-1", mock(Serde.class)); + graph.getIntermediateStream("test-stream-1", mock(Serde.class)); } @Test @@ -199,9 +520,9 @@ public class TestStreamGraphImpl { StreamSpec testStreamSpec3 = new StreamSpec("test-stream-3", "physical-stream-3", "test-system"); when(mockRunner.getStreamSpec("test-stream-3")).thenReturn(testStreamSpec3); - graph.getInputStream("test-stream-1", (k, v) -> v); - graph.getInputStream("test-stream-2", (k, v) -> v); - graph.getInputStream("test-stream-3", (k, v) -> v); + graph.getInputStream("test-stream-1"); + graph.getInputStream("test-stream-2"); + graph.getInputStream("test-stream-3"); List<InputOperatorSpec> inputSpecs = new ArrayList<>(graph.getInputOperators().values()); Assert.assertEquals(inputSpecs.size(), 3); http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java index ca8a151..ee44cf9 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java @@ -34,6 +34,8 @@ import org.apache.samza.operators.windows.AccumulationMode; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.serializers.KVSerde; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.StreamSpec; @@ -389,8 +391,9 @@ public class TestWindowOperator { @Override public void init(StreamGraph graph, Config config) { - MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers", - (k, m) -> new IntegerEnvelope((Integer) k)); + MessageStream<IntegerEnvelope> inStream = + graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde())) + .map(kv -> new IntegerEnvelope(kv.getKey())); Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey(); inStream .map(m -> m) @@ -418,8 +421,9 @@ public class TestWindowOperator { @Override public void init(StreamGraph graph, Config config) { - MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers", - (k, m) -> new IntegerEnvelope((Integer) k)); + MessageStream<IntegerEnvelope> inStream = + graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde())) + .map(kv -> new IntegerEnvelope(kv.getKey())); Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey(); inStream .map(m -> m) @@ -444,8 +448,9 @@ public class TestWindowOperator { @Override public void init(StreamGraph graph, Config config) { - MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers", - (k, m) -> new IntegerEnvelope((Integer) k)); + MessageStream<IntegerEnvelope> inStream = + graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde())) + .map(kv -> new IntegerEnvelope(kv.getKey())); Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey(); inStream http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java index 4505eef..68b4ce0 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java @@ -19,9 +19,9 @@ package org.apache.samza.operators.impl; -import org.apache.commons.lang3.tuple.Pair; import org.apache.samza.config.Config; import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraphImpl; @@ -30,6 +30,10 @@ import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.spec.OperatorSpec.OpCode; import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.serializers.StringSerde; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStream; import org.apache.samza.task.MessageCollector; @@ -43,8 +47,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.function.BiFunction; -import java.util.function.Function; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; @@ -73,9 +75,8 @@ public class TestOperatorImplGraph { when(mockRunner.getStreamSpec(eq("output"))).thenReturn(mock(StreamSpec.class)); StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class)); - MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class)); - OutputStream<Object, Object, Object> outputStream = - streamGraph.getOutputStream("output", mock(Function.class), mock(Function.class)); + MessageStream<Object> inputStream = streamGraph.getInputStream("input"); + OutputStream<Object> outputStream = streamGraph.getOutputStream("output"); inputStream .filter(mock(FilterFunction.class)) @@ -104,12 +105,49 @@ public class TestOperatorImplGraph { } @Test + public void testPartitionByChain() { + ApplicationRunner mockRunner = mock(ApplicationRunner.class); + when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system")); + when(mockRunner.getStreamSpec(eq("output"))).thenReturn(new StreamSpec("output", "output-stream", "output-system")); + when(mockRunner.getStreamSpec(eq("null-null-partition_by-1"))) + .thenReturn(new StreamSpec("intermediate", "intermediate-stream", "intermediate-system")); + StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class)); + MessageStream<Object> inputStream = streamGraph.getInputStream("input"); + OutputStream<KV<Integer, String>> outputStream = streamGraph + .getOutputStream("output", KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class))); + + inputStream + .partitionBy(Object::hashCode, Object::toString, KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class))) + .sendTo(outputStream); + + TaskContext mockTaskContext = mock(TaskContext.class); + when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + OperatorImplGraph opImplGraph = + new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class)); + + InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream")); + assertEquals(1, inputOpImpl.registeredOperators.size()); + + OperatorImpl partitionByOpImpl = (PartitionByOperatorImpl) inputOpImpl.registeredOperators.iterator().next(); + assertEquals(0, partitionByOpImpl.registeredOperators.size()); // is terminal but paired with an input operator + assertEquals(OpCode.PARTITION_BY, partitionByOpImpl.getOperatorSpec().getOpCode()); + + InputOperatorImpl repartitionedInputOpImpl = + opImplGraph.getInputOperator(new SystemStream("intermediate-system", "intermediate-stream")); + assertEquals(1, repartitionedInputOpImpl.registeredOperators.size()); + + OperatorImpl sendToOpImpl = (OutputOperatorImpl) repartitionedInputOpImpl.registeredOperators.iterator().next(); + assertEquals(0, sendToOpImpl.registeredOperators.size()); + assertEquals(OpCode.SEND_TO, sendToOpImpl.getOperatorSpec().getOpCode()); + } + + @Test public void testBroadcastChain() { ApplicationRunner mockRunner = mock(ApplicationRunner.class); when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system")); StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class)); - MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class)); + MessageStream<Object> inputStream = streamGraph.getInputStream("input"); inputStream.filter(mock(FilterFunction.class)); inputStream.map(mock(MapFunction.class)); @@ -132,7 +170,7 @@ public class TestOperatorImplGraph { when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system")); StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class)); - MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class)); + MessageStream<Object> inputStream = streamGraph.getInputStream("input"); MessageStream<Object> stream1 = inputStream.filter(mock(FilterFunction.class)); MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class)); MessageStream<Object> mergedStream = stream1.merge(Collections.singleton(stream2)); @@ -156,8 +194,8 @@ public class TestOperatorImplGraph { StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class)); JoinFunction mockJoinFunction = mock(JoinFunction.class); - MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v); - MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v); + MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", new NoOpSerde<>()); + MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", new NoOpSerde<>()); inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1)); TaskContext mockTaskContext = mock(TaskContext.class); @@ -182,13 +220,13 @@ public class TestOperatorImplGraph { // verify that left partial join operator calls getFirstKey Object mockLeftMessage = mock(Object.class); when(mockJoinFunction.getFirstKey(eq(mockLeftMessage))).thenReturn(joinKey); - inputOpImpl1.onMessage(Pair.of("", mockLeftMessage), mock(MessageCollector.class), mock(TaskCoordinator.class)); + inputOpImpl1.onMessage(KV.of("", mockLeftMessage), mock(MessageCollector.class), mock(TaskCoordinator.class)); verify(mockJoinFunction, times(1)).getFirstKey(mockLeftMessage); // verify that right partial join operator calls getSecondKey Object mockRightMessage = mock(Object.class); when(mockJoinFunction.getSecondKey(eq(mockRightMessage))).thenReturn(joinKey); - inputOpImpl2.onMessage(Pair.of("", mockRightMessage), mock(MessageCollector.class), mock(TaskCoordinator.class)); + inputOpImpl2.onMessage(KV.of("", mockRightMessage), mock(MessageCollector.class), mock(TaskCoordinator.class)); verify(mockJoinFunction, times(1)).getSecondKey(mockRightMessage); // verify that the join function apply is called with the correct messages on match @@ -205,8 +243,8 @@ public class TestOperatorImplGraph { when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig); - MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v); - MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v); + MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1"); + MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2"); List<String> initializedOperators = new ArrayList<>(); List<String> closedOperators = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java index e183d87..a91c1af 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java @@ -41,7 +41,7 @@ public class TestStreamOperatorImpl { @Test @SuppressWarnings("unchecked") - public void testSimpleOperator() { + public void testStreamOperator() { StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class); FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class); when(mockOp.getTransformFn()).thenReturn(txfmFn); @@ -61,7 +61,7 @@ public class TestStreamOperatorImpl { } @Test - public void testSimpleOperatorClose() { + public void testStreamOperatorClose() { StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class); FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class); when(mockOp.getTransformFn()).thenReturn(txfmFn); http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala deleted file mode 100644 index eddfb0a..0000000 --- a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers - -import java.util.Arrays -import org.junit.Assert._ -import org.junit.Test -import java.nio.ByteBuffer - -class TestByteBufferSerde { - @Test - def testSerde { - val serde = new ByteBufferSerde - assertNull(serde.toBytes(null)) - assertNull(serde.fromBytes(null)) - - val bytes = "A lazy way of creating a byte array".getBytes() - val byteBuffer = ByteBuffer.wrap(bytes) - byteBuffer.mark() - assertArrayEquals(serde.toBytes(byteBuffer), bytes) - byteBuffer.reset() - assertEquals(serde.fromBytes(bytes), byteBuffer) - } - - @Test - def testSerializationPreservesInput { - val serde = new ByteBufferSerde - val bytes = "A lazy way of creating a byte array".getBytes() - val byteBuffer = ByteBuffer.wrap(bytes) - byteBuffer.get() // advance position by 1 - serde.toBytes(byteBuffer) - - assertEquals(byteBuffer.capacity(), byteBuffer.limit()) - assertEquals(1, byteBuffer.position()) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala deleted file mode 100644 index f605762..0000000 --- a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers - -import java.util.Arrays - -import org.junit.Assert._ -import org.junit.Test - -class TestByteSerde { - @Test - def testByteSerde { - val serde = new ByteSerde - assertNull(serde.toBytes(null)) - assertNull(serde.fromBytes(null)) - - val testBytes = "A lazy way of creating a byte array".getBytes() - assertArrayEquals(serde.toBytes(testBytes), testBytes) - assertArrayEquals(serde.fromBytes(testBytes), testBytes) - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala deleted file mode 100644 index 60241b5..0000000 --- a/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers - -import java.util.Arrays - -import org.junit.Assert._ -import org.junit.Test - -class TestDoubleSerde { - @Test - def testDoubleSerde { - val serde = new DoubleSerde - assertEquals(null, serde.toBytes(null)) - assertEquals(null, serde.fromBytes(null)) - - val fooBar = 9.156013e-002 - val fooBarBytes = serde.toBytes(fooBar) - fooBarBytes.foreach(System.err.println) - assertArrayEquals(Array[Byte](63, -73, 112, 124, 19, -9, -82, -93), fooBarBytes) - assertEquals(fooBar, serde.fromBytes(fooBarBytes)) - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala deleted file mode 100644 index a3e7e1f..0000000 --- a/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers - -import org.junit.Assert._ -import org.junit.Test - -class TestIntegerSerde { - @Test - def testIntegerSerde { - val serde = new IntegerSerde - assertEquals(null, serde.toBytes(null)) - assertEquals(null, serde.fromBytes(null)) - - val fooBar = 37 - val fooBarBytes = serde.toBytes(fooBar) - assertArrayEquals(Array[Byte](0, 0, 0, 37), fooBarBytes) - assertEquals(fooBar, serde.fromBytes(fooBarBytes)) - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala deleted file mode 100644 index 77a7498..0000000 --- a/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers - -import java.util.Arrays - -import org.junit.Assert._ -import org.junit.Test - -class TestLongSerde { - @Test - def testLongSerde { - val serde = new LongSerde - assertEquals(null, serde.toBytes(null)) - assertEquals(null, serde.fromBytes(null)) - - val fooBar = 1234123412341234L - val fooBarBytes = serde.toBytes(fooBar) - fooBarBytes.foreach(System.err.println) - assertArrayEquals(Array[Byte](0, 4, 98, 109, -65, -102, 1, -14), fooBarBytes) - assertEquals(fooBar, serde.fromBytes(fooBarBytes)) - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala deleted file mode 100644 index 8e899d0..0000000 --- a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers - -import org.junit.Assert._ -import org.junit.Test - -class TestSerializableSerde { - @Test - def testSerializableSerde { - val serde = new SerializableSerde[String] - assertNull(serde.toBytes(null)) - assertNull(serde.fromBytes(null)) - - val obj = "String is serializable" - - // Serialized string is prefix + string itself - val prefix = Array(0xAC, 0xED, 0x00, 0x05, 0x74, 0x00, 0x16).map(_.toByte) - val expected = (prefix ++ obj.getBytes("UTF-8")) - - val bytes = serde.toBytes(obj) - - assertArrayEquals(expected, bytes) - - val objRoundTrip:String = serde.fromBytes(bytes) - assertEquals(obj, objRoundTrip) - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala deleted file mode 100644 index a1e8e88..0000000 --- a/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers - -import org.junit.Assert._ -import org.junit.Test - -class TestStringSerde { - @Test - def testStringSerde { - val serde = new StringSerde("UTF-8") - assertEquals(null, serde.toBytes(null)) - assertEquals(null, serde.fromBytes(null)) - - val fooBar = "foo bar" - val fooBarBytes = serde.toBytes(fooBar) - assertArrayEquals(fooBar.getBytes("UTF-8"), fooBarBytes) - assertEquals(fooBar, serde.fromBytes(fooBarBytes)) - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala deleted file mode 100644 index 04ddcdb..0000000 --- a/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers - -import java.nio.BufferUnderflowException -import java.util.UUID - -import org.junit.Assert._ -import org.junit.Test - -class TestUUIDSerde { - private val serde = new UUIDSerde - - @Test - def testUUIDSerde { - val uuid = new UUID(13, 42) - val bytes = serde.toBytes(uuid) - assertArrayEquals(Array[Byte](0, 0, 0, 0, 0, 0, 0, 13, 0, 0, 0, 0, 0, 0, 0, 42), bytes) - assertEquals(uuid, serde.fromBytes(bytes)) - } - - @Test - def testToBytesWhenNull { - assertEquals(null, serde.toBytes(null)) - } - - @Test - def testFromBytesWhenNull { - assertEquals(null, serde.fromBytes(null)) - } - - @Test(expected = classOf[BufferUnderflowException]) - def testFromBytesWhenInvalid { - assertEquals(null, serde.fromBytes(Array[Byte](0))) - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java index 0f0d792..5824489 100644 --- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java +++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java @@ -77,7 +77,7 @@ public class Log4jSystemConfig extends JavaSystemConfig { * supplied serde name. */ public String getSerdeClass(String name) { - return get(String.format(SerializerConfig.SERDE(), name), null); + return get(String.format(SerializerConfig.SERDE_FACTORY_CLASS(), name), null); } public String getStreamSerdeName(String systemName, String streamName) { http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java index e442599..8436835 100644 --- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java +++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java @@ -246,7 +246,7 @@ public class StreamAppender extends AppenderSkeleton { SerdeFactory<LoggingEvent> serdeFactory = Util.<SerdeFactory<LoggingEvent>>getObj(serdeClass); serde = serdeFactory.getSerde(systemName, config); } else { - String serdeKey = String.format(SerializerConfig.SERDE(), serdeName); + String serdeKey = String.format(SerializerConfig.SERDE_FACTORY_CLASS(), serdeName); throw new SamzaException("Can not find serializers class for key '" + serdeName + "'. Please specify " + serdeKey + " property"); } http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-test/src/main/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/samza-test/src/main/resources/log4j.xml b/samza-test/src/main/resources/log4j.xml index 7b4fb82..ab74ebd 100644 --- a/samza-test/src/main/resources/log4j.xml +++ b/samza-test/src/main/resources/log4j.xml @@ -12,43 +12,39 @@ <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> - <appender name="RollingAppender" class="org.apache.log4j.RollingFileAppender"> - <param name="File" value="${samza.log.dir}/${samza.container.name}.log" /> - <param name="MaxFileSize" value="256MB" /> - <param name="MaxBackupIndex" value="20" /> - <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" /> - </layout> - </appender> - - <appender name="StartupAppender" class="org.apache.log4j.RollingFileAppender"> - <param name="File" value="${samza.log.dir}/${samza.container.name}-startup.log" /> - <param name="MaxFileSize" value="256MB" /> - <param name="MaxBackupIndex" value="1" /> - <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" /> - </layout> - </appender> - <appender name="console" class="org.apache.log4j.ConsoleAppender"> <param name="Target" value="System.out" /> <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" /> + <param name="ConversionPattern" value="[%t] %c{1} [%p] %m%n" /> </layout> </appender> <root> <priority value="info" /> - <appender-ref ref="console" /> - <appender-ref ref="RollingAppender" /> + <appender-ref ref="console" /> </root> <logger name="STARTUP_LOGGER" additivity="false"> <level value="info" /> - <appender-ref ref="StartupAppender"/> + <appender-ref ref="console"/> </logger> <logger name="org.apache.hadoop"> - <level value="off"/> + <level value="ERROR"/> + </logger> + <logger name="org.I0Itec.zkclient"> + <level value="ERROR"/> + </logger> + <logger name="org.apache.zookeeper"> + <level value="ERROR"/> + </logger> + <logger name="org.apache.samza.system.kafka"> + <level value="ERROR"/> + </logger> + <logger name="org.apache.kafka"> + <level value="ERROR"/> + </logger> + <logger name="kafka"> + <level value="ERROR"/> </logger> -</log4j:configuration> +</log4j:configuration> \ No newline at end of file