SAMZA-1804: System and Stream Descriptors. Design details: https://cwiki.apache.org/confluence/display/SAMZA/SEP-14%3A+System+and+Stream+Descriptors
Author: Prateek Maheshwari <pmaheshw...@apache.org> Reviewers: Yi Pan <nickpa...@gmail.com>, Cameron Lee <ca...@linkedin.com> Closes #603 from prateekm/stream-descriptor Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2a71baf7 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2a71baf7 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2a71baf7 Branch: refs/heads/master Commit: 2a71baf7c05eeacdc0bf4463ca946c23da7995a0 Parents: 2d6b199 Author: Prateek Maheshwari <pmaheshw...@apache.org> Authored: Mon Aug 27 12:06:27 2018 -0700 Committer: Prateek Maheshwari <pmaheshw...@apache.org> Committed: Mon Aug 27 12:06:27 2018 -0700 ---------------------------------------------------------------------- build.gradle | 5 +- .../apache/samza/operators/MessageStream.java | 12 +- .../org/apache/samza/operators/StreamGraph.java | 109 ++---- .../descriptors/GenericInputDescriptor.java | 43 +++ .../descriptors/GenericOutputDescriptor.java | 43 +++ .../descriptors/GenericSystemDescriptor.java | 67 ++++ .../base/stream/InputDescriptor.java | 187 +++++++++ .../base/stream/OutputDescriptor.java | 44 +++ .../base/stream/StreamDescriptor.java | 136 +++++++ .../ExpandingInputDescriptorProvider.java | 44 +++ .../base/system/OutputDescriptorProvider.java | 48 +++ .../system/SimpleInputDescriptorProvider.java | 43 +++ .../base/system/SystemDescriptor.java | 177 +++++++++ .../TransformingInputDescriptorProvider.java | 44 +++ .../operators/functions/InputTransformer.java | 45 +++ .../operators/functions/StreamExpander.java | 58 +++ .../TestExpandingInputDescriptor.java | 61 +++ .../descriptors/TestGenericInputDescriptor.java | 123 ++++++ .../TestGenericSystemDescriptor.java | 63 ++++ .../descriptors/TestSimpleInputDescriptor.java | 65 ++++ .../TestTransformingInputDescriptor.java | 66 ++++ .../ExampleExpandingInputDescriptor.java | 30 ++ .../ExampleExpandingOutputDescriptor.java | 29 ++ 
.../ExampleExpandingSystemDescriptor.java | 49 +++ .../serde/ExampleSimpleInputDescriptor.java | 30 ++ .../serde/ExampleSimpleOutputDescriptor.java | 29 ++ .../serde/ExampleSimpleSystemDescriptor.java | 43 +++ .../ExampleTransformingInputDescriptor.java | 30 ++ .../ExampleTransformingOutputDescriptor.java | 29 ++ .../ExampleTransformingSystemDescriptor.java | 43 +++ .../samza/execution/JobGraphJsonGenerator.java | 4 +- .../org/apache/samza/execution/JobNode.java | 20 +- .../samza/operators/MessageStreamImpl.java | 2 +- .../apache/samza/operators/StreamGraphSpec.java | 158 +++++--- .../descriptors/DelegatingSystemDescriptor.java | 70 ++++ .../samza/operators/impl/InputOperatorImpl.java | 23 +- .../samza/operators/spec/InputOperatorSpec.java | 57 ++- .../samza/operators/spec/OperatorSpecs.java | 11 +- .../samza/operators/spec/OutputStreamImpl.java | 12 + .../stream/IntermediateMessageStreamImpl.java | 2 +- .../apache/samza/processor/StreamProcessor.java | 2 +- .../runtime/AbstractApplicationRunner.java | 12 + .../apache/samza/task/StreamOperatorTask.java | 3 +- .../java/org/apache/samza/util/StreamUtil.java | 1 + .../samza/execution/TestExecutionPlanner.java | 92 +++-- .../execution/TestJobGraphJsonGenerator.java | 38 +- .../org/apache/samza/execution/TestJobNode.java | 81 +++- .../samza/operators/TestJoinOperator.java | 50 ++- .../samza/operators/TestMessageStreamImpl.java | 4 +- .../samza/operators/TestOperatorSpecGraph.java | 4 +- .../samza/operators/TestStreamGraphSpec.java | 376 +++++++++---------- .../operators/impl/TestInputOperatorImpl.java | 80 ++++ .../operators/impl/TestOperatorImplGraph.java | 96 +++-- .../operators/impl/TestWindowOperator.java | 24 +- .../samza/operators/spec/TestOperatorSpec.java | 16 +- .../spec/TestPartitionByOperatorSpec.java | 63 +++- .../scala/org/apache/samza/util/TestUtil.scala | 2 - .../system/kafka/KafkaInputDescriptor.java | 108 ++++++ .../system/kafka/KafkaOutputDescriptor.java | 39 ++ 
.../system/kafka/KafkaSystemDescriptor.java | 251 +++++++++++++ .../system/kafka/TestKafkaInputDescriptor.java | 68 ++++ .../system/kafka/TestKafkaSystemDescriptor.java | 69 ++++ .../kv/BaseLocalStoreBackedTableProvider.java | 15 +- .../samza/sql/translator/QueryTranslator.java | 10 +- .../samza/sql/translator/ScanTranslator.java | 13 +- .../samza/sql/translator/TranslatorContext.java | 18 +- .../sql/translator/TestJoinTranslator.java | 2 +- .../sql/translator/TestQueryTranslator.java | 5 +- .../example/AppWithGlobalConfigExample.java | 17 +- .../apache/samza/example/BroadcastExample.java | 24 +- .../samza/example/KeyValueStoreExample.java | 27 +- .../org/apache/samza/example/MergeExample.java | 25 +- .../samza/example/OrderShipmentJoinExample.java | 29 +- .../samza/example/PageViewCounterExample.java | 17 +- .../samza/example/RepartitionExample.java | 24 +- .../org/apache/samza/example/WindowExample.java | 19 +- .../apache/samza/test/framework/TestRunner.java | 2 +- .../system/CollectionStreamSystemSpec.java | 25 +- .../TestStandaloneIntegrationApplication.java | 21 +- .../EndOfStreamIntegrationTest.java | 11 +- .../WatermarkIntegrationTest.java | 12 +- .../test/framework/BroadcastAssertApp.java | 11 +- .../StreamApplicationIntegrationTest.java | 29 +- .../samza/test/framework/TestTimerApp.java | 17 +- .../test/operator/RepartitionJoinWindowApp.java | 32 +- .../test/operator/RepartitionWindowApp.java | 19 +- .../samza/test/operator/SessionWindowApp.java | 19 +- .../operator/TestRepartitionJoinWindowApp.java | 12 +- .../test/operator/TestRepartitionWindowApp.java | 6 +- .../samza/test/operator/TumblingWindowApp.java | 19 +- .../test/processor/TestStreamApplication.java | 20 +- .../processor/TestZkLocalApplicationRunner.java | 60 ++- .../apache/samza/test/table/TestLocalTable.java | 28 +- .../table/TestLocalTableWithSideInputs.java | 11 +- .../samza/test/table/TestRemoteTable.java | 17 +- .../benchmark/SystemConsumerWithSamzaBench.java | 11 +- 96 files changed, 3541 
insertions(+), 719 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index e4583e4..7d23717 100644 --- a/build.gradle +++ b/build.gradle @@ -139,11 +139,12 @@ project(':samza-api') { apply plugin: 'java' dependencies { - compile "org.slf4j:slf4j-api:$slf4jVersion" + compile "org.apache.commons:commons-lang3:$commonsLang3Version" compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion" + compile "com.google.guava:guava:$guavaVersion" + compile "org.slf4j:slf4j-api:$slf4jVersion" testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-core:$mockitoVersion" - testCompile "com.google.guava:guava:$guavaVersion" } checkstyle { configFile = new File(rootDir, "checkstyle/checkstyle.xml") http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java index 7797f9a..e3a61c4 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java +++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java @@ -213,13 +213,11 @@ public interface MessageStream<M> { /** * Re-partitions this {@link MessageStream} using keys from the {@code keyExtractor} by creating a new - * intermediate stream on the {@code job.default.system}. This intermediate stream is both an output and - * input to the job. + * intermediate stream on the default system provided via {@link StreamGraph#setDefaultSystem}. This intermediate + * stream is both an output and input to the job. 
* <p> * Uses the provided {@link KVSerde} for serialization of keys and values. If the provided {@code serde} is null, - * uses the default serde provided via {@link StreamGraph#setDefaultSerde}, which must be a KVSerde. If the default - * serde is not a {@link KVSerde}, a runtime exception will be thrown. If no default serde has been provided - * <b>before</b> calling this method, a {@code KVSerde<NoOpSerde, NoOpSerde>} is used. + * uses the key and message serde configured for the job's default system. * <p> * The number of partitions for this intermediate stream is determined as follows: * If the stream is an eventual input to a {@link #join}, and the number of partitions for the other stream is known, @@ -251,9 +249,7 @@ public interface MessageStream<M> { /** * Same as calling {@link #partitionBy(MapFunction, MapFunction, KVSerde, String)} with a null KVSerde. * <p> - * Uses the default serde provided via {@link StreamGraph#setDefaultSerde}, which must be a KVSerde. If the default - * serde is not a {@link KVSerde}, a runtime exception will be thrown. If no default serde has been provided - * <b>before</b> calling this method, a {@code KVSerde<NoOpSerde, NoOpSerde>} is used. + * Uses the key and message serde configured for the job's default system. 
* * @param keyExtractor the {@link MapFunction} to extract the message and partition key from the input message * @param valueExtractor the {@link MapFunction} to extract the value from the input message http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java index 6871bc7..ec6e4b7 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java +++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java @@ -19,7 +19,9 @@ package org.apache.samza.operators; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.serializers.Serde; +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; import org.apache.samza.table.Table; @@ -30,96 +32,65 @@ import org.apache.samza.table.Table; public interface StreamGraph { /** - * Sets the default {@link Serde} to use for (de)serializing messages. - * <p>. - * If the default serde is set, it must be set <b>before</b> creating any input or output streams. + * Sets the default SystemDescriptor to use for intermediate streams. This is equivalent to setting + * {@code job.default.system} and its properties in configuration. * <p> - * If no explicit or default serdes are provided, a {@code KVSerde<NoOpSerde, NoOpSerde>} is used. This means that - * any streams created without explicit or default serdes should be cast to {@code MessageStream<KV<Object, Object>>}. + * If the default system descriptor is set, it must be set <b>before</b> creating any intermediate streams. 
* <p> - * Providing an incompatible message type for the input/output streams that use the default serde will result in + * If the intermediate stream is created with a stream-level Serde, they will be used, else the serde specified + * for the {@code job.default.system} in configuration will be used. + * <p> + * Providing an incompatible message type for the intermediate streams that use the default serde will result in * {@link ClassCastException}s at runtime. * - * @param serde the default message {@link Serde} to use + * @param defaultSystemDescriptor the default system descriptor to use */ - void setDefaultSerde(Serde<?> serde); + void setDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor); /** - * Gets the input {@link MessageStream} corresponding to the {@code streamId}. + * Gets the input {@link MessageStream} corresponding to the {@code inputDescriptor}. * <p> - * An input {@code MessageStream<KV<K, V>}, which can be obtained by calling this method with a {@code KVSerde<K, V>}, - * can receive messages of type {@code KV<K, V>}. An input {@code MessageStream<M>} with any other {@code Serde<M>} - * can receive messages of type M - the key in the incoming message is ignored. + * A {@code MessageStream<KV<K, V>}, obtained by calling this method with a descriptor with a {@code KVSerde<K, V>}, + * can receive messages of type {@code KV<K, V>}. An input {@code MessageStream<M>}, obtained using a descriptor with + * any other {@code Serde<M>}, can receive messages of type M - the key in the incoming message is ignored. * <p> - * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used if the {@code SystemConsumer} - * deserializes the incoming messages itself, and no further deserialization is required from the framework. 
+ * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used for the descriptor if the + * {@code SystemConsumer} deserializes the incoming messages itself, and no further deserialization is required from + * the framework. * <p> - * Multiple invocations of this method with the same {@code streamId} will throw an {@link IllegalStateException}. + * Multiple invocations of this method with the same {@code inputDescriptor} will throw an + * {@link IllegalStateException}. * - * @param streamId the unique ID for the stream - * @param serde the {@link Serde} to use for deserializing incoming messages + * @param inputDescriptor the descriptor for the stream * @param <M> the type of messages in the input {@link MessageStream} * @return the input {@link MessageStream} - * @throws IllegalStateException when invoked multiple times with the same {@code streamId} - */ - <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde); - - /** - * Same as {@link #getInputStream(String, Serde)}, but uses the default {@link Serde} provided via - * {@link #setDefaultSerde(Serde)} for deserializing input messages. - * <p> - * If no default serde has been provided <b>before</b> calling this method, a {@code KVSerde<NoOpSerde, NoOpSerde>} - * is used. Providing a message type {@code M} that is incompatible with the default Serde will result in - * {@link ClassCastException}s at runtime. - * <p> - * Multiple invocations of this method with the same {@code streamId} will throw an {@link IllegalStateException}. 
- * - * @param streamId the unique ID for the stream - * @param <M> the type of message in the input {@link MessageStream} - * @return the input {@link MessageStream} - * @throws IllegalStateException when invoked multiple times with the same {@code streamId} + * @throws IllegalStateException when invoked multiple times with the same {@code inputDescriptor} */ - <M> MessageStream<M> getInputStream(String streamId); + <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor); /** - * Gets the {@link OutputStream} corresponding to the {@code streamId}. + * Gets the {@link OutputStream} corresponding to the {@code outputDescriptor}. * <p> - * An {@code OutputStream<KV<K, V>>}, which can be obtained by calling this method with a {@code KVSerde<K, V>}, - * can send messages of type {@code KV<K, V>}. An {@code OutputStream<M>} with any other {@code Serde<M>} can - * send messages of type M without a key. + * An {@code OutputStream<KV<K, V>>}, obtained by calling this method with a descriptor with a {@code KVSerde<K, V>}, + * can send messages of type {@code KV<K, V>}. An {@code OutputStream<M>}, obtained using a descriptor with any + * other {@code Serde<M>}, can send messages of type M without a key. * <p> - * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used if the {@code SystemProducer} - * serializes the outgoing messages itself, and no prior serialization is required from the framework. + * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used for the descriptor if the + * {@code SystemProducer} serializes the outgoing messages itself, and no prior serialization is required from + * the framework. * <p> * When sending messages to an {@code OutputStream<KV<K, V>>}, messages are partitioned using their serialized key. * When sending messages to any other {@code OutputStream<M>}, messages are partitioned using a null partition key. 
* <p> - * Multiple invocations of this method with the same {@code streamId} will throw an {@link IllegalStateException}. - * - * @param streamId the unique ID for the stream - * @param serde the {@link Serde} to use for serializing outgoing messages - * @param <M> the type of messages in the {@link OutputStream} - * @return the output {@link MessageStream} - * @throws IllegalStateException when invoked multiple times with the same {@code streamId} - */ - <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde); - - /** - * Same as {@link #getOutputStream(String, Serde)}, but uses the default {@link Serde} provided via - * {@link #setDefaultSerde(Serde)} for serializing output messages. - * <p> - * If no default serde has been provided <b>before</b> calling this method, a {@code KVSerde<NoOpSerde, NoOpSerde>} - * is used. Providing a message type {@code M} that is incompatible with the default Serde will result in - * {@link ClassCastException}s at runtime. - * <p> - * Multiple invocations of this method with the same {@code streamId} will throw an {@link IllegalStateException}. + * Multiple invocations of this method with the same {@code outputDescriptor} will throw an + * {@link IllegalStateException}. * - * @param streamId the unique ID for the stream + * @param outputDescriptor the descriptor for the stream * @param <M> the type of messages in the {@link OutputStream} - * @return the output {@link MessageStream} - * @throws IllegalStateException when invoked multiple times with the same {@code streamId} + * @return the {@link OutputStream} + * @throws IllegalStateException when invoked multiple times with the same {@code outputDescriptor} */ - <M> OutputStream<M> getOutputStream(String streamId); + <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor); /** * Gets the {@link Table} corresponding to the {@link TableDescriptor}. 
@@ -127,13 +98,13 @@ public interface StreamGraph { * Multiple invocations of this method with the same {@link TableDescriptor} will throw an * {@link IllegalStateException}. * - * @param tableDesc the {@link TableDescriptor} + * @param tableDescriptor the {@link TableDescriptor} * @param <K> the type of the key * @param <V> the type of the value - * @return the {@link Table} corresponding to the {@code tableDesc} + * @return the {@link Table} corresponding to the {@code tableDescriptor} * @throws IllegalStateException when invoked multiple times with the same {@link TableDescriptor} */ - <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc); + <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor); /** * Sets the {@link ContextManager} for this {@link StreamGraph}. http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericInputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericInputDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericInputDescriptor.java new file mode 100644 index 0000000..a2df29a --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericInputDescriptor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.descriptors; + +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.serializers.Serde; + +/** + * A descriptor for a generic input stream. + * <p> + * An instance of this descriptor may be obtained from an appropriately configured {@link GenericSystemDescriptor}. + * <p> + * If the system being used provides its own system and stream descriptor implementations, they should be used instead. + * Otherwise, this {@link GenericInputDescriptor} may be used to provide Samza-specific properties of the input stream. + * Additional system stream specific properties may be provided using {@link #withStreamConfigs} + * <p> + * Stream properties configured using a descriptor override corresponding properties provided in configuration. + * + * @param <StreamMessageType> type of messages in this stream. 
+ */ +public final class GenericInputDescriptor<StreamMessageType> + extends InputDescriptor<StreamMessageType, GenericInputDescriptor<StreamMessageType>> { + GenericInputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde) { + super(streamId, serde, systemDescriptor, null); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericOutputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericOutputDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericOutputDescriptor.java new file mode 100644 index 0000000..b13ac21 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericOutputDescriptor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.descriptors; + +import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.serializers.Serde; + +/** + * A descriptor for a generic output stream. + * <p> + * An instance of this descriptor may be obtained from an appropriately configured {@link GenericSystemDescriptor}. + * <p> + * If the system being used provides its own system and stream descriptor implementations, they should be used instead. + * Otherwise, this {@link GenericOutputDescriptor} may be used to provide Samza-specific properties of the output stream. + * Additional system stream specific properties may be provided using {@link #withStreamConfigs} + * <p> + * Stream properties configured using a descriptor override corresponding properties provided in configuration. + * + * @param <StreamMessageType> type of messages in this stream. + */ +public final class GenericOutputDescriptor<StreamMessageType> + extends OutputDescriptor<StreamMessageType, GenericOutputDescriptor<StreamMessageType>> { + GenericOutputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde) { + super(streamId, serde, systemDescriptor); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericSystemDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericSystemDescriptor.java new file mode 100644 index 0000000..362745f --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/descriptors/GenericSystemDescriptor.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.descriptors; + + +import org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider; +import org.apache.samza.operators.descriptors.base.system.SimpleInputDescriptorProvider; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.serializers.Serde; + +/** + * A descriptor for a generic system. + * <p> + * If the system being used provides its own system and stream descriptor implementations, they should be used instead. + * Otherwise, this {@link GenericSystemDescriptor} may be used to provide Samza-specific properties of the system. + * Additional system specific properties may be provided using {@link #withSystemConfigs} + * <p> + * System properties configured using a descriptor override corresponding properties provided in configuration. + */ +public final class GenericSystemDescriptor extends SystemDescriptor<GenericSystemDescriptor> + implements SimpleInputDescriptorProvider, OutputDescriptorProvider { + + /** + * Constructs a {@link GenericSystemDescriptor} instance with no system level serde. + * Serdes must be provided explicitly at stream level when getting input or output descriptors. 
+ * + * @param systemName name of this system + * @param factoryClassName name of the SystemFactory class for this system + */ + public GenericSystemDescriptor(String systemName, String factoryClassName) { + super(systemName, factoryClassName, null, null); + } + + /** + * {@inheritDoc} + */ + @Override + public <StreamMessageType> GenericInputDescriptor<StreamMessageType> getInputDescriptor( + String streamId, Serde<StreamMessageType> serde) { + return new GenericInputDescriptor<>(streamId, this, serde); + } + + /** + * {@inheritDoc} + */ + @Override + public <StreamMessageType> GenericOutputDescriptor<StreamMessageType> getOutputDescriptor( + String streamId, Serde<StreamMessageType> serde) { + return new GenericOutputDescriptor<>(streamId, this, serde); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/InputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/InputDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/InputDescriptor.java new file mode 100644 index 0000000..bb38ee4 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/InputDescriptor.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.descriptors.base.stream; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.operators.functions.InputTransformer; +import org.apache.samza.serializers.Serde; +import org.apache.samza.system.SystemStreamMetadata.OffsetType; + +/** + * The base descriptor for an input stream. Allows setting properties that are common to all input streams. + * <p> + * Stream properties configured using a descriptor override corresponding properties provided in configuration. + * + * @param <StreamMessageType> type of messages in this stream. 
+ * @param <SubClass> type of the concrete sub-class + */ +public abstract class InputDescriptor<StreamMessageType, SubClass extends InputDescriptor<StreamMessageType, SubClass>> + extends StreamDescriptor<StreamMessageType, SubClass> { + private static final String RESET_OFFSET_CONFIG_KEY = "streams.%s.samza.reset.offset"; + private static final String OFFSET_DEFAULT_CONFIG_KEY = "streams.%s.samza.offset.default"; + private static final String PRIORITY_CONFIG_KEY = "streams.%s.samza.priority"; + private static final String BOOTSTRAP_CONFIG_KEY = "streams.%s.samza.bootstrap"; + private static final String BOUNDED_CONFIG_KEY = "streams.%s.samza.bounded"; + private static final String DELETE_COMMITTED_MESSAGES_CONFIG_KEY = "streams.%s.samza.delete.committed.messages"; + + private final Optional<InputTransformer> transformerOptional; + + private Optional<Boolean> resetOffsetOptional = Optional.empty(); + private Optional<OffsetType> offsetDefaultOptional = Optional.empty(); + private Optional<Integer> priorityOptional = Optional.empty(); + private Optional<Boolean> isBootstrapOptional = Optional.empty(); + private Optional<Boolean> isBoundedOptional = Optional.empty(); + private Optional<Boolean> deleteCommittedMessagesOptional = Optional.empty(); + + /** + * Constructs an {@link InputDescriptor} instance. 
+ * + * @param streamId id of the stream + * @param serde serde for messages in the stream + * @param systemDescriptor system descriptor this stream descriptor was obtained from + * @param transformer stream level input stream transform function if available, else null + */ + public InputDescriptor(String streamId, Serde serde, SystemDescriptor systemDescriptor, InputTransformer transformer) { + super(streamId, serde, systemDescriptor); + + // stream level transformer takes precedence over system level transformer + if (transformer != null) { + this.transformerOptional = Optional.of(transformer); + } else { + this.transformerOptional = systemDescriptor.getTransformer(); + } + } + + /** + * If set to true, when a Samza container starts up, it ignores any checkpointed offset for this particular + * input stream. Its behavior is thus determined by the {@link #withOffsetDefault} setting. + * Note that the reset takes effect every time a container is started, which may be every time you restart your job, + * or more frequently if a container fails and is restarted by the framework. + * + * @param resetOffset whether the container should ignore any checkpointed offset when starting + * @return this input descriptor + */ + public SubClass withResetOffset(boolean resetOffset) { + this.resetOffsetOptional = Optional.of(resetOffset); + return (SubClass) this; + } + + /** + * If a container starts up without a checkpoint, this property determines where in the input stream we should start + * consuming. The value must be an OffsetType, one of the following: + * <ul> + * <li>upcoming: Start processing messages that are published after the job starts. + * Any messages published while the job was not running are not processed. + * <li>oldest: Start processing at the oldest available message in the system, + * and reprocess the entire available message history. + * </ul> + * This property is for an individual stream. 
To set it for all streams within a system, see + * {@link SystemDescriptor#withDefaultStreamOffsetDefault}. If both are defined, the stream-level definition + * takes precedence. + * + * @param offsetDefault offset type to start processing from + * @return this input descriptor + */ + public SubClass withOffsetDefault(OffsetType offsetDefault) { + this.offsetDefaultOptional = Optional.ofNullable(offsetDefault); + return (SubClass) this; + } + + /** + * If one or more streams have a priority set (any positive integer), they will be processed with higher priority + * than the other streams. + * <p> + * You can set several streams to the same priority, or define multiple priority levels by assigning a + * higher number to the higher-priority streams. + * <p> + * If a higher-priority stream has any messages available, they will always be processed first; + * messages from lower-priority streams are only processed when there are no new messages on higher-priority inputs. + * + * @param priority priority for this input stream + * @return this input descriptor + */ + public SubClass withPriority(int priority) { + this.priorityOptional = Optional.of(priority); + return (SubClass) this; + } + + /** + * If set to true, this stream will be processed as a bootstrap stream. This means that every time a Samza container + * starts up, this stream will be fully consumed before messages from any other stream are processed. + * + * @param bootstrap whether this stream should be processed as a bootstrap stream + * @return this input descriptor + */ + public SubClass withBootstrap(boolean bootstrap) { + this.isBootstrapOptional = Optional.of(bootstrap); + return (SubClass) this; + } + + /** + * If set to true, this stream will be considered a bounded stream. If all input streams in an application are + * bounded, the job is considered to be running in batch processing mode. 
+ * + * @param isBounded whether this stream is bounded + * @return this input descriptor + */ + public SubClass withBounded(boolean isBounded) { + this.isBoundedOptional = Optional.of(isBounded); + return (SubClass) this; + } + + /** + * If set to true, and supported by the system implementation, messages older than the latest checkpointed offset + * for this stream may be deleted after the commit. + * + * @param deleteCommittedMessages whether the system should attempt to delete checkpointed messages + * @return this input descriptor + */ + public SubClass withDeleteCommittedMessages(boolean deleteCommittedMessages) { + this.deleteCommittedMessagesOptional = Optional.of(deleteCommittedMessages); + return (SubClass) this; + } + + public Optional<InputTransformer> getTransformer() { + return this.transformerOptional; + } + + @Override + public Map<String, String> toConfig() { + HashMap<String, String> configs = new HashMap<>(super.toConfig()); + String streamId = getStreamId(); + this.offsetDefaultOptional.ifPresent(od -> // Locale.ROOT keeps the lower-cased enum name independent of the JVM locale + configs.put(String.format(OFFSET_DEFAULT_CONFIG_KEY, streamId), od.name().toLowerCase(java.util.Locale.ROOT))); + this.resetOffsetOptional.ifPresent(resetOffset -> + configs.put(String.format(RESET_OFFSET_CONFIG_KEY, streamId), Boolean.toString(resetOffset))); + this.priorityOptional.ifPresent(priority -> + configs.put(String.format(PRIORITY_CONFIG_KEY, streamId), Integer.toString(priority))); + this.isBootstrapOptional.ifPresent(bootstrap -> + configs.put(String.format(BOOTSTRAP_CONFIG_KEY, streamId), Boolean.toString(bootstrap))); + this.isBoundedOptional.ifPresent(bounded -> + configs.put(String.format(BOUNDED_CONFIG_KEY, streamId), Boolean.toString(bounded))); + this.deleteCommittedMessagesOptional.ifPresent(deleteCommittedMessages -> + configs.put(String.format(DELETE_COMMITTED_MESSAGES_CONFIG_KEY, streamId), + Boolean.toString(deleteCommittedMessages))); + return Collections.unmodifiableMap(configs); + } +} 
http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/OutputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/OutputDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/OutputDescriptor.java new file mode 100644 index 0000000..20bbc53 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/OutputDescriptor.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.descriptors.base.stream; + +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.serializers.Serde; + +/** + * The base descriptor for an output stream. Allows setting properties that are common to all output streams. + * <p> + * Stream properties configured using a descriptor override corresponding properties provided in configuration. + * + * @param <StreamMessageType> type of messages in this stream. 
+ * @param <SubClass> type of the concrete sub-class + */ +public abstract class OutputDescriptor<StreamMessageType, SubClass extends OutputDescriptor<StreamMessageType, SubClass>> + extends StreamDescriptor<StreamMessageType, SubClass> { + /** + * Constructs an {@link OutputDescriptor} instance. + * + * @param streamId id of the stream + * @param serde serde for messages in the stream + * @param systemDescriptor system descriptor this stream descriptor was obtained from + */ + public OutputDescriptor(String streamId, Serde serde, SystemDescriptor systemDescriptor) { + super(streamId, serde, systemDescriptor); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/StreamDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/StreamDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/StreamDescriptor.java new file mode 100644 index 0000000..e2f93db --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/stream/StreamDescriptor.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.descriptors.base.stream; + +import com.google.common.base.Preconditions; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.serializers.Serde; + +/** + * The base descriptor for an input or output stream. Allows setting properties that are common to all streams. + * <p> + * Stream properties configured using a descriptor override corresponding properties provided in configuration. + * + * @param <StreamMessageType> type of messages in this stream. + * @param <SubClass> type of the concrete sub-class + */ +public abstract class StreamDescriptor<StreamMessageType, SubClass extends StreamDescriptor<StreamMessageType, SubClass>> { + private static final String SYSTEM_CONFIG_KEY = "streams.%s.samza.system"; + private static final String PHYSICAL_NAME_CONFIG_KEY = "streams.%s.samza.physical.name"; + private static final String STREAM_CONFIGS_CONFIG_KEY = "streams.%s.%s"; + private static final Pattern STREAM_ID_PATTERN = Pattern.compile("[\\d\\w-_]+"); + + private final String streamId; + private final Serde serde; + private final SystemDescriptor systemDescriptor; + + private final Map<String, String> streamConfigs = new HashMap<>(); + private Optional<String> physicalNameOptional = Optional.empty(); + + /** + * Constructs a {@link StreamDescriptor} instance. 
+ * + * @param streamId id of the stream + * @param serde serde for messages in the stream + * @param systemDescriptor system descriptor this stream descriptor was obtained from + */ + StreamDescriptor(String streamId, Serde serde, SystemDescriptor systemDescriptor) { + Preconditions.checkArgument(systemDescriptor != null, + String.format("SystemDescriptor must not be null. streamId: %s", streamId)); + String systemName = systemDescriptor.getSystemName(); + Preconditions.checkState(isValidStreamId(streamId), + String.format("streamId must be non-empty and must not contain spaces or special characters. " + + "streamId: %s, systemName: %s", streamId, systemName)); + Preconditions.checkArgument(serde != null, + String.format("Serde must not be null. streamId: %s systemName: %s", streamId, systemName)); + this.streamId = streamId; + this.serde = serde; + this.systemDescriptor = systemDescriptor; + } + + /** + * The physical name of the stream on the system on which this stream will be accessed. + * This is opposed to the {@code streamId} which is the logical name that Samza uses to identify the stream. + * <p> + * A physical name could be a Kafka topic name, an HDFS file URN, or any other system-specific identifier. + * <p> + * If not provided, the logical {@code streamId} is used as the physical name. + * + * @param physicalName physical name for this stream. + * @return this stream descriptor. + */ + public SubClass withPhysicalName(String physicalName) { + this.physicalNameOptional = Optional.ofNullable(physicalName); + return (SubClass) this; + } + + /** + * Additional system-specific properties for this stream. + * <p> + * These properties are added under the {@code streams.stream-id.*} scope. 
+ * + * @param streamConfigs system-specific properties for this stream + * @return this stream descriptor + */ + public SubClass withStreamConfigs(Map<String, String> streamConfigs) { + this.streamConfigs.putAll(streamConfigs); + return (SubClass) this; + } + + public String getStreamId() { + return this.streamId; + } + + public String getSystemName() { + return this.systemDescriptor.getSystemName(); + } + + public Serde getSerde() { + return this.serde; + } + + public SystemDescriptor getSystemDescriptor() { + return this.systemDescriptor; + } + + public Optional<String> getPhysicalName() { + return physicalNameOptional; + } + + private boolean isValidStreamId(String id) { + return StringUtils.isNotBlank(id) && STREAM_ID_PATTERN.matcher(id).matches(); + } + + public Map<String, String> toConfig() { + HashMap<String, String> configs = new HashMap<>(); + configs.put(String.format(SYSTEM_CONFIG_KEY, streamId), getSystemName()); + this.physicalNameOptional.ifPresent(physicalName -> + configs.put(String.format(PHYSICAL_NAME_CONFIG_KEY, streamId), physicalName)); + this.streamConfigs.forEach((key, value) -> + configs.put(String.format(STREAM_CONFIGS_CONFIG_KEY, streamId, key), value)); + return Collections.unmodifiableMap(configs); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/ExpandingInputDescriptorProvider.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/ExpandingInputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/ExpandingInputDescriptorProvider.java new file mode 100644 index 0000000..05179dd --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/ExpandingInputDescriptorProvider.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.descriptors.base.system; + +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.serializers.Serde; + +/** + * Interface for advanced {@code SystemDescriptor}s that constrain the type of returned {@code InputDescriptor}s to + * their own {@code StreamExpander} function result types. + * + * @param <StreamExpanderType> type of the system level {@code StreamExpander} results + */ +public interface ExpandingInputDescriptorProvider<StreamExpanderType> { + + /** + * Gets an {@link InputDescriptor} for an input stream on this system. The stream has the provided + * stream level serde, and the default system level {@code StreamExpander}. + * <p> + * The type of messages in the stream is the type of messages returned by the default system level + * {@code StreamExpander}. + * + * @param streamId id of the input stream + * @param serde stream level serde to be propagated to expanded input streams + * @return an {@link InputDescriptor} for the input stream + */ + InputDescriptor<StreamExpanderType, ? 
extends InputDescriptor> getInputDescriptor(String streamId, Serde serde); +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/OutputDescriptorProvider.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/OutputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/OutputDescriptorProvider.java new file mode 100644 index 0000000..c2ceb53 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/OutputDescriptorProvider.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.descriptors.base.system; + +import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; +import org.apache.samza.serializers.Serde; + + +/** + * Interface for simple {@code SystemDescriptors} that return {@code OutputDescriptors} parameterized by the type of + * the provided stream level serde. 
+ */ +public interface OutputDescriptorProvider { + + /** + * Gets an {@link OutputDescriptor} representing an output stream on this system that uses the provided + * stream specific serde instead of the default system serde. + * <p> + * An {@code OutputStream<KV<K, V>>}, obtained using a descriptor with a {@code KVSerde<K, V>}, can send messages + * of type {@code KV<K, V>}. An {@code OutputStream<M>} with any other {@code Serde<M>} can send messages of + * type M without a key. + * <p> + * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used if the {@code SystemProducer} + * serializes the outgoing messages itself, and no prior serialization is required from the framework. + * + * @param streamId id of the output stream + * @param serde serde for this output stream that overrides the default system serde, if any. + * @param <StreamMessageType> type of messages in the output stream + * @return the {@link OutputDescriptor} for the output stream + */ + <StreamMessageType> OutputDescriptor<StreamMessageType, ? extends OutputDescriptor> getOutputDescriptor(String streamId, Serde<StreamMessageType> serde); +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SimpleInputDescriptorProvider.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SimpleInputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SimpleInputDescriptorProvider.java new file mode 100644 index 0000000..de40932 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SimpleInputDescriptorProvider.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.descriptors.base.system; + +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.serializers.Serde; + + +/** + * Interface for simple {@code SystemDescriptors} that return {@code InputDescriptors} parameterized by the type of + * the provided stream level serde. + */ +public interface SimpleInputDescriptorProvider { + + /** + * Gets an {@link InputDescriptor} for an input stream on this system. The stream has the provided + * stream level serde. + * <p> + * The type of messages in the stream is the type of the provided stream level serde. + * + * @param streamId id of the input stream + * @param serde stream level serde for the input stream + * @param <StreamMessageType> type of messages in this stream + * @return an {@link InputDescriptor} for the input stream + */ + <StreamMessageType> InputDescriptor<StreamMessageType, ? 
extends InputDescriptor> getInputDescriptor(String streamId, Serde<StreamMessageType> serde); +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SystemDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SystemDescriptor.java new file mode 100644 index 0000000..4bb121d --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/SystemDescriptor.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.descriptors.base.system; + +import com.google.common.base.Preconditions; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.operators.functions.InputTransformer; +import org.apache.samza.operators.functions.StreamExpander; +import org.apache.samza.system.SystemStreamMetadata.OffsetType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The base descriptor for a system. Allows setting properties that are common to all systems. + * <p> + * System properties configured using a descriptor override corresponding properties provided in configuration. + * <p> + * Systems may provide an {@link InputTransformer} to be used for input streams on the system. An + * {@link InputTransformer} transforms an {@code IncomingMessageEnvelope} with deserialized key and message + * to another message that is delivered to the {@code MessageStream}. It is applied at runtime in + * {@code InputOperatorImpl}. + * <p> + * Systems may provide a {@link StreamExpander} to be used for input streams on the system. A {@link StreamExpander} + * expands the provided {@code InputDescriptor} to a sub-DAG of one or more operators on the {@code StreamGraph}, + * and returns a new {@code MessageStream} with the combined results. It is called during graph description + * in {@code StreamGraph#getInputStream}. + * <p> + * Systems that support consuming messages from a stream should provide users means of obtaining an + * {@code InputDescriptor}. Recommended interfaces for doing so are {@link TransformingInputDescriptorProvider} for + * systems that support system level {@link InputTransformer}, {@link ExpandingInputDescriptorProvider} for systems + * that support system level {@link StreamExpander} functions, and {@link SimpleInputDescriptorProvider} otherwise. 
+ * <p> + * Systems that support producing messages to a stream should provide users means of obtaining an + * {@code OutputDescriptor}. Recommended interface for doing so is {@link OutputDescriptorProvider}. + * <p> + * It is not required for SystemDescriptors to implement one of the Provider interfaces above. System implementers + * may choose to expose additional or alternate APIs for obtaining Input/Output Descriptors by extending + * SystemDescriptor directly. + * + * @param <SubClass> type of the concrete sub-class + */ +public abstract class SystemDescriptor<SubClass extends SystemDescriptor<SubClass>> { + private static final Logger LOGGER = LoggerFactory.getLogger(SystemDescriptor.class); + private static final String FACTORY_CONFIG_KEY = "systems.%s.samza.factory"; + private static final String DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY = "systems.%s.default.stream.samza.offset.default"; + private static final String DEFAULT_STREAM_CONFIGS_CONFIG_KEY = "systems.%s.default.stream.%s"; + private static final String SYSTEM_CONFIGS_CONFIG_KEY = "systems.%s.%s"; + private static final Pattern SYSTEM_NAME_PATTERN = Pattern.compile("[\\d\\w-_]+"); + + private final String systemName; + private final Optional<String> factoryClassNameOptional; + private final Optional<InputTransformer> transformerOptional; + private final Optional<StreamExpander> expanderOptional; + + private final Map<String, String> systemConfigs = new HashMap<>(); + private final Map<String, String> defaultStreamConfigs = new HashMap<>(); + private Optional<OffsetType> defaultStreamOffsetDefaultOptional = Optional.empty(); + + /** + * Constructs a {@link SystemDescriptor} instance. 
+ * + * @param systemName name of this system + * @param factoryClassName name of the SystemFactory class for this system + * @param transformer the {@link InputTransformer} for the system if any, else null + * @param expander the {@link StreamExpander} for the system if any, else null + */ + public SystemDescriptor(String systemName, String factoryClassName, InputTransformer transformer, StreamExpander expander) { + Preconditions.checkArgument(isValidSystemName(systemName), + String.format("systemName: %s must be non-empty and must not contain spaces or special characters.", systemName)); + if (StringUtils.isBlank(factoryClassName)) { + LOGGER.warn("Blank SystemFactory class name for system: {}. A value must be provided in configuration using {}.", + systemName, String.format(FACTORY_CONFIG_KEY, systemName)); + } + this.systemName = systemName; + this.factoryClassNameOptional = Optional.ofNullable(StringUtils.stripToNull(factoryClassName)); + this.transformerOptional = Optional.ofNullable(transformer); + this.expanderOptional = Optional.ofNullable(expander); + } + + /** + * If a container starts up without a checkpoint, this property determines where in the input stream we should start + * consuming. The value must be an {@link OffsetType}, one of the following: + * <ul> + * <li>upcoming: Start processing messages that are published after the job starts. + * Any messages published while the job was not running are not processed. + * <li>oldest: Start processing at the oldest available message in the system, + * and reprocess the entire available message history. + * </ul> + * This property is for all streams obtained using this system descriptor. To set it for an individual stream, + * see {@link org.apache.samza.operators.descriptors.base.stream.InputDescriptor#withOffsetDefault}. + * If both are defined, the stream-level definition takes precedence. 
+ * + * @param offsetType offset type to start processing from + * @return this system descriptor + */ + public SubClass withDefaultStreamOffsetDefault(OffsetType offsetType) { + this.defaultStreamOffsetDefaultOptional = Optional.ofNullable(offsetType); + return (SubClass) this; + } + + /** + * Additional system-specific properties for this system. + * <p> + * These properties are added under the {@code systems.system-name.*} scope. + * + * @param systemConfigs system-specific properties for this system + * @return this system descriptor + */ + public SubClass withSystemConfigs(Map<String, String> systemConfigs) { + this.systemConfigs.putAll(systemConfigs); + return (SubClass) this; + } + + /** + * Default properties for any stream obtained using this system descriptor. + * <p> + * For example, if "systems.kafka-system.default.stream.replication.factor"=2 was configured, + * then every Kafka stream created on the kafka-system will have a replication factor of 2 + * unless the property is explicitly overridden using the stream descriptor. 
+ * + * @param defaultStreamConfigs default stream properties + * @return this system descriptor + */ + public SubClass withDefaultStreamConfigs(Map<String, String> defaultStreamConfigs) { + this.defaultStreamConfigs.putAll(defaultStreamConfigs); + return (SubClass) this; + } + + public String getSystemName() { + return this.systemName; + } + + public Optional<InputTransformer> getTransformer() { + return this.transformerOptional; + } + + public Optional<StreamExpander> getExpander() { + return this.expanderOptional; + } + + private boolean isValidSystemName(String id) { + return StringUtils.isNotBlank(id) && SYSTEM_NAME_PATTERN.matcher(id).matches(); + } + + public Map<String, String> toConfig() { + HashMap<String, String> configs = new HashMap<>(); + this.factoryClassNameOptional.ifPresent(name -> configs.put(String.format(FACTORY_CONFIG_KEY, systemName), name)); + this.defaultStreamOffsetDefaultOptional.ifPresent(dsod -> // Locale.ROOT keeps the lower-cased enum name independent of the JVM locale + configs.put(String.format(DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY, systemName), dsod.name().toLowerCase(java.util.Locale.ROOT))); + this.defaultStreamConfigs.forEach((key, value) -> + configs.put(String.format(DEFAULT_STREAM_CONFIGS_CONFIG_KEY, getSystemName(), key), value)); + this.systemConfigs.forEach((key, value) -> + configs.put(String.format(SYSTEM_CONFIGS_CONFIG_KEY, getSystemName(), key), value)); + return configs; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/TransformingInputDescriptorProvider.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/TransformingInputDescriptorProvider.java b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/TransformingInputDescriptorProvider.java new file mode 100644 index 0000000..5b43fbd --- /dev/null +++ 
b/samza-api/src/main/java/org/apache/samza/operators/descriptors/base/system/TransformingInputDescriptorProvider.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.descriptors.base.system; + +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.serializers.Serde; + +/** + * Interface for advanced {@code SystemDescriptor}s that constrain the type of returned {@code InputDescriptor}s to + * their own {@code InputTransformer} function result types. + * + * @param <InputTransformerType> type of the system level {@code InputTransformer} results + */ +public interface TransformingInputDescriptorProvider<InputTransformerType> { + + /** + * Gets an {@link InputDescriptor} for an input stream on this system. The stream has the provided + * stream level serde, and the default system level {@code InputTransformer}. 
+ * <p> + * The type of messages in the stream is the type of messages returned by the default system level + * {@code InputTransformer} + * + * @param streamId id of the input stream + * @param serde stream level serde for the input stream + * @return an {@link InputDescriptor} for the input stream + */ + InputDescriptor<InputTransformerType, ? extends InputDescriptor> getInputDescriptor(String streamId, Serde serde); +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/functions/InputTransformer.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InputTransformer.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InputTransformer.java new file mode 100644 index 0000000..704d187 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/InputTransformer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.functions; + +import java.io.Serializable; +import org.apache.samza.system.IncomingMessageEnvelope; + +/** + * Transforms an {@link IncomingMessageEnvelope} with deserialized key and message to a message of type {@code OM} + * which is delivered to the {@code MessageStream}. Called in {@code InputOperatorImpl} when incoming messages + * from a {@code SystemConsumer} are being delivered to the application. + * <p> + * This is provided by default by transforming system descriptor implementations and cannot be overridden + * or set on a per-stream level. + * + * @param <OM> type of the transformed message + */ +public interface InputTransformer<OM> extends InitableFunction, ClosableFunction, Serializable { + + /** + * Transforms the provided {@link IncomingMessageEnvelope} with deserialized key and message into another message + * which is delivered to the {@code MessageStream}. + * + * @param ime the {@link IncomingMessageEnvelope} to be transformed + * @return the transformed message + */ + OM apply(IncomingMessageEnvelope ime); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java new file mode 100644 index 0000000..085a98d --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.functions; + +import java.io.Serializable; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; + +/** + * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamGraph}, + * and returns a new {@link MessageStream} with the combined results. Called when {@link StreamGraph#getInputStream} + * is being used to get a {@link MessageStream} using an {@link InputDescriptor} from an expanding system descriptor. + * <p> + * This is provided by default by expanding system descriptor implementations and cannot be overridden + * or set on a per-stream level. + * + * @param <OM> type of the messages in the resultant {@link MessageStream} + */ +public interface StreamExpander<OM> extends Serializable { + + /** + * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamGraph}, + * and returns a new {@link MessageStream} with the combined results. Called when the {@link InputDescriptor} + * is being used to get a {@link MessageStream} using {@link StreamGraph#getInputStream}. 
+ * <p> + * Notes for system implementers: + * <p> + * Take care to avoid infinite recursion in the implementation; e.g., by ensuring that it doesn't call + * {@link StreamGraph#getInputStream} with an {@link InputDescriptor} from an expanding system descriptor + * (like this one) again. + * <p> + * It's the {@link StreamExpander}'s responsibility to propagate any properties, including serde, from the + * user-provided {@link InputDescriptor} to the expanded input descriptors. + * + * @param streamGraph the {@link StreamGraph} to register the expanded sub-DAG of operators on + * @param inputDescriptor the {@link InputDescriptor} to be expanded + * @return the {@link MessageStream} containing the combined results of the sub-DAG of operators + */ + MessageStream<OM> apply(StreamGraph streamGraph, InputDescriptor inputDescriptor); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestExpandingInputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestExpandingInputDescriptor.java b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestExpandingInputDescriptor.java new file mode 100644 index 0000000..e635338 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestExpandingInputDescriptor.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.descriptors; + +import java.util.Collections; +import org.apache.samza.operators.descriptors.expanding.ExampleExpandingInputDescriptor; +import org.apache.samza.operators.descriptors.expanding.ExampleExpandingOutputDescriptor; +import org.apache.samza.operators.descriptors.expanding.ExampleExpandingSystemDescriptor; +import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.system.SystemStreamMetadata; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestExpandingInputDescriptor { + @Test + public void testAPIUsage() { + // does not assert anything, but acts as a compile-time check on expected descriptor type parameters + // and validates that the method calls can be chained. Annotated with @Test so the chained calls are + // also executed by the test runner instead of only being compiled. 
+ ExampleExpandingSystemDescriptor expandingSystem = new ExampleExpandingSystemDescriptor("expandingSystem"); + ExampleExpandingInputDescriptor<Long> input1 = expandingSystem.getInputDescriptor("input1", new IntegerSerde()); + ExampleExpandingOutputDescriptor<Integer> output1 = expandingSystem.getOutputDescriptor("output1", new IntegerSerde()); + + input1 + .withBootstrap(false) + .withOffsetDefault(SystemStreamMetadata.OffsetType.NEWEST) + .withPhysicalName("input-1") + .withPriority(1) + .withResetOffset(false) + .withStreamConfigs(Collections.emptyMap()); + + output1 + .withPhysicalName("output-1") + .withStreamConfigs(Collections.emptyMap()); + } + + @Test + public void testISDObjectsWithOverrides() { + ExampleExpandingSystemDescriptor expandingSystem = new ExampleExpandingSystemDescriptor("expandingSystem"); + IntegerSerde streamSerde = new IntegerSerde(); + ExampleExpandingInputDescriptor<Long> expandingISD = expandingSystem.getInputDescriptor("input-stream", streamSerde); + + assertEquals(streamSerde, expandingISD.getSerde()); + assertEquals(expandingSystem.getTransformer().get(), expandingISD.getTransformer().get()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestGenericInputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestGenericInputDescriptor.java b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestGenericInputDescriptor.java new file mode 100644 index 0000000..9b39d41 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestGenericInputDescriptor.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.descriptors; + +import com.google.common.collect.ImmutableMap; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.serializers.DoubleSerde; +import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.system.SystemStreamMetadata; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class TestGenericInputDescriptor { + @Test + public void testAPIUsage() { + // does not assert anything, but acts as a compile-time check on expected descriptor type parameters + // and validates that the method calls can be chained. 
+ GenericSystemDescriptor mySystem = + new GenericSystemDescriptor("input-system", "factory.class.name") + .withSystemConfigs(Collections.emptyMap()) + .withDefaultStreamConfigs(Collections.emptyMap()); + GenericInputDescriptor<Integer> input1 = mySystem.getInputDescriptor("input1", new IntegerSerde()); + GenericOutputDescriptor<Integer> output1 = mySystem.getOutputDescriptor("output1", new IntegerSerde()); + + input1 + .withPhysicalName("input-1") + .withBootstrap(false) + .withOffsetDefault(SystemStreamMetadata.OffsetType.NEWEST) + .withPriority(1) + .withResetOffset(false) + .withBounded(false) + .withDeleteCommittedMessages(true) + .withStreamConfigs(Collections.emptyMap()); + + output1 + .withPhysicalName("output-1") + .withStreamConfigs(Collections.emptyMap()); + } + + + @Test + public void testISDConfigsWithOverrides() { + GenericSystemDescriptor mySystem = + new GenericSystemDescriptor("input-system", "factory.class.name") + .withSystemConfigs(Collections.emptyMap()) + .withDefaultStreamConfigs(Collections.emptyMap()); + + GenericInputDescriptor<Double> isd = mySystem.getInputDescriptor("input-stream", new DoubleSerde()) + .withPhysicalName("physical-name") + .withBootstrap(true) + .withBounded(true) + .withDeleteCommittedMessages(true) + .withOffsetDefault(SystemStreamMetadata.OffsetType.OLDEST) + .withPriority(12) + .withResetOffset(true) + .withStreamConfigs(ImmutableMap.of("custom-config-key", "custom-config-value")); + + Map<String, String> generatedConfigs = isd.toConfig(); + Map<String, String> expectedConfigs = new HashMap<>(); + expectedConfigs.put("streams.input-stream.samza.system", "input-system"); + expectedConfigs.put("streams.input-stream.samza.physical.name", "physical-name"); + expectedConfigs.put("streams.input-stream.samza.bootstrap", "true"); + expectedConfigs.put("streams.input-stream.samza.bounded", "true"); + expectedConfigs.put("streams.input-stream.samza.delete.committed.messages", "true"); + 
expectedConfigs.put("streams.input-stream.samza.reset.offset", "true"); + expectedConfigs.put("streams.input-stream.samza.offset.default", "oldest"); + expectedConfigs.put("streams.input-stream.samza.priority", "12"); + expectedConfigs.put("streams.input-stream.custom-config-key", "custom-config-value"); + + assertEquals(expectedConfigs, generatedConfigs); + } + + @Test + public void testISDConfigsWithDefaults() { + GenericSystemDescriptor mySystem = + new GenericSystemDescriptor("input-system", "factory.class.name") + .withSystemConfigs(Collections.emptyMap()) + .withDefaultStreamConfigs(Collections.emptyMap()); + + DoubleSerde streamSerde = new DoubleSerde(); + GenericInputDescriptor<Double> isd = mySystem.getInputDescriptor("input-stream", streamSerde); + + Map<String, String> generatedConfigs = isd.toConfig(); + Map<String, String> expectedConfigs = ImmutableMap.of("streams.input-stream.samza.system", "input-system"); + assertEquals(expectedConfigs, generatedConfigs); + assertEquals(streamSerde, isd.getSerde()); + assertFalse(isd.getTransformer().isPresent()); + } + + @Test + public void testISDObjectsWithOverrides() { + GenericSystemDescriptor mySystem = + new GenericSystemDescriptor("input-system", "factory.class.name") + .withSystemConfigs(Collections.emptyMap()) + .withDefaultStreamConfigs(Collections.emptyMap()); + IntegerSerde streamSerde = new IntegerSerde(); + GenericInputDescriptor<Integer> isd = mySystem.getInputDescriptor("input-stream", streamSerde); + + assertEquals(streamSerde, isd.getSerde()); + assertFalse(isd.getTransformer().isPresent()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestGenericSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestGenericSystemDescriptor.java 
b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestGenericSystemDescriptor.java new file mode 100644 index 0000000..937b56a --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/descriptors/TestGenericSystemDescriptor.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.descriptors; + +import com.google.common.collect.ImmutableMap; + +import java.util.Map; +import org.apache.samza.system.SystemStreamMetadata; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestGenericSystemDescriptor { + @Test + public void testSDConfigs() { + GenericSystemDescriptor mySystem = + new GenericSystemDescriptor("input-system", "factory.class.name") + .withSystemConfigs(ImmutableMap.of("custom-config-key", "custom-config-value")) + .withDefaultStreamConfigs(ImmutableMap.of("custom-stream-config-key", "custom-stream-config-value")) + .withDefaultStreamOffsetDefault(SystemStreamMetadata.OffsetType.UPCOMING); + + Map<String, String> generatedConfigs = mySystem.toConfig(); + Map<String, String> expectedConfigs = ImmutableMap.of( + "systems.input-system.samza.factory", "factory.class.name", + "systems.input-system.custom-config-key", "custom-config-value", + "systems.input-system.default.stream.custom-stream-config-key", "custom-stream-config-value", + "systems.input-system.default.stream.samza.offset.default", "upcoming" + ); + assertEquals(expectedConfigs, generatedConfigs); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetInputDescriptorWithNullSerde() { + GenericSystemDescriptor mySystem = new GenericSystemDescriptor("input-system", "factory.class.name"); + mySystem.getInputDescriptor("streamId", null); // should throw an exception + } + + @Test(expected = IllegalArgumentException.class) + public void testGetSystemDescriptorWithNullSystemName() { + new GenericSystemDescriptor(null, "factory.class.name"); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetSystemDescriptorWithEmptySystemName() { + new GenericSystemDescriptor(" ", "factory.class.name"); + } +}