http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestSimpleInputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestSimpleInputDescriptor.java b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestSimpleInputDescriptor.java new file mode 100644 index 0000000..d8b51c5 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestSimpleInputDescriptor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.descriptors; + +import java.util.Collections; +import org.apache.samza.operators.descriptors.serde.ExampleSimpleInputDescriptor; +import org.apache.samza.operators.descriptors.serde.ExampleSimpleOutputDescriptor; +import org.apache.samza.operators.descriptors.serde.ExampleSimpleSystemDescriptor; +import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.system.SystemStreamMetadata; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class TestSimpleInputDescriptor { + @Test + public void testAPIUsage() { + // does not assert anything, but acts as a compile-time check on expected descriptor type parameters + // and validates that the method calls can be chained. + ExampleSimpleSystemDescriptor kafkaSystem = + new ExampleSimpleSystemDescriptor("kafka-system") + .withSystemConfigs(Collections.emptyMap()); + ExampleSimpleInputDescriptor<Integer> input1 = kafkaSystem.getInputDescriptor("input1", new IntegerSerde()); + ExampleSimpleOutputDescriptor<Integer> output1 = kafkaSystem.getOutputDescriptor("output1", new IntegerSerde()); + + input1 + .withBootstrap(false) + .withOffsetDefault(SystemStreamMetadata.OffsetType.NEWEST) + .withPhysicalName("input-1") + .withPriority(1) + .withResetOffset(false) + .withStreamConfigs(Collections.emptyMap()); + + output1 + .withPhysicalName("output-1") + .withStreamConfigs(Collections.emptyMap()); + } + + @Test + public void testISDObjectsWithOverrides() { + ExampleSimpleSystemDescriptor ssd = new ExampleSimpleSystemDescriptor("kafka-system"); + IntegerSerde streamSerde = new IntegerSerde(); + ExampleSimpleInputDescriptor<Integer> isd = ssd.getInputDescriptor("input-stream", streamSerde); + + assertEquals(streamSerde, isd.getSerde()); + assertFalse(isd.getTransformer().isPresent()); + } +}
http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestTransformingInputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestTransformingInputDescriptor.java b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestTransformingInputDescriptor.java new file mode 100644 index 0000000..60e4819 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestTransformingInputDescriptor.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.descriptors; + +import java.util.Collections; +import org.apache.samza.operators.descriptors.transforming.ExampleTransformingInputDescriptor; +import org.apache.samza.operators.descriptors.transforming.ExampleTransformingOutputDescriptor; +import org.apache.samza.operators.descriptors.transforming.ExampleTransformingSystemDescriptor; +import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.system.SystemStreamMetadata; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestTransformingInputDescriptor { + @Test + public void testAPIUsage() { + // does not assert anything, but acts as a compile-time check on expected descriptor type parameters + // and validates that the method calls can be chained. + ExampleTransformingSystemDescriptor imeTransformingSystem = + new ExampleTransformingSystemDescriptor("imeTransformingSystem") + .withSystemConfigs(Collections.emptyMap()); + ExampleTransformingInputDescriptor<Long> input1 = imeTransformingSystem.getInputDescriptor("input1", new IntegerSerde()); + ExampleTransformingOutputDescriptor<Integer> output1 = imeTransformingSystem.getOutputDescriptor("output1", new IntegerSerde()); + + input1 + .withBootstrap(false) + .withOffsetDefault(SystemStreamMetadata.OffsetType.NEWEST) + .withPhysicalName("input-1") + .withPriority(1) + .withResetOffset(false) + .withStreamConfigs(Collections.emptyMap()); + + output1 + .withPhysicalName("output-1") + .withStreamConfigs(Collections.emptyMap()); + } + + @Test + public void testISDObjectsWithOverrides() { + ExampleTransformingSystemDescriptor imeTransformingSystem = + new ExampleTransformingSystemDescriptor("imeTransformingSystem"); + IntegerSerde streamSerde = new IntegerSerde(); + ExampleTransformingInputDescriptor<Long> overridingISD = + imeTransformingSystem.getInputDescriptor("input-stream", streamSerde); + + assertEquals(streamSerde, overridingISD.getSerde()); + 
assertEquals(imeTransformingSystem.getTransformer().get(), overridingISD.getTransformer().get()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingInputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingInputDescriptor.java b/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingInputDescriptor.java new file mode 100644 index 0000000..1f382a3 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingInputDescriptor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.descriptors.expanding; + +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.operators.functions.InputTransformer; +import org.apache.samza.serializers.Serde; + +public class ExampleExpandingInputDescriptor<StreamMessageType> extends InputDescriptor<StreamMessageType, ExampleExpandingInputDescriptor<StreamMessageType>> { + ExampleExpandingInputDescriptor(String streamId, SystemDescriptor systemDescriptor, InputTransformer<StreamMessageType> transformer, Serde serde) { + super(streamId, serde, systemDescriptor, transformer); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingOutputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingOutputDescriptor.java b/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingOutputDescriptor.java new file mode 100644 index 0000000..705b866 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingOutputDescriptor.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.descriptors.expanding; + +import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.serializers.Serde; + +public class ExampleExpandingOutputDescriptor<StreamMessageType> extends OutputDescriptor<StreamMessageType, ExampleExpandingOutputDescriptor<StreamMessageType>> { + ExampleExpandingOutputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde<StreamMessageType> serde) { + super(streamId, serde, systemDescriptor); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingSystemDescriptor.java b/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingSystemDescriptor.java new file mode 100644 index 0000000..c81c8aa --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/descriptors/expanding/ExampleExpandingSystemDescriptor.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.descriptors.expanding; + +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.descriptors.base.system.ExpandingInputDescriptorProvider; +import org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.operators.functions.InputTransformer; +import org.apache.samza.serializers.Serde; +import org.apache.samza.system.IncomingMessageEnvelope; + +public class ExampleExpandingSystemDescriptor extends SystemDescriptor<ExampleExpandingSystemDescriptor> + implements ExpandingInputDescriptorProvider<Long>, OutputDescriptorProvider { + private static final String FACTORY_CLASS_NAME = "org.apache.samza.GraphExpandingSystemFactory"; + + public ExampleExpandingSystemDescriptor(String systemName) { + super(systemName, FACTORY_CLASS_NAME, + (InputTransformer<String>) IncomingMessageEnvelope::toString, + (streamGraph, inputDescriptor) -> (MessageStream<Long>) streamGraph.getInputStream(inputDescriptor) + ); + } + + @Override + public ExampleExpandingInputDescriptor<Long> getInputDescriptor(String streamId, Serde serde) { + return new ExampleExpandingInputDescriptor<>(streamId, this, null, serde); + } + + @Override + public <StreamMessageType> ExampleExpandingOutputDescriptor<StreamMessageType> 
getOutputDescriptor(String streamId, Serde<StreamMessageType> serde) { + return new ExampleExpandingOutputDescriptor<>(streamId, this, serde); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleInputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleInputDescriptor.java b/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleInputDescriptor.java new file mode 100644 index 0000000..c5df448 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleInputDescriptor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.descriptors.serde; + +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.operators.functions.InputTransformer; +import org.apache.samza.serializers.Serde; + +public class ExampleSimpleInputDescriptor<StreamMessageType> extends InputDescriptor<StreamMessageType, ExampleSimpleInputDescriptor<StreamMessageType>> { + ExampleSimpleInputDescriptor(String streamId, SystemDescriptor systemDescriptor, InputTransformer<StreamMessageType> transformer, Serde serde) { + super(streamId, serde, systemDescriptor, transformer); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleOutputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleOutputDescriptor.java b/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleOutputDescriptor.java new file mode 100644 index 0000000..aeda2e0 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleOutputDescriptor.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.descriptors.serde; + +import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.serializers.Serde; + +public class ExampleSimpleOutputDescriptor<StreamMessageType> extends OutputDescriptor<StreamMessageType, ExampleSimpleOutputDescriptor<StreamMessageType>> { + ExampleSimpleOutputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde<StreamMessageType> serde) { + super(streamId, serde, systemDescriptor); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleSystemDescriptor.java b/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleSystemDescriptor.java new file mode 100644 index 0000000..b0af4d8 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/descriptors/serde/ExampleSimpleSystemDescriptor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.descriptors.serde; + +import org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider; +import org.apache.samza.operators.descriptors.base.system.SimpleInputDescriptorProvider; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.serializers.Serde; + +public class ExampleSimpleSystemDescriptor extends SystemDescriptor<ExampleSimpleSystemDescriptor> + implements SimpleInputDescriptorProvider, OutputDescriptorProvider { + private static final String FACTORY_CLASS_NAME = "org.apache.kafka.KafkaSystemFactory"; + + public ExampleSimpleSystemDescriptor(String systemName) { + super(systemName, FACTORY_CLASS_NAME, null, null); + } + + @Override + public <StreamMessageType> ExampleSimpleInputDescriptor<StreamMessageType> getInputDescriptor(String streamId, Serde<StreamMessageType> serde) { + return new ExampleSimpleInputDescriptor<>(streamId, this, null, serde); + } + + @Override + public <StreamMessageType> ExampleSimpleOutputDescriptor<StreamMessageType> getOutputDescriptor(String streamId, Serde<StreamMessageType> serde) { + return new ExampleSimpleOutputDescriptor<>(streamId, this, serde); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingInputDescriptor.java 
---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingInputDescriptor.java b/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingInputDescriptor.java new file mode 100644 index 0000000..78b6f33 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingInputDescriptor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.descriptors.transforming; + +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.operators.functions.InputTransformer; +import org.apache.samza.serializers.Serde; + +public class ExampleTransformingInputDescriptor<StreamMessageType> extends InputDescriptor<StreamMessageType, ExampleTransformingInputDescriptor<StreamMessageType>> { + ExampleTransformingInputDescriptor(String streamId, SystemDescriptor systemDescriptor, InputTransformer<StreamMessageType> transformer, Serde serde) { + super(streamId, serde, systemDescriptor, transformer); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingOutputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingOutputDescriptor.java b/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingOutputDescriptor.java new file mode 100644 index 0000000..c37906b --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingOutputDescriptor.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.descriptors.transforming; + +import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.serializers.Serde; + +public class ExampleTransformingOutputDescriptor<StreamMessageType> extends OutputDescriptor<StreamMessageType, ExampleTransformingOutputDescriptor<StreamMessageType>> { + ExampleTransformingOutputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde<StreamMessageType> serde) { + super(streamId, serde, systemDescriptor); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingSystemDescriptor.java b/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingSystemDescriptor.java new file mode 100644 index 0000000..f8aff61 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/descriptors/transforming/ExampleTransformingSystemDescriptor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.descriptors.transforming; + +import org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.operators.descriptors.base.system.TransformingInputDescriptorProvider; +import org.apache.samza.serializers.Serde; + +public class ExampleTransformingSystemDescriptor extends SystemDescriptor<ExampleTransformingSystemDescriptor> + implements TransformingInputDescriptorProvider<Long>, OutputDescriptorProvider { + private static final String FACTORY_CLASS_NAME = "org.apache.samza.IMETransformingSystemFactory"; + + public ExampleTransformingSystemDescriptor(String systemName) { + super(systemName, FACTORY_CLASS_NAME, ime -> 1L, null); + } + + @Override + public ExampleTransformingInputDescriptor<Long> getInputDescriptor(String streamId, Serde serde) { + return new ExampleTransformingInputDescriptor<>(streamId, this, null, serde); + } + + @Override + public <StreamMessageType> ExampleTransformingOutputDescriptor<StreamMessageType> getOutputDescriptor(String streamId, Serde<StreamMessageType> serde) { + return new ExampleTransformingOutputDescriptor<>(streamId, this, serde); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java 
---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java index 4f2aa23..3a8d5c9 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java @@ -174,8 +174,8 @@ import org.codehaus.jackson.map.ObjectMapper; StreamJson inputJson = new StreamJson(); opGraph.inputStreams.add(inputJson); inputJson.streamId = streamId; - Collection<OperatorSpec> specs = operatorSpec.getRegisteredOperatorSpecs(); - inputJson.nextOperatorIds = specs.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet()); + inputJson.nextOperatorIds = operatorSpec.getRegisteredOperatorSpecs().stream() + .map(OperatorSpec::getOpId).collect(Collectors.toSet()); updateOperatorGraphJson(operatorSpec, opGraph); }); http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/execution/JobNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java index 2b279ef..9be6c37 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java @@ -235,15 +235,27 @@ public class JobNode { inEdges.forEach(edge -> { String streamId = edge.getStreamSpec().getId(); InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId); - streamKeySerdes.put(streamId, inputOperatorSpec.getKeySerde()); - streamMsgSerdes.put(streamId, inputOperatorSpec.getValueSerde()); + Serde keySerde = inputOperatorSpec.getKeySerde(); + if (keySerde != null) { + streamKeySerdes.put(streamId, keySerde); + } + Serde valueSerde = 
inputOperatorSpec.getValueSerde(); + if (valueSerde != null) { + streamMsgSerdes.put(streamId, valueSerde); + } }); Map<String, OutputStreamImpl> outputStreams = specGraph.getOutputStreams(); outEdges.forEach(edge -> { String streamId = edge.getStreamSpec().getId(); OutputStreamImpl outputStream = outputStreams.get(streamId); - streamKeySerdes.put(streamId, outputStream.getKeySerde()); - streamMsgSerdes.put(streamId, outputStream.getValueSerde()); + Serde keySerde = outputStream.getKeySerde(); + if (keySerde != null) { + streamKeySerdes.put(streamId, keySerde); + } + Serde valueSerde = outputStream.getValueSerde(); + if (valueSerde != null) { + streamMsgSerdes.put(streamId, valueSerde); + } }); // collect all key and msg serde instances for stores http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java index 6922c76..3c1a1dc 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java @@ -166,7 +166,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> { public <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> keyExtractor, MapFunction<? super M, ? 
extends V> valueExtractor, KVSerde<K, V> serde, String userDefinedId) { String opId = this.graph.getNextOpId(OpCode.PARTITION_BY, userDefinedId); - IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.graph.getIntermediateStream(opId, serde); + IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.graph.getIntermediateStream(opId, serde, false); if (!intermediateStream.isKeyed()) { // this can only happen when the default serde partitionBy variant is being used throw new SamzaException("partitionBy can not be used with a default serde that is not a KVSerde."); http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java index 7dcd32e..8eb2425 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java @@ -18,17 +18,25 @@ */ package org.apache.samza.operators; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.regex.Pattern; - import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.operators.functions.InputTransformer; +import 
org.apache.samza.operators.functions.StreamExpander; import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec.OpCode; import org.apache.samza.operators.spec.OperatorSpecs; @@ -42,9 +50,6 @@ import org.apache.samza.table.TableSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - /** * This class defines: * 1) an implementation of {@link StreamGraph} that provides APIs for accessing {@link MessageStream}s to be used to @@ -53,12 +58,14 @@ import com.google.common.base.Preconditions; */ public class StreamGraphSpec implements StreamGraph { private static final Logger LOGGER = LoggerFactory.getLogger(StreamGraphSpec.class); - public static final Pattern STREAM_ID_PATTERN = Pattern.compile("[\\d\\w-_.]+"); - public static final Pattern TABLE_ID_PATTERN = Pattern.compile("[\\d\\w-_]+"); + private static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_]+"); // We use a LHM for deterministic order in initializing and closing operators. 
private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>(); private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>(); + private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>(); + private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>(); + private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>(); private final Set<String> broadcastStreams = new HashSet<>(); private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>(); private final Config config; @@ -70,29 +77,41 @@ public class StreamGraphSpec implements StreamGraph { */ private int nextOpNum = 0; private final Set<String> operatorIds = new HashSet<>(); - private Serde<?> defaultSerde = new KVSerde(new NoOpSerde(), new NoOpSerde()); private ContextManager contextManager = null; + private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty(); public StreamGraphSpec(Config config) { this.config = config; } @Override - public void setDefaultSerde(Serde<?> serde) { - Preconditions.checkNotNull(serde, "Default serde must not be null"); + public void setDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor) { + Preconditions.checkNotNull(defaultSystemDescriptor, "Provided defaultSystemDescriptor must not be null."); + String defaultSystemName = defaultSystemDescriptor.getSystemName(); Preconditions.checkState(inputOperators.isEmpty() && outputStreams.isEmpty(), - "Default serde must be set before creating any input or output streams."); - this.defaultSerde = serde; + "Default system must be set before creating any input or output streams."); + checkSystemDescriptorUniqueness(defaultSystemDescriptor, defaultSystemName); + systemDescriptors.put(defaultSystemName, defaultSystemDescriptor); + this.defaultSystemDescriptorOptional = Optional.of(defaultSystemDescriptor); } @Override - public <M> MessageStream<M> getInputStream(String streamId, Serde<M> 
serde) { - Preconditions.checkState(isValidStreamId(streamId), String.format( - "streamId %s doesn't confirm to pattern %s", streamId, StreamGraphSpec.STREAM_ID_PATTERN)); - Preconditions.checkNotNull(serde, "serde must not be null for an input stream."); + public <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor) { + SystemDescriptor systemDescriptor = inputDescriptor.getSystemDescriptor(); + Optional<StreamExpander> expander = systemDescriptor.getExpander(); + if (expander.isPresent()) { + return expander.get().apply(this, inputDescriptor); + } + + String streamId = inputDescriptor.getStreamId(); Preconditions.checkState(!inputOperators.containsKey(streamId), "getInputStream must not be called multiple times with the same streamId: " + streamId); + Preconditions.checkState(!inputDescriptors.containsKey(streamId), + "getInputStream must not be called multiple times with the same input descriptor: " + streamId); + String systemName = systemDescriptor.getSystemName(); + checkSystemDescriptorUniqueness(systemDescriptor, systemName); + Serde serde = inputDescriptor.getSerde(); KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde); if (outputStreams.containsKey(streamId)) { OutputStreamImpl outputStream = outputStreams.get(streamId); @@ -104,26 +123,28 @@ public class StreamGraphSpec implements StreamGraph { } boolean isKeyed = serde instanceof KVSerde; + InputTransformer transformer = inputDescriptor.getTransformer().orElse(null); InputOperatorSpec inputOperatorSpec = OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(), - isKeyed, this.getNextOpId(OpCode.INPUT, null)); + transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null)); inputOperators.put(streamId, inputOperatorSpec); - return new MessageStreamImpl<>(this, inputOperators.get(streamId)); + inputDescriptors.put(streamId, inputDescriptor); + systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor); + return new 
MessageStreamImpl(this, inputOperators.get(streamId)); } @Override - public <M> MessageStream<M> getInputStream(String streamId) { - return (MessageStream<M>) getInputStream(streamId, defaultSerde); - } - - @Override - public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) { - Preconditions.checkState(isValidStreamId(streamId), String.format( - "streamId %s doesn't confirm to pattern %s", streamId, StreamGraphSpec.STREAM_ID_PATTERN)); - Preconditions.checkNotNull(serde, "serde must not be null for an output stream."); + public <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor) { + String streamId = outputDescriptor.getStreamId(); Preconditions.checkState(!outputStreams.containsKey(streamId), "getOutputStream must not be called multiple times with the same streamId: " + streamId); + Preconditions.checkState(!outputDescriptors.containsKey(streamId), + "getOutputStream must not be called multiple times with the same output descriptor: " + streamId); + SystemDescriptor systemDescriptor = outputDescriptor.getSystemDescriptor(); + String systemName = systemDescriptor.getSystemName(); + checkSystemDescriptorUniqueness(systemDescriptor, systemName); + Serde serde = outputDescriptor.getSerde(); KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde); if (inputOperators.containsKey(streamId)) { InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId); @@ -136,21 +157,21 @@ public class StreamGraphSpec implements StreamGraph { boolean isKeyed = serde instanceof KVSerde; outputStreams.put(streamId, new OutputStreamImpl<>(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed)); + outputDescriptors.put(streamId, outputDescriptor); + systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor); return outputStreams.get(streamId); } @Override - public <M> OutputStream<M> getOutputStream(String streamId) { - return (OutputStream<M>) getOutputStream(streamId, defaultSerde); - } - - @Override - public <K, V> 
Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc) { - Preconditions.checkState(isValidTableId(tableDesc.getTableId()), String.format( - "tableId %s doesn't confirm to pattern %s", tableDesc.getTableId(), TABLE_ID_PATTERN.toString())); - TableSpec tableSpec = ((BaseTableDescriptor) tableDesc).getTableSpec(); - Preconditions.checkState(!tables.containsKey(tableSpec), String.format( - "getTable() invoked multiple times with the same tableId: %s", tableDesc.getTableId())); + public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor) { + String tableId = tableDescriptor.getTableId(); + Preconditions.checkState(StringUtils.isNotBlank(tableId) && ID_PATTERN.matcher(tableId).matches(), + String.format("tableId: %s must confirm to pattern: %s", tableId, ID_PATTERN.toString())); + TableSpec tableSpec = ((BaseTableDescriptor) tableDescriptor).getTableSpec(); + if (tables.containsKey(tableSpec)) { + throw new IllegalStateException( + String.format("getTable() invoked multiple times with the same tableId: %s", tableId)); + } tables.put(tableSpec, new TableImpl(tableSpec)); return tables.get(tableSpec); } @@ -178,7 +199,7 @@ public class StreamGraphSpec implements StreamGraph { * @return the unique ID for the next operator in the graph */ public String getNextOpId(OpCode opCode, String userDefinedId) { - if (StringUtils.isNotBlank(userDefinedId) && !STREAM_ID_PATTERN.matcher(userDefinedId).matches()) { + if (StringUtils.isNotBlank(userDefinedId) && !ID_PATTERN.matcher(userDefinedId).matches()) { throw new SamzaException("Operator ID must not contain spaces or special characters: " + userDefinedId); } @@ -207,19 +228,6 @@ public class StreamGraphSpec implements StreamGraph { } /** - * See {@link StreamGraphSpec#getIntermediateStream(String, Serde, boolean)}. 
- * - * @param <M> type of messages in the intermediate stream - * @param streamId the id of the stream to be created - * @param serde the {@link Serde} to use for messages in the intermediate stream. If null, the default serde is used. - * @return the intermediate {@link MessageStreamImpl} - */ - @VisibleForTesting - public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde) { - return getIntermediateStream(streamId, serde, false); - } - - /** * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph. * An intermediate {@link MessageStream} is both an output and an input stream. * @@ -231,21 +239,33 @@ public class StreamGraphSpec implements StreamGraph { * @return the intermediate {@link MessageStreamImpl} */ @VisibleForTesting - <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde, boolean isBroadcast) { + public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde, boolean isBroadcast) { Preconditions.checkState(!inputOperators.containsKey(streamId) && !outputStreams.containsKey(streamId), "getIntermediateStream must not be called multiple times with the same streamId: " + streamId); if (serde == null) { - LOGGER.info("Using default serde for intermediate stream: " + streamId); - serde = (Serde<M>) defaultSerde; + LOGGER.info("No serde provided for intermediate stream: " + streamId + + ". 
Key and message serdes configured for the job.default.system will be used."); } if (isBroadcast) broadcastStreams.add(streamId); - boolean isKeyed = serde instanceof KVSerde; - KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde); + + boolean isKeyed; + KV<Serde, Serde> kvSerdes; + if (serde == null) { // if no explicit serde available + isKeyed = true; // assume keyed stream + kvSerdes = new KV<>(null, null); // and that key and msg serdes are provided for job.default.system in configs + } else { + isKeyed = serde instanceof KVSerde; + kvSerdes = getKVSerdes(streamId, serde); + } + + InputTransformer transformer = (InputTransformer) defaultSystemDescriptorOptional + .flatMap(SystemDescriptor::getTransformer).orElse(null); + InputOperatorSpec inputOperatorSpec = OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(), - isKeyed, this.getNextOpId(OpCode.INPUT, null)); + transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null)); inputOperators.put(streamId, inputOperatorSpec); outputStreams.put(streamId, new OutputStreamImpl(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed)); return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId)); @@ -267,12 +287,28 @@ public class StreamGraphSpec implements StreamGraph { return Collections.unmodifiableMap(tables); } - public static boolean isValidStreamId(String id) { - return StringUtils.isNotBlank(id) && STREAM_ID_PATTERN.matcher(id).matches(); + public Map<String, InputDescriptor> getInputDescriptors() { + return Collections.unmodifiableMap(inputDescriptors); + } + + public Map<String, OutputDescriptor> getOutputDescriptors() { + return Collections.unmodifiableMap(outputDescriptors); + } + + public Set<SystemDescriptor> getSystemDescriptors() { + // We enforce that users must not use different system descriptor instances for the same system name + // when getting an input/output stream or setting the default system descriptor + 
return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values())); + } + + public Optional<SystemDescriptor> getDefaultSystemDescriptor() { + return this.defaultSystemDescriptorOptional; } - public static boolean isValidTableId(String id) { - return StringUtils.isNotBlank(id) && TABLE_ID_PATTERN.matcher(id).matches(); + private void checkSystemDescriptorUniqueness(SystemDescriptor systemDescriptor, String systemName) { + Preconditions.checkState(!systemDescriptors.containsKey(systemName) + || systemDescriptors.get(systemName) == systemDescriptor, + "Must not use different system descriptor instances for the same system name: " + systemName); } private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) { http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java b/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java new file mode 100644 index 0000000..16accae --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.descriptors; + + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider; +import org.apache.samza.operators.descriptors.base.system.SimpleInputDescriptorProvider; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.serializers.Serde; + +/** + * A descriptor for samza framework internal usage. + * <p> + * Allows creating a {@link SystemDescriptor} without setting the factory class name, and delegating + * the rest of the system customization to configurations. + * <p> + * Useful for code-generation and testing use cases where the factory name is not known in advance. + */ +@SuppressWarnings("unchecked") +public final class DelegatingSystemDescriptor extends SystemDescriptor<DelegatingSystemDescriptor> + implements SimpleInputDescriptorProvider, OutputDescriptorProvider { + + /** + * Constructs a {@link DelegatingSystemDescriptor} instance with no system level serde. + * Serdes must be provided explicitly at stream level when getting input or output descriptors. + * SystemFactory class name must be provided in configuration. 
+ * + * @param systemName name of this system + */ + @VisibleForTesting + public DelegatingSystemDescriptor(String systemName) { + super(systemName, null, null, null); + } + + /** + * {@inheritDoc} + */ + @Override + public <StreamMessageType> GenericInputDescriptor<StreamMessageType> getInputDescriptor( + String streamId, Serde<StreamMessageType> serde) { + return new GenericInputDescriptor<>(streamId, this, serde); + } + + /** + * {@inheritDoc} + */ + @Override + public <StreamMessageType> GenericOutputDescriptor<StreamMessageType> getOutputDescriptor( + String streamId, Serde<StreamMessageType> serde) { + return new GenericOutputDescriptor<>(streamId, this, serde); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java index ef3c322..6cc57e0 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java @@ -20,8 +20,10 @@ package org.apache.samza.operators.impl; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; +import org.apache.samza.operators.functions.InputTransformer; import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; @@ -32,15 +34,12 @@ import java.util.Collections; /** * An operator that builds the input message from the incoming message. 
- * - * @param <K> the type of key in the incoming message - * @param <V> the type of message in the incoming message */ -public final class InputOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, Object> { // Object == KV<K,V> | V +public final class InputOperatorImpl extends OperatorImpl<IncomingMessageEnvelope, Object> { - private final InputOperatorSpec<K, V> inputOpSpec; + private final InputOperatorSpec inputOpSpec; - InputOperatorImpl(InputOperatorSpec<K, V> inputOpSpec) { + InputOperatorImpl(InputOperatorSpec inputOpSpec) { this.inputOpSpec = inputOpSpec; } @@ -49,8 +48,14 @@ public final class InputOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, Object } @Override - public Collection<Object> handleMessage(KV<K, V> pair, MessageCollector collector, TaskCoordinator coordinator) { - Object message = this.inputOpSpec.isKeyed() ? pair : pair.getValue(); + public Collection<Object> handleMessage(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) { + Object message; + InputTransformer transformer = inputOpSpec.getTransformer(); + if (transformer != null) { + message = transformer.apply(ime); + } else { + message = this.inputOpSpec.isKeyed() ? 
KV.of(ime.getKey(), ime.getMessage()) : ime.getMessage(); + } return Collections.singletonList(message); } @@ -58,7 +63,7 @@ public final class InputOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, Object protected void handleClose() { } - protected OperatorSpec<KV<K, V>, Object> getOperatorSpec() { + protected OperatorSpec<IncomingMessageEnvelope, Object> getOperatorSpec() { return this.inputOpSpec; } } http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java index 922a1f9..c49443d 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java @@ -18,47 +18,65 @@ */ package org.apache.samza.operators.spec; -import org.apache.samza.operators.KV; +import org.apache.samza.operators.functions.InputTransformer; import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.serializers.Serde; +import org.apache.samza.system.IncomingMessageEnvelope; /** * The spec for an operator that receives incoming messages from an input stream - * and converts them to the input message. - * - * @param <K> the type of input key - * @param <V> the type of input value + * and converts them to the input message. 
The input message type is: + * <ul> + * <li>{@code T} if the input stream has an {@link InputTransformer} with result type T + * <li>{@code KV<K, V>} if the input stream is keyed + * <li>{@code V} if the input stream is unkeyed + * </ul> */ -public class InputOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Object> { // Object == KV<K, V> | V +public class InputOperatorSpec extends OperatorSpec<IncomingMessageEnvelope, Object> { - private final boolean isKeyed; private final String streamId; + private final boolean isKeyed; + private final InputTransformer transformer; // may be null /** - * The following {@link Serde}s are serialized by the ExecutionPlanner when generating the configs for a stream, and deserialized - * once during startup in SamzaContainer. They don't need to be deserialized here on a per-task basis + * The following {@link Serde}s are serialized by the ExecutionPlanner when generating the configs for a stream, and + * deserialized once during startup in SamzaContainer. They don't need to be deserialized here on a per-task basis + * + * Serdes are optional for intermediate streams and may be specified for job.default.system in configuration instead. */ - private transient final Serde<K> keySerde; - private transient final Serde<V> valueSerde; + private transient final Serde keySerde; + private transient final Serde valueSerde; - public InputOperatorSpec(String streamId, Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyed, String opId) { + public InputOperatorSpec(String streamId, Serde keySerde, Serde valueSerde, + InputTransformer transformer, boolean isKeyed, String opId) { super(OpCode.INPUT, opId); this.streamId = streamId; + this.isKeyed = isKeyed; + this.transformer = transformer; this.keySerde = keySerde; this.valueSerde = valueSerde; - this.isKeyed = isKeyed; } public String getStreamId() { return this.streamId; } - public Serde<K> getKeySerde() { + /** + * Get the key serde for this input stream if any. 
+ * + * @return the key serde if any, else null + */ + public Serde getKeySerde() { return keySerde; } - public Serde<V> getValueSerde() { + /** + * Get the value serde for this input stream if any. + * + * @return the value serde if any, else null + */ + public Serde getValueSerde() { return valueSerde; } @@ -66,6 +84,15 @@ public class InputOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Object> { // return isKeyed; } + /** + * Get the {@link InputTransformer} for this input stream if any. + * + * @return the {@link InputTransformer} if any, else null + */ + public InputTransformer getTransformer() { + return transformer; + } + @Override public WatermarkFunction getWatermarkFn() { return null; http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java index 9e788da..6ebbdae 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java @@ -22,6 +22,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.KV; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.functions.InputTransformer; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.SinkFunction; @@ -44,15 +45,15 @@ public class OperatorSpecs { * @param streamId the stream id for the input stream * @param keySerde the serde for the input key * @param valueSerde the serde for the input value + * @param transformer the input stream transformer * @param isKeyed 
whether the input stream is keyed * @param opId the unique ID of the operator - * @param <K> type of input key - * @param <V> type of input value * @return the {@link InputOperatorSpec} */ - public static <K, V> InputOperatorSpec<K, V> createInputOperatorSpec( - String streamId, Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyed, String opId) { - return new InputOperatorSpec<>(streamId, keySerde, valueSerde, isKeyed, opId); + public static InputOperatorSpec createInputOperatorSpec( + String streamId, Serde keySerde, Serde valueSerde, + InputTransformer transformer, boolean isKeyed, String opId) { + return new InputOperatorSpec(streamId, keySerde, valueSerde, transformer, isKeyed, opId); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java index 5d70e6f..fc736c3 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java @@ -30,6 +30,8 @@ public class OutputStreamImpl<M> implements OutputStream<M>, Serializable { /** * The following fields are serialized by the ExecutionPlanner when generating the configs for the output stream, and * deserialized once during startup in SamzaContainer. They don't need to be deserialized here on a per-task basis + * + * Serdes are optional for intermediate streams and may be specified for job.default.system in configuration instead. */ private transient final Serde keySerde; private transient final Serde valueSerde; @@ -45,10 +47,20 @@ public class OutputStreamImpl<M> implements OutputStream<M>, Serializable { return streamId; } + /** + * Get the key serde for this output stream if any. 
+ * + * @return the key serde if any, else null + */ public Serde getKeySerde() { return keySerde; } + /** + * Get the value serde for this output stream if any. + * + * @return the value serde if any, else null + */ public Serde getValueSerde() { return valueSerde; } http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java index 3bb8713..17c6903 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java @@ -44,7 +44,7 @@ public class IntermediateMessageStreamImpl<M> extends MessageStreamImpl<M> imple private final OutputStreamImpl<M> outputStream; private final boolean isKeyed; - public IntermediateMessageStreamImpl(StreamGraphSpec graph, InputOperatorSpec<?, M> inputOperatorSpec, + public IntermediateMessageStreamImpl(StreamGraphSpec graph, InputOperatorSpec inputOperatorSpec, OutputStreamImpl<M> outputStream) { super(graph, (OperatorSpec<?, M>) inputOperatorSpec); this.outputStream = outputStream; http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 22550d5..ed67d80 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -164,7 
+164,7 @@ public class StreamProcessor { } /** - * Same as {@link StreamProcessor(Config, Map, AsyncStreamTaskFactory, StreamProcessorLifecycleListener)}, except task + * Same as {@link #StreamProcessor(Config, Map, AsyncStreamTaskFactory, StreamProcessorLifecycleListener)}, except task * instances are created using the provided {@link StreamTaskFactory}. * @param config - config * @param customMetricsReporters metric Reporter http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java index f7ca122..7127ff7 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java @@ -30,6 +30,7 @@ import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.ApplicationConfig.ApplicationMode; import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.ShellCommandConfig; import org.apache.samza.config.StreamConfig; @@ -81,6 +82,17 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner { ? 
ApplicationMode.BATCH : ApplicationMode.STREAM; cfg.put(ApplicationConfig.APP_MODE, mode.name()); + // merge user-provided configuration with input/output descriptor generated configuration + // descriptor generated configuration has higher priority + Map<String, String> systemStreamConfigs = new HashMap<>(); + graphSpec.getInputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig())); + graphSpec.getOutputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig())); + graphSpec.getSystemDescriptors().forEach(sd -> systemStreamConfigs.putAll(sd.toConfig())); + graphSpec.getDefaultSystemDescriptor().ifPresent(dsd -> + systemStreamConfigs.put(JobConfig.JOB_DEFAULT_SYSTEM(), dsd.getSystemName())); + Map<String, String> appConfigs = new HashMap<>(cfg); + appConfigs.putAll(systemStreamConfigs); + // create the physical execution plan Config generatedConfig = new MapConfig(cfg); StreamManager streamManager = buildAndStartStreamManager(generatedConfig); http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java index fdd134f..dd8d6c3 100644 --- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java +++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java @@ -23,7 +23,6 @@ import org.apache.samza.operators.OperatorSpecGraph; import org.apache.samza.system.EndOfStreamMessage; import org.apache.samza.system.MessageType; import org.apache.samza.operators.ContextManager; -import org.apache.samza.operators.KV; import org.apache.samza.operators.impl.InputOperatorImpl; import org.apache.samza.operators.impl.OperatorImplGraph; import org.apache.samza.system.IncomingMessageEnvelope; @@ -111,7 +110,7 @@ 
public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT if (inputOpImpl != null) { switch (MessageType.of(ime.getMessage())) { case USER_MESSAGE: - inputOpImpl.onMessage(KV.of(ime.getKey(), ime.getMessage()), collector, coordinator); + inputOpImpl.onMessage(ime, collector, coordinator); break; case END_OF_STREAM: http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java b/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java index e7a1e54..96cdd6c 100644 --- a/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java @@ -32,6 +32,7 @@ public class StreamUtil { * Gets the {@link SystemStream} corresponding to the provided stream, which may be * a streamId, or stream name of the format systemName.streamName. * + * @param config the config for the job * @param stream the stream name or id to get the {@link SystemStream} for. 
* @return the {@link SystemStream} for the stream */ http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java index 4d25ebb..9912d8b 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java @@ -36,8 +36,13 @@ import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; +import org.apache.samza.operators.descriptors.GenericOutputDescriptor; +import org.apache.samza.operators.descriptors.GenericSystemDescriptor; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.windows.Windows; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemAdmin; @@ -61,12 +66,18 @@ public class TestExecutionPlanner { private StreamManager streamManager; private Config config; - private StreamSpec input1; - private StreamSpec input2; - private StreamSpec input3; - private StreamSpec input4; - private StreamSpec output1; - private StreamSpec output2; + private StreamSpec input1Spec; + private GenericInputDescriptor<KV<Object, Object>> input1Descriptor; + private StreamSpec input2Spec; + private GenericInputDescriptor<KV<Object, Object>> input2Descriptor; + private StreamSpec input3Spec; + private GenericInputDescriptor<KV<Object, Object>> input3Descriptor; + private 
StreamSpec input4Spec; + private GenericInputDescriptor<KV<Object, Object>> input4Descriptor; + private StreamSpec output1Spec; + private GenericOutputDescriptor<KV<Object, Object>> output1Descriptor; + private StreamSpec output2Spec; + private GenericOutputDescriptor<KV<Object, Object>> output2Descriptor; static SystemAdmin createSystemAdmin(Map<String, Integer> streamToPartitions) { @@ -104,8 +115,8 @@ public class TestExecutionPlanner { * */ StreamGraphSpec graphSpec = new StreamGraphSpec(config); - MessageStream<KV<Object, Object>> input1 = graphSpec.getInputStream("input1"); - OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream("output1"); + MessageStream<KV<Object, Object>> input1 = graphSpec.getInputStream(input1Descriptor); + OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream(output1Descriptor); input1 .partitionBy(m -> m.key, m -> m.value, "p1") .map(kv -> kv) @@ -128,19 +139,19 @@ public class TestExecutionPlanner { StreamGraphSpec graphSpec = new StreamGraphSpec(config); MessageStream<KV<Object, Object>> messageStream1 = - graphSpec.<KV<Object, Object>>getInputStream("input1") + graphSpec.getInputStream(input1Descriptor) .map(m -> m); MessageStream<KV<Object, Object>> messageStream2 = - graphSpec.<KV<Object, Object>>getInputStream("input2") + graphSpec.getInputStream(input2Descriptor) .partitionBy(m -> m.key, m -> m.value, "p1") .filter(m -> true); MessageStream<KV<Object, Object>> messageStream3 = - graphSpec.<KV<Object, Object>>getInputStream("input3") + graphSpec.getInputStream(input3Descriptor) .filter(m -> true) .partitionBy(m -> m.key, m -> m.value, "p2") .map(m -> m); - OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream("output1"); - OutputStream<KV<Object, Object>> output2 = graphSpec.getOutputStream("output2"); + OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream(output1Descriptor); + OutputStream<KV<Object, Object>> output2 = graphSpec.getOutputStream(output2Descriptor); 
messageStream1 .join(messageStream2, @@ -160,19 +171,19 @@ public class TestExecutionPlanner { StreamGraphSpec graphSpec = new StreamGraphSpec(config); MessageStream<KV<Object, Object>> messageStream1 = - graphSpec.<KV<Object, Object>>getInputStream("input1") + graphSpec.getInputStream(input1Descriptor) .map(m -> m); MessageStream<KV<Object, Object>> messageStream2 = - graphSpec.<KV<Object, Object>>getInputStream("input2") + graphSpec.getInputStream(input2Descriptor) .partitionBy(m -> m.key, m -> m.value, "p1") .filter(m -> true); MessageStream<KV<Object, Object>> messageStream3 = - graphSpec.<KV<Object, Object>>getInputStream("input3") + graphSpec.getInputStream(input3Descriptor) .filter(m -> true) .partitionBy(m -> m.key, m -> m.value, "p2") .map(m -> m); - OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream("output1"); - OutputStream<KV<Object, Object>> output2 = graphSpec.getOutputStream("output2"); + OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream(output1Descriptor); + OutputStream<KV<Object, Object>> output2 = graphSpec.getOutputStream(output2Descriptor); messageStream1.map(m -> m) .filter(m->true) @@ -214,13 +225,24 @@ public class TestExecutionPlanner { StreamTestUtils.addStreamConfigs(configMap, "output2", "system2", "output2"); config = new MapConfig(configMap); - input1 = new StreamSpec("input1", "input1", "system1"); - input2 = new StreamSpec("input2", "input2", "system2"); - input3 = new StreamSpec("input3", "input3", "system2"); - input4 = new StreamSpec("input4", "input4", "system1"); - - output1 = new StreamSpec("output1", "output1", "system1"); - output2 = new StreamSpec("output2", "output2", "system2"); + input1Spec = new StreamSpec("input1", "input1", "system1"); + input2Spec = new StreamSpec("input2", "input2", "system2"); + input3Spec = new StreamSpec("input3", "input3", "system2"); + input4Spec = new StreamSpec("input4", "input4", "system1"); + + output1Spec = new StreamSpec("output1", "output1", 
"system1"); + output2Spec = new StreamSpec("output2", "output2", "system2"); + + KVSerde<Object, Object> kvSerde = new KVSerde<>(new NoOpSerde(), new NoOpSerde()); + String mockSystemFactoryClass = "factory.class.name"; + GenericSystemDescriptor system1 = new GenericSystemDescriptor("system1", mockSystemFactoryClass); + GenericSystemDescriptor system2 = new GenericSystemDescriptor("system2", mockSystemFactoryClass); + input1Descriptor = system1.getInputDescriptor("input1", kvSerde); + input2Descriptor = system2.getInputDescriptor("input2", kvSerde); + input3Descriptor = system2.getInputDescriptor("input3", kvSerde); + input4Descriptor = system1.getInputDescriptor("input4", kvSerde); + output1Descriptor = system1.getOutputDescriptor("output1", kvSerde); + output2Descriptor = system2.getOutputDescriptor("output2", kvSerde); // set up external partition count Map<String, Integer> system1Map = new HashMap<>(); @@ -258,11 +280,11 @@ public class TestExecutionPlanner { JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph()); ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager); - assertTrue(jobGraph.getOrCreateStreamEdge(input1).getPartitionCount() == 64); - assertTrue(jobGraph.getOrCreateStreamEdge(input2).getPartitionCount() == 16); - assertTrue(jobGraph.getOrCreateStreamEdge(input3).getPartitionCount() == 32); - assertTrue(jobGraph.getOrCreateStreamEdge(output1).getPartitionCount() == 8); - assertTrue(jobGraph.getOrCreateStreamEdge(output2).getPartitionCount() == 16); + assertTrue(jobGraph.getOrCreateStreamEdge(input1Spec).getPartitionCount() == 64); + assertTrue(jobGraph.getOrCreateStreamEdge(input2Spec).getPartitionCount() == 16); + assertTrue(jobGraph.getOrCreateStreamEdge(input3Spec).getPartitionCount() == 32); + assertTrue(jobGraph.getOrCreateStreamEdge(output1Spec).getPartitionCount() == 8); + assertTrue(jobGraph.getOrCreateStreamEdge(output2Spec).getPartitionCount() == 16); jobGraph.getIntermediateStreamEdges().forEach(edge -> 
{ assertTrue(edge.getPartitionCount() == -1); @@ -394,13 +416,13 @@ public class TestExecutionPlanner { @Test public void testMaxPartition() { Collection<StreamEdge> edges = new ArrayList<>(); - StreamEdge edge = new StreamEdge(input1, false, false, config); + StreamEdge edge = new StreamEdge(input1Spec, false, false, config); edge.setPartitionCount(2); edges.add(edge); - edge = new StreamEdge(input2, false, false, config); + edge = new StreamEdge(input2Spec, false, false, config); edge.setPartitionCount(32); edges.add(edge); - edge = new StreamEdge(input3, false, false, config); + edge = new StreamEdge(input3Spec, false, false, config); edge.setPartitionCount(16); edges.add(edge); @@ -417,8 +439,8 @@ public class TestExecutionPlanner { ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); StreamGraphSpec graphSpec = new StreamGraphSpec(config); - MessageStream<KV<Object, Object>> input1 = graphSpec.getInputStream("input4"); - OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream("output1"); + MessageStream<KV<Object, Object>> input1 = graphSpec.getInputStream(input4Descriptor); + OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream(output1Descriptor); input1.partitionBy(m -> m.key, m -> m.value, "p1").map(kv -> kv).sendTo(output1); JobGraph jobGraph = (JobGraph) planner.plan(graphSpec.getOperatorSpecGraph()); http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java index b0f3843..960693f 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java @@ -29,8 +29,12 
@@ import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraphSpec; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; +import org.apache.samza.operators.descriptors.GenericOutputDescriptor; +import org.apache.samza.operators.descriptors.GenericSystemDescriptor; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.windows.Windows; +import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.LongSerde; import org.apache.samza.serializers.NoOpSerde; @@ -99,21 +103,30 @@ public class TestJobGraphJsonGenerator { StreamManager streamManager = new StreamManager(systemAdmins); StreamGraphSpec graphSpec = new StreamGraphSpec(config); - graphSpec.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>())); + KVSerde<Object, Object> kvSerde = new KVSerde<>(new NoOpSerde(), new NoOpSerde()); + String mockSystemFactoryClass = "factory.class.name"; + GenericSystemDescriptor system1 = new GenericSystemDescriptor("system1", mockSystemFactoryClass); + GenericSystemDescriptor system2 = new GenericSystemDescriptor("system2", mockSystemFactoryClass); + GenericInputDescriptor<KV<Object, Object>> input1Descriptor = system1.getInputDescriptor("input1", kvSerde); + GenericInputDescriptor<KV<Object, Object>> input2Descriptor = system2.getInputDescriptor("input2", kvSerde); + GenericInputDescriptor<KV<Object, Object>> input3Descriptor = system2.getInputDescriptor("input3", kvSerde); + GenericOutputDescriptor<KV<Object, Object>> output1Descriptor = system1.getOutputDescriptor("output1", kvSerde); + GenericOutputDescriptor<KV<Object, Object>> output2Descriptor = system2.getOutputDescriptor("output2", kvSerde); + MessageStream<KV<Object, Object>> messageStream1 = - graphSpec.<KV<Object, Object>>getInputStream("input1") + 
graphSpec.getInputStream(input1Descriptor) .map(m -> m); MessageStream<KV<Object, Object>> messageStream2 = - graphSpec.<KV<Object, Object>>getInputStream("input2") + graphSpec.getInputStream(input2Descriptor) .partitionBy(m -> m.key, m -> m.value, "p1") .filter(m -> true); MessageStream<KV<Object, Object>> messageStream3 = - graphSpec.<KV<Object, Object>>getInputStream("input3") + graphSpec.getInputStream(input3Descriptor) .filter(m -> true) .partitionBy(m -> m.key, m -> m.value, "p2") .map(m -> m); - OutputStream<KV<Object, Object>> outputStream1 = graphSpec.getOutputStream("output1"); - OutputStream<KV<Object, Object>> outputStream2 = graphSpec.getOutputStream("output2"); + OutputStream<KV<Object, Object>> outputStream1 = graphSpec.getOutputStream(output1Descriptor); + OutputStream<KV<Object, Object>> outputStream2 = graphSpec.getOutputStream(output2Descriptor); messageStream1 .join(messageStream2, @@ -165,7 +178,16 @@ public class TestJobGraphJsonGenerator { StreamManager streamManager = new StreamManager(systemAdmins); StreamGraphSpec graphSpec = new StreamGraphSpec(config); - MessageStream<KV<String, PageViewEvent>> inputStream = graphSpec.getInputStream("PageView"); + KVSerde<String, PageViewEvent> pvSerde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)); + GenericSystemDescriptor isd = new GenericSystemDescriptor("hdfs", "mockSystemFactoryClass"); + GenericInputDescriptor<KV<String, PageViewEvent>> pageView = isd.getInputDescriptor("PageView", pvSerde); + + KVSerde<String, Long> pvcSerde = KVSerde.of(new StringSerde(), new LongSerde()); + GenericSystemDescriptor osd = new GenericSystemDescriptor("kafka", "mockSystemFactoryClass"); + GenericOutputDescriptor<KV<String, Long>> pageViewCount = osd.getOutputDescriptor("PageViewCount", pvcSerde); + + MessageStream<KV<String, PageViewEvent>> inputStream = graphSpec.getInputStream(pageView); + OutputStream<KV<String, Long>> outputStream = graphSpec.getOutputStream(pageViewCount); inputStream 
.partitionBy(kv -> kv.getValue().getCountry(), kv -> kv.getValue(), "keyed-by-country") .window(Windows.keyedTumblingWindow(kv -> kv.getValue().getCountry(), @@ -175,7 +197,7 @@ public class TestJobGraphJsonGenerator { new StringSerde(), new LongSerde()), "count-by-country") .map(pane -> new KV<>(pane.getKey().getKey(), pane.getMessage())) - .sendTo(graphSpec.getOutputStream("PageViewCount")); + .sendTo(outputStream); ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());