SAMZA-1659: Serializable OperatorSpec

This change makes the user-supplied functions serializable, which in turn makes the full user-defined DAG serializable.
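To make "serializing the DAG" concrete, here is a minimal plain-Java sketch of the idea. It serializes a chain of user functions once, as a planner might, then rebuilds an independent copy, as each task might. Several pieces are assumptions and are not Samza types: `StringMap` is a hypothetical stand-in for `MapFunction<String, String>` that shares only the relevant property (it extends `java.io.Serializable`), and the "DAG" here is just a list of operators rather than Samza's `OperatorSpecGraph`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

class DagRoundTrip {

  // Serializes the whole operator chain once, roughly what a planner would ship.
  static byte[] serialize(List<StringMap> chain) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(new ArrayList<>(chain)); // ArrayList itself is Serializable
    } catch (IOException e) {
      throw new IllegalStateException("chain is not serializable", e);
    }
    return bytes.toByteArray();
  }

  // Rebuilds an independent copy of the chain, roughly what each task would do.
  @SuppressWarnings("unchecked")
  static List<StringMap> deserialize(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return (List<StringMap>) in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("cannot rebuild chain", e);
    }
  }

  // Applies each operator in order to a single message.
  static String run(List<StringMap> chain, String message) {
    String current = message;
    for (StringMap op : chain) {
      current = op.apply(current);
    }
    return current;
  }

  public static void main(String[] args) {
    List<StringMap> chain = new ArrayList<>();
    chain.add(String::trim);
    chain.add(s -> s.toUpperCase(Locale.ROOT));
    List<StringMap> taskCopy = deserialize(serialize(chain));
    System.out.println(run(taskCopy, "  page-view  ")); // PAGE-VIEW
  }
}

// Hypothetical stand-in for MapFunction<String, String>: because it extends
// Serializable, lambdas and method references targeting it are serializable.
@FunctionalInterface
interface StringMap extends Serializable {
  String apply(String message);
}
```

The lambdas survive the round trip only because their target interface is serializable. This is the property the diff below adds to each user-facing function interface.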
Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com>
Author: Yi Pan (Data Infrastructure) <yi...@yipan-mn1.linkedin.biz>
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>

Reviewers: Jagadish <jvenkatra...@linkedin.com>, Prateek Maheshwari <pmahe...@linkedin.com>

Closes #475 from nickpan47/serializable-opspec-only-Jan-24-18 and squashes the following commits:

db0dea73 [Yi Pan (Data Infrastructure)] SAMZA-1659: fix intermittent TestZkLocalApplicationRunner failure due to StreamProcessor#stop()
34716d42 [Yi Pan (Data Infrastructure)] SAMZA-1659: fix a comment on OperatorSpec#isClone()
37d4e6ae [Yi Pan (Data Infrastructure)] SAMZA-1659: addressing latest round of review comments
68674a14 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
d3a7826c [Yi Pan (Data Infrastructure)] SAMZA-1659: addressing review comments
f83e8dd0 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
acca418b [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
842a73d6 [Yi Pan (Data Infrastructure)] SAMZA-1659: making user-defined functions in high-level API serializable
ad85a2cb [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
c1567116 [Yi Pan (Data Infrastructure)] SAMZA-1659: Before re-merge with master. Still need to fix unit tests (moving OperatorSpec clone tests to OperatorSpecGraph.clone)
f2563f8e [Yi Pan (Data Infrastructure)] SAMZA-1659: serialize the whole DAG instead of each individual OperatorSpec.
24d33496 [Yi Pan (Data Infrastructure)] SAMZA-1659: updated according to review comments. Need to merge again with master.
3f643f8b [Yi Pan (Data Infrastructure)] SAMZA-1659: serialiable OperatorSpec
ed7d8c0e [Yi Pan (Data Infrastructure)] Fixed some javadoc and test files
94de218b [Yi Pan (Data Infrastructure)] Remove public access from StreamGraphImpl#getIntermediateStream(String, Serde)
8f4e9dd4 [Yi Pan (Data Infrastructure)] Serialization of StreamGraph in a wrapper class SerializedStreamGraph
f3bb1958 [Yi Pan (Data Infrastructure)] Fix some comments
c15246f5 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
e981967d [Yi Pan (Data Infrastructure)] WIP: fixing unit test for SamzaSQL translators w/ serialization of operator functions
40583051 [Yi Pan (Data Infrastructure)] WIP: update the serialization of user functions after the merge
18ba924f [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
93951c5f [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
54a28801 [Yi Pan (Data Infrastructure)] WIP: broadcast, sendtotable, and streamtotablejoin serialization and unit tests
45eb1fb0 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
7c8d1591 [Yi Pan (Data Infrastructure)] WIP: working on unit tests for trigger, broadcast, join, table, and SQL UDF function serialization
b973b105 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
aca42308 [Yi Pan (Data Infrastructure)] WIP: Serialize OperatorSpec only w/o StreamApplication interface change. Passed all build and tests.
0ebebfc3 [Yi Pan (Data Infrastructure)] WIP: serialization only change
1670aff0 [Yi Pan (Data Infrastructure)] WIP: class-loading of user program logic and main() method based user program logic are both included in ThreadJobFactory/ProcessJobFactory/YarnJobFactory. ThreadJobFactory test suite to be fixed.
4102aa8c [Yi Pan (Data Infrastructure)] WIP: continued working on potential offspring integration
dc7da87e [Yi Pan (Data Infrastructure)] WIP: unit tests for serialization
475a46bc [Yi Pan (Data Infrastructure)] WIP: fixed TestZkLocalApplicationRunner. Debugging issues w/ TestRepartitionWindowApp (i.e. missing changelog creation step when directly running LocalApplicationRunner)
6a14b2af [Yi Pan (Data Infrastructure)] WIP: fixed unit test failure for Windows
d4640329 [Yi Pan (Data Infrastructure)] WIP: fixing unit tests after merge
bf1ce907 [Yi Pan (Data Infrastructure)] WIP: removing StreamDescriptor first
50201728 [Yi Pan (Data Infrastructure)] Merge branch 'experiment-new-api-v2' into new-api-v2-0.14
dde1ab14 [Yi Pan (Data Infrastructure)] WIP: first end-to-end test
d7df6ed0 [Yi Pan (Data Infrastructure)] WIP: added all unit test for OperatorSpec#copy methods.
6fc6d4c0 [Yi Pan (Data Infrastructure)] WIP: experiment code to implement an end-to-end working example for new APIs
525d8bc1 [Yi Pan (Data Infrastructure)] Merge branch '0.14.0' into new-api-v2
e6fb96e5 [Yi Pan (Data Infrastructure)] WIP: merged all application types into StreamApplications
f227380f [Yi Pan (Data Infrastructure)] WIP: update the app runner classes
256155ad [Yi Pan (Data Infrastructure)] WIP: new API user code examples
4a6a58dc [Yi Pan (Data Infrastructure)] WIP: updated w/ low-level task API and global var ingestion/metrics reporter
3c50629e [Yi Pan (Data Infrastructure)] WIP: adding support for low-level task APIs
51541e13 [Yi Pan (Data Infrastructure)] WIP: cleanup StreamDescriptor
0bc7ee7b [Yi Pan (Data Infrastructure)] WIP: update the user code example on new APIs
cd528c1c [Yi Pan (Data Infrastructure)] WIP: updated spec and user DAG API
b898e6c0 [Yi Pan (Data Infrastructure)] WIP: new-api-v2
91f364f1 [Yi Pan (Data Infrastructure)] WIP: proto-type of input/output stream/system specs
ae3dc6ff [Yi Pan (Data Infrastructure)] WIP: new api revision
8bb97520 [Yi Pan (Data Infrastructure)] WIP: proto-type of input/output stream/system specs
5573a069 [Yi Pan (Data Infrastructure)] WIP: new api revision
aeb45730 [Xinyu Liu] SAMZA-1321: Propagate end-of-stream and watermark messages

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/53d7f262
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/53d7f262
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/53d7f262

Branch: refs/heads/master
Commit: 53d7f2625145f560eb6ccc49d48dc176f244f9b3
Parents: bc4a0c2
Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com>
Authored: Fri May 25 09:37:55 2018 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@yipan-mn1.linkedin.biz>
Committed: Fri May 25 09:37:55 2018 -0700

----------------------------------------------------------------------
build.gradle | 5 +-
.../samza/application/StreamApplication.java | 7 +-
.../java/org/apache/samza/config/MapConfig.java | 7 +-
.../apache/samza/operators/MessageStream.java | 19 +-
.../operators/functions/ClosableFunction.java | 3 +
.../operators/functions/FilterFunction.java | 3 +-
.../operators/functions/FlatMapFunction.java | 3 +-
.../operators/functions/FoldLeftFunction.java | 16 +-
.../samza/operators/functions/JoinFunction.java | 3 +-
.../samza/operators/functions/MapFunction.java | 3 +-
.../samza/operators/functions/SinkFunction.java | 3 +-
.../functions/StreamTableJoinFunction.java | 3 +-
.../operators/functions/SupplierFunction.java | 38 ++
.../samza/operators/triggers/AnyTrigger.java | 10 +-
.../samza/operators/triggers/Trigger.java | 3 +-
.../apache/samza/operators/windows/Window.java | 3 +-
.../apache/samza/operators/windows/Windows.java | 53 +-
.../windows/internal/WindowInternal.java | 32 +-
.../samza/serializers/SerializableSerde.java | 2 +-
.../org/apache/samza/system/StreamSpec.java | 3 +-
.../samza/system/SystemStreamPartition.java | 4 +-
.../java/org/apache/samza/table/TableSpec.java | 12 +-
.../samza/operators/windows/TestWindowPane.java | 2 +-
.../samza/execution/ExecutionPlanner.java | 28 +-
.../org/apache/samza/execution/JobGraph.java | 14 +-
.../samza/execution/JobGraphJsonGenerator.java | 4 +-
.../org/apache/samza/execution/JobNode.java | 26 +-
.../samza/operators/MessageStreamImpl.java | 79 ++-
.../samza/operators/OperatorSpecGraph.java | 132 ++++
.../apache/samza/operators/StreamGraphImpl.java | 328 ----------
.../apache/samza/operators/StreamGraphSpec.java | 299 +++++++++
.../org/apache/samza/operators/TableImpl.java | 3 +-
.../operators/impl/BroadcastOperatorImpl.java | 2 +-
.../samza/operators/impl/OperatorImpl.java | 2 +-
.../samza/operators/impl/OperatorImplGraph.java | 73 ++-
.../operators/impl/OutputOperatorImpl.java | 5 +-
.../operators/impl/PartitionByOperatorImpl.java | 16 +-
.../operators/impl/StreamOperatorImpl.java | 3 +-
.../operators/impl/WindowOperatorImpl.java | 23 +-
.../operators/spec/FilterOperatorSpec.java | 74 +++
.../operators/spec/FlatMapOperatorSpec.java | 47 ++
.../samza/operators/spec/InputOperatorSpec.java | 13 +-
.../samza/operators/spec/JoinOperatorSpec.java | 17 +-
.../samza/operators/spec/MapOperatorSpec.java | 77 +++
.../samza/operators/spec/MergeOperatorSpec.java | 51 ++
.../samza/operators/spec/OperatorSpec.java | 23 +-
.../samza/operators/spec/OperatorSpecs.java | 73 +--
.../samza/operators/spec/OutputStreamImpl.java | 17 +-
.../operators/spec/PartitionByOperatorSpec.java | 23 +-
.../operators/spec/SendToTableOperatorSpec.java | 9 +-
.../operators/spec/StreamOperatorSpec.java | 23 +-
.../operators/spec/WindowOperatorSpec.java | 11 +
.../stream/IntermediateMessageStreamImpl.java | 4 +-
.../samza/operators/triggers/Cancellable.java | 2 +-
.../samza/operators/triggers/TriggerImpl.java | 6 +-
.../runtime/AbstractApplicationRunner.java | 21 +-
.../samza/runtime/LocalApplicationRunner.java | 28 +-
.../samza/runtime/LocalContainerRunner.java | 11 +-
.../apache/samza/task/StreamOperatorTask.java | 48 +-
.../org/apache/samza/task/TaskFactoryUtil.java | 26 +-
.../apache/samza/container/TaskInstance.scala | 1 -
.../samza/job/local/ThreadJobFactory.scala | 11 +-
.../apache/samza/example/BroadcastExample.java | 71 ---
.../samza/example/KeyValueStoreExample.java | 131 ----
.../org/apache/samza/example/MergeExample.java | 60 --
.../samza/example/OrderShipmentJoinExample.java | 115 ----
.../samza/example/PageViewCounterExample.java | 95 ---
.../samza/example/RepartitionExample.java | 90 ---
.../org/apache/samza/example/WindowExample.java | 81 ---
.../samza/execution/TestExecutionPlanner.java | 117 ++--
.../apache/samza/execution/TestJobGraph.java | 68 +--
.../execution/TestJobGraphJsonGenerator.java | 32 +-
.../org/apache/samza/execution/TestJobNode.java | 14 +-
.../samza/operators/TestJoinOperator.java | 152 ++---
.../samza/operators/TestMessageStreamImpl.java | 55 +-
.../samza/operators/TestOperatorSpecGraph.java | 185 ++++++
.../samza/operators/TestStreamGraphImpl.java | 601 -------------------
.../samza/operators/TestStreamGraphSpec.java | 601 +++++++++++++++++++
.../data/TestOutputMessageEnvelope.java | 14 +
.../operators/impl/TestOperatorImplGraph.java | 298 ++++++---
.../operators/impl/TestStreamOperatorImpl.java | 4 +-
.../operators/impl/TestWindowOperator.java | 263 ++++----
.../operators/spec/OperatorSpecTestUtils.java | 141 +++++
.../samza/operators/spec/TestOperatorSpec.java | 465 ++++++++++++++
.../spec/TestPartitionByOperatorSpec.java | 165 +++++
.../operators/spec/TestWindowOperatorSpec.java | 306 +++++++++-
.../runtime/TestAbstractApplicationRunner.java | 36 +-
.../runtime/TestLocalApplicationRunner.java | 21 +-
.../apache/samza/task/IdentityStreamTask.java | 55 ++
.../apache/samza/task/TestTaskFactoryUtil.java | 64 +-
.../testUtils/InvalidStreamApplication.java | 25 -
.../samza/system/kafka/TestKafkaStreamSpec.java | 3 +-
.../samza/sql/data/SamzaSqlCompositeKey.java | 1 +
.../sql/data/SamzaSqlExecutionContext.java | 20 +-
.../samza/sql/translator/FilterTranslator.java | 47 +-
.../translator/LogicalAggregateTranslator.java | 24 +-
.../samza/sql/translator/ProjectTranslator.java | 60 +-
.../samza/sql/translator/QueryTranslator.java | 46 +-
.../SamzaSqlRelMessageJoinFunction.java | 12 +-
.../samza/sql/translator/ScanTranslator.java | 28 +-
.../samza/sql/translator/TranslatorContext.java | 79 ++-
.../apache/samza/sql/TestQueryTranslator.java | 510 ----------------
.../sql/TestSamzaSqlApplicationConfig.java | 95 ---
.../sql/TestSamzaSqlApplicationRunner.java | 56 --
.../samza/sql/TestSamzaSqlFileParser.java | 58 --
.../samza/sql/TestSamzaSqlQueryParser.java | 76 ---
.../samza/sql/TestSamzaSqlRelMessage.java | 46 --
.../sql/TestSamzaSqlRelMessageJoinFunction.java | 119 ----
.../samza/sql/data/TestSamzaSqlRelMessage.java | 46 ++
.../runner/TestSamzaSqlApplicationConfig.java | 95 +++
.../runner/TestSamzaSqlApplicationRunner.java | 56 ++
.../sql/testutil/TestSamzaSqlFileParser.java | 58 ++
.../sql/testutil/TestSamzaSqlQueryParser.java | 75 +++
.../sql/translator/TestFilterTranslator.java | 136 +++++
.../sql/translator/TestJoinTranslator.java | 191 ++++++
.../sql/translator/TestProjectTranslator.java | 289 +++++++++
.../sql/translator/TestQueryTranslator.java | 596 ++++++++++++++++++
.../TestSamzaSqlRelMessageJoinFunction.java | 118 ++++
.../sql/translator/TranslatorTestBase.java | 72 +++
.../example/AppWithGlobalConfigExample.java | 86 +++
.../apache/samza/example/BroadcastExample.java | 70 +++
.../samza/example/KeyValueStoreExample.java | 138 +++++
.../org/apache/samza/example/MergeExample.java | 62 ++
.../samza/example/OrderShipmentJoinExample.java | 121 ++++
.../samza/example/PageViewCounterExample.java | 100 +++
.../samza/example/RepartitionExample.java | 96 +++
.../org/apache/samza/example/WindowExample.java | 86 +++
.../samza/test/framework/StreamAssert.java | 14 +
.../EndOfStreamIntegrationTest.java | 8 +-
.../WatermarkIntegrationTest.java | 7 +-
.../test/operator/RepartitionJoinWindowApp.java | 54 +-
.../test/operator/RepartitionWindowApp.java | 72 +++
.../samza/test/operator/SessionWindowApp.java | 21 +-
.../operator/TestRepartitionJoinWindowApp.java | 2 +-
.../test/operator/TestRepartitionWindowApp.java | 90 +++
.../samza/test/operator/TumblingWindowApp.java | 20 +-
.../samza/test/operator/data/PageView.java | 3 +-
.../test/processor/SharedContextFactories.java | 117 ++++
.../test/processor/TestStreamApplication.java | 148 +++++
.../processor/TestZkLocalApplicationRunner.java | 211 +++----
.../apache/samza/test/table/TestLocalTable.java | 243 +++++---
.../samza/test/table/TestRemoteTable.java | 37 +-
.../apache/samza/test/timer/TestTimerApp.java | 5 +-
143 files changed, 7227 insertions(+), 3811 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 6872354..a94fcfa 100644
--- a/build.gradle
+++ b/build.gradle
@@ -325,6 +325,9 @@ project(':samza-sql') {
 
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-core:$mockitoVersion"
+    testCompile "org.powermock:powermock-api-mockito:$powerMockVersion"
+    testCompile "org.powermock:powermock-core:$powerMockVersion"
+    testCompile "org.powermock:powermock-module-junit4:$powerMockVersion"
 
     testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
   }
@@ -756,10 +759,10 @@ project(":samza-test_$scalaVersion") {
     compile project(":samza-kv-inmemory_$scalaVersion")
     compile project(":samza-kv-rocksdb_$scalaVersion")
     compile project(":samza-core_$scalaVersion")
+    compile project(":samza-kafka_$scalaVersion")
     compile project(":samza-sql")
     runtime project(":samza-log4j")
     runtime project(":samza-yarn_$scalaVersion")
-    runtime project(":samza-kafka_$scalaVersion")
     runtime project(":samza-hdfs_$scalaVersion")
     compile "org.scala-lang:scala-library:$scalaLibVersion"
     compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
index f615207..0b2142b 100644
--- a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
+++ b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java
@@ -61,9 +61,10 @@ import org.apache.samza.task.TaskContext;
  *
  * <p>
  * Implementation Notes: Currently StreamApplications are wrapped in a {@link StreamTask} during execution.
- * A new StreamApplication instance will be created and initialized when planning the execution, as well as for each
- * {@link StreamTask} instance used for processing incoming messages. Execution is synchronous and thread-safe within
- * each {@link StreamTask}.
+ * A new StreamApplication instance will be created and initialized with a user-defined {@link StreamGraph}
+ * when planning the execution. The {@link StreamGraph} and the functions implemented for transforms are required to
+ * be serializable. The execution planner will generate a serialized DAG which will be deserialized in each {@link StreamTask}
+ * instance used for processing incoming messages. Execution is synchronous and thread-safe within each {@link StreamTask}.
  *
  * <p>
  * Functions implemented for transforms in StreamApplications ({@link org.apache.samza.operators.functions.MapFunction},

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/config/MapConfig.java b/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
index 0b1ed98..5af2535 100644
--- a/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
+++ b/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
@@ -43,8 +43,11 @@ public class MapConfig extends Config {
 
   public MapConfig(List<Map<String, String>> maps) {
     this.map = new HashMap<>();
-    for (Map<String, String> m: maps)
-      this.map.putAll(m);
+    for (Map<String, String> m: maps) {
+      if (m != null) {
+        this.map.putAll(m);
+      }
+    }
   }
 
   public MapConfig(Map<String, String>... maps) {

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index 98f0784..7797f9a 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -21,7 +21,6 @@ package org.apache.samza.operators;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.function.Function;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.functions.FilterFunction;
@@ -237,34 +236,34 @@ public interface MessageStream<M> {
    * <p>
    * Unlike {@link #sendTo}, messages with a null key are all sent to partition 0.
* - * @param keyExtractor the {@link Function} to extract the message and partition key from the input message. + * @param keyExtractor the {@link MapFunction} to extract the message and partition key from the input message. * Messages with a null key are all sent to partition 0. - * @param valueExtractor the {@link Function} to extract the value from the input message + * @param valueExtractor the {@link MapFunction} to extract the value from the input message * @param serde the {@link KVSerde} to use for (de)serializing the key and value. * @param id the unique id of this operator in this application * @param <K> the type of output key * @param <V> the type of output value * @return the repartitioned {@link MessageStream} */ - <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor, - Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String id); + <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> keyExtractor, + MapFunction<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String id); /** - * Same as calling {@link #partitionBy(Function, Function, KVSerde, String)} with a null KVSerde. + * Same as calling {@link #partitionBy(MapFunction, MapFunction, KVSerde, String)} with a null KVSerde. * <p> * Uses the default serde provided via {@link StreamGraph#setDefaultSerde}, which must be a KVSerde. If the default * serde is not a {@link KVSerde}, a runtime exception will be thrown. If no default serde has been provided * <b>before</b> calling this method, a {@code KVSerde<NoOpSerde, NoOpSerde>} is used. 
    *
-   * @param keyExtractor the {@link Function} to extract the message and partition key from the input message
-   * @param valueExtractor the {@link Function} to extract the value from the input message
+   * @param keyExtractor the {@link MapFunction} to extract the message and partition key from the input message
+   * @param valueExtractor the {@link MapFunction} to extract the value from the input message
    * @param id the unique id of this operator in this application
    * @param <K> the type of output key
    * @param <V> the type of output value
    * @return the repartitioned {@link MessageStream}
    */
-  <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
-      Function<? super M, ? extends V> valueExtractor, String id);
+  <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> keyExtractor,
+      MapFunction<? super M, ? extends V> valueExtractor, String id);
 
   /**
    * Sends messages in this {@link MessageStream} to a {@link Table}. The type of input message is expected

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
index ea83ba4..faf9fc5 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
@@ -33,5 +33,8 @@ import org.apache.samza.annotation.InterfaceStability;
  */
 @InterfaceStability.Unstable
 public interface ClosableFunction {
+  /**
+   * Frees any resource acquired by the operators in {@link InitableFunction}
+   */
   default void close() {}
 }
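Because user functions are now serialized along with the DAG, a function that holds a non-serializable resource would fail to serialize, unless that resource is marked `transient`. A transient field arrives as `null` after deserialization, so one workable pattern is to create the resource in `init()` and release it in `close()`. The sketch below illustrates this under stated assumptions. `LifecycleFunction` is a hypothetical simplification of `MapFunction` combined with `InitableFunction`/`ClosableFunction`; the real `init` receives context arguments that are omitted here. `EnrichFunction` is likewise hypothetical, and its in-memory cache stands in for something like a client connection.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

class LifecycleSketch {

  // Serializes and deserializes an object, standing in for planner-to-task shipping.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T original) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(original);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("round trip failed", e);
    }
  }

  public static void main(String[] args) {
    EnrichFunction taskCopy = roundTrip(new EnrichFunction("us-west"));
    System.out.println(taskCopy.isOpen()); // false: the transient field arrives as null
    taskCopy.init();
    System.out.println(taskCopy.apply("member-1")); // member-1@us-west
    taskCopy.close();
  }
}

// Hypothetical simplification of MapFunction + InitableFunction + ClosableFunction.
interface LifecycleFunction<M, OM> extends Serializable {
  default void init() {}
  OM apply(M message);
  default void close() {}
}

// Serializable configuration plus a per-task transient resource.
class EnrichFunction implements LifecycleFunction<String, String> {
  private static final long serialVersionUID = 1L;
  private final String region;                 // shipped with the serialized DAG
  private transient Map<String, String> cache; // created on the task side in init()

  EnrichFunction(String region) {
    this.region = region;
  }

  @Override
  public void init() {
    cache = new HashMap<>();
  }

  @Override
  public String apply(String memberId) {
    return cache.computeIfAbsent(memberId, id -> id + "@" + region);
  }

  @Override
  public void close() {
    cache.clear();
    cache = null;
  }

  boolean isOpen() {
    return cache != null;
  }
}
```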

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
index 31bbbd8..ce68e0f 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.functions;
 
+import java.io.Serializable;
 import org.apache.samza.annotation.InterfaceStability;
 
 
@@ -28,7 +29,7 @@ import org.apache.samza.annotation.InterfaceStability;
  */
 @InterfaceStability.Unstable
 @FunctionalInterface
-public interface FilterFunction<M> extends InitableFunction, ClosableFunction {
+public interface FilterFunction<M> extends InitableFunction, ClosableFunction, Serializable {
 
   /**
    * Returns a boolean indicating whether this message should be retained or filtered out.
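Two changes in this diff rely on a Java language rule: a lambda is serializable only when its target interface is serializable. Those changes are the switch from `java.util.function.Function` to `MapFunction` in `partitionBy`, and the new `Serializable` supertype on `FilterFunction`. A second rule also applies: any variable a lambda captures must itself be serializable. The sketch below checks all three cases with a plain `ObjectOutputStream`. `SerializableFilter` and `RemoteClient` are hypothetical stand-ins, not Samza types.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

class SerializabilityCheck {

  // Returns true if Java serialization accepts the object and everything it references.
  static boolean isSerializable(Object candidate) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(candidate);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // Target type is java.util.function.Function, which does not extend Serializable.
    Function<String, Integer> plainKey = String::length;
    // Target type extends Serializable, as FilterFunction and MapFunction now do.
    SerializableFilter<String> nonEmpty = s -> !s.isEmpty();
    // The target type is serializable, but the captured client is not.
    RemoteClient client = new RemoteClient();
    SerializableFilter<String> blocked = s -> client.isBlocked(s);

    System.out.println(isSerializable(plainKey)); // false
    System.out.println(isSerializable(nonEmpty)); // true
    System.out.println(isSerializable(blocked));  // false
  }
}

// Hypothetical stand-in for FilterFunction, reduced to its apply method.
@FunctionalInterface
interface SerializableFilter<M> extends Serializable {
  boolean apply(M message);
}

// Hypothetical non-serializable dependency, e.g. a network client.
class RemoteClient {
  boolean isBlocked(String key) {
    return key.startsWith("spam");
  }
}
```

The third case is the one most likely to surprise users: making the interface `Serializable` is necessary but not sufficient, because the lambda's captured state is serialized with it.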

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
index 7e9253e..63d7061 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.functions;
 
+import java.io.Serializable;
 import org.apache.samza.annotation.InterfaceStability;
 import java.util.Collection;
 
@@ -31,7 +32,7 @@ import java.util.Collection;
  */
 @InterfaceStability.Unstable
 @FunctionalInterface
-public interface FlatMapFunction<M, OM> extends InitableFunction, ClosableFunction {
+public interface FlatMapFunction<M, OM> extends InitableFunction, ClosableFunction, Serializable {
 
   /**
    * Transforms the provided message into a collection of 0 or more messages.

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
index 78250e3..d6ba205 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
@@ -19,16 +19,22 @@
 package org.apache.samza.operators.functions;
 
 
+import java.io.Serializable;
+import org.apache.samza.annotation.InterfaceStability;
+
+
 /**
- * Incrementally updates the window value as messages are added to the window.
+ * Incrementally updates the aggregated value as messages are added. Main usage is in {@link org.apache.samza.operators.windows.Window} operator.
  */
-public interface FoldLeftFunction<M, WV> extends InitableFunction, ClosableFunction {
+@InterfaceStability.Unstable
+@FunctionalInterface
+public interface FoldLeftFunction<M, WV> extends InitableFunction, ClosableFunction, Serializable {
 
   /**
-   * Incrementally updates the window value as messages are added to the window.
+   * Incrementally updates the aggregated value as messages are added.
    *
-   * @param message the message being added to the window
-   * @param oldValue the previous value associated with the window
+   * @param message the message being added to the aggregated value
+   * @param oldValue the previous value
    * @return the new value
    */
   WV apply(M message, WV oldValue);

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
index 954083d..94a998d 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.functions;
 
+import java.io.Serializable;
 import org.apache.samza.annotation.InterfaceStability;
 
 
@@ -30,7 +31,7 @@ import org.apache.samza.annotation.InterfaceStability;
  * @param <RM> type of the joined message
  */
 @InterfaceStability.Unstable
-public interface JoinFunction<K, M, JM, RM> extends InitableFunction, ClosableFunction {
+public interface JoinFunction<K, M, JM, RM> extends InitableFunction, ClosableFunction, Serializable {
 
   /**
    * Joins the provided messages and returns the joined message.
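`FoldLeftFunction` now describes generic incremental aggregation rather than only window values. The sketch below shows how a fold and an initial-value supplier might fit together. `Fold` and `InitialValue` are hypothetical interfaces that only mirror the shapes of `FoldLeftFunction.apply(message, oldValue)` and `SupplierFunction.get()` (the latter is added later in this commit). The `aggregate` loop is an illustration, not Samza's window operator. The point of a supplier is that each aggregation starts from a fresh value instead of sharing one mutable accumulator.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FoldLeftSketch {

  // Starts from a fresh initial value and folds each message into it, in order.
  static <M, WV> WV aggregate(List<M> messages, InitialValue<WV> initial, Fold<M, WV> fold) {
    WV value = initial.get();
    for (M message : messages) {
      value = fold.apply(message, value);
    }
    return value;
  }

  public static void main(String[] args) {
    InitialValue<Map<String, Integer>> fresh = HashMap::new;
    Fold<String, Map<String, Integer>> countPages = (page, counts) -> {
      counts.merge(page, 1, Integer::sum);
      return counts;
    };
    Map<String, Integer> pane1 = aggregate(Arrays.asList("home", "search", "home"), fresh, countPages);
    Map<String, Integer> pane2 = aggregate(Arrays.asList("home"), fresh, countPages);
    // Each aggregation got its own map, so counts do not leak between them.
    System.out.println(pane1.get("home") + " " + pane2.get("home")); // 2 1
  }
}

// Hypothetical mirror of SupplierFunction<T>.get().
@FunctionalInterface
interface InitialValue<T> extends Serializable {
  T get();
}

// Hypothetical mirror of FoldLeftFunction<M, WV>.apply(message, oldValue).
@FunctionalInterface
interface Fold<M, WV> extends Serializable {
  WV apply(M message, WV oldValue);
}
```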

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
index a8c139f..fad9cf8 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.functions;
 
+import java.io.Serializable;
 import org.apache.samza.annotation.InterfaceStability;
 
 
@@ -29,7 +30,7 @@ import org.apache.samza.annotation.InterfaceStability;
  */
 @InterfaceStability.Unstable
 @FunctionalInterface
-public interface MapFunction<M, OM> extends InitableFunction, ClosableFunction {
+public interface MapFunction<M, OM> extends InitableFunction, ClosableFunction, Serializable {
 
   /**
    * Transforms the provided message into another message.
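The updated `StreamApplication` javadoc says the serialized DAG is deserialized in each `StreamTask` instance. Assuming plain Java serialization is used for that copy, one consequence is that every task receives its own instance of each function. Instance fields in a `MapFunction` are therefore per task, not shared across tasks. The sketch below shows this effect; `CountingMap` and `PerTaskCopies` are hypothetical example classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class PerTaskCopies {

  // Produces an independent deep copy via Java serialization.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T copyOf(T original) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(original);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("copy failed", e);
    }
  }

  public static void main(String[] args) {
    CountingMap planned = new CountingMap();
    CountingMap task0 = copyOf(planned);
    CountingMap task1 = copyOf(planned);
    task0.apply("a");
    task0.apply("b");
    task1.apply("c");
    // Each copy counts only its own messages; the planner-side instance is untouched.
    System.out.println(task0.seen() + " " + task1.seen() + " " + planned.seen()); // 2 1 0
  }
}

// Hypothetical MapFunction-like class that keeps a per-instance counter.
class CountingMap implements Serializable {
  private static final long serialVersionUID = 1L;
  private long seen;

  String apply(String message) {
    seen++;
    return seen + ":" + message;
  }

  long seen() {
    return seen;
  }
}
```

Because execution is synchronous within each task, per-instance state like this needs no locking. State that must be shared across tasks would need to live outside the function, for example in a store or table.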

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
index e290d7d..2b44125 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.functions;
 
+import java.io.Serializable;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
@@ -30,7 +31,7 @@ import org.apache.samza.task.TaskCoordinator;
  */
 @InterfaceStability.Unstable
 @FunctionalInterface
-public interface SinkFunction<M> extends InitableFunction, ClosableFunction {
+public interface SinkFunction<M> extends InitableFunction, ClosableFunction, Serializable {
 
   /**
    * Allows sending the provided message to an output {@link org.apache.samza.system.SystemStream} using

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java
index 6afcf67..356e07f 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.functions;
 
+import java.io.Serializable;
 import org.apache.samza.annotation.InterfaceStability;
 
 
@@ -30,7 +31,7 @@ import org.apache.samza.annotation.InterfaceStability;
  * @param <JM> type of join results
  */
 @InterfaceStability.Unstable
-public interface StreamTableJoinFunction<K, M, R, JM> extends InitableFunction, ClosableFunction {
+public interface StreamTableJoinFunction<K, M, R, JM> extends InitableFunction, ClosableFunction, Serializable {
 
   /**
    * Joins the provided messages and table record, returns the joined message.

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/functions/SupplierFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/SupplierFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/SupplierFunction.java
new file mode 100644
index 0000000..155fb0e
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/SupplierFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import java.io.Serializable;
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * A supplier to return a new value at each invocation
+ */
+@InterfaceStability.Unstable
+@FunctionalInterface
+public interface SupplierFunction<T> extends InitableFunction, ClosableFunction, Serializable {
+
+  /**
+   * Returns a value of type T
+   *
+   * @return a value for type T
+   */
+  T get();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
index f52b57b..6bdf406 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
@@ -18,20 +18,24 @@
  */
 package org.apache.samza.operators.triggers;

+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;

+
 /**
  * A {@link Trigger} fires as soon as any of its individual triggers has fired.
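Because `SupplierFunction` (like `SinkFunction` and `StreamTableJoinFunction` earlier in this diff) now extends `java.io.Serializable`, a lambda or method reference whose target type is one of these interfaces is compiled into a serializable object. The sketch below illustrates this with plain Java serialization. It assumes a hypothetical stand-in interface that only extends `Serializable` (the real Samza interface also extends `InitableFunction` and `ClosableFunction`), and the `roundTrip` helper is illustrative, not part of Samza.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableLambdaDemo {

  // Hypothetical stand-in for SupplierFunction: only the Serializable part is modeled here.
  @FunctionalInterface
  public interface SupplierFunction<T> extends Serializable {
    T get();
  }

  // Writes obj with Java serialization and reads a fresh copy back.
  // The cause is chained into the unchecked exception so failures stay diagnosable.
  @SuppressWarnings("unchecked")
  public static <T> T roundTrip(T obj) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }

  public static void main(String[] args) {
    int base = 40; // captured by the lambda; the boxed Integer is Serializable
    SupplierFunction<Integer> initialValue = () -> base + 2;
    SupplierFunction<Integer> copy = roundTrip(initialValue);
    System.out.println(copy.get()); // prints 42
  }
}
```

Any value the lambda captures (here the boxed `base`) must itself be serializable; otherwise `writeObject` fails with a `NotSerializableException`.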
*/ public class AnyTrigger<M> implements Trigger<M> { - private final List<Trigger<M>> triggers; + private final ArrayList<Trigger<M>> triggers; AnyTrigger(List<Trigger<M>> triggers) { - this.triggers = triggers; + this.triggers = new ArrayList<>(); + this.triggers.addAll(triggers); } public List<Trigger<M>> getTriggers() { - return triggers; + return Collections.unmodifiableList(triggers); } } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java index be0a877..f224fa2 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java @@ -20,6 +20,7 @@ package org.apache.samza.operators.triggers; +import java.io.Serializable; import org.apache.samza.annotation.InterfaceStability; /** @@ -30,6 +31,6 @@ import org.apache.samza.annotation.InterfaceStability; * @param <M> the type of the incoming message */ @InterfaceStability.Unstable -public interface Trigger<M> { +public interface Trigger<M> extends Serializable { } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java index 1c0fa53..7534fca 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java @@ -18,6 +18,7 @@ */ package org.apache.samza.operators.windows; +import java.io.Serializable; import 
org.apache.samza.annotation.InterfaceStability; import org.apache.samza.operators.triggers.Trigger; @@ -70,7 +71,7 @@ import org.apache.samza.operators.triggers.Trigger; * @param <WV> the type of the value in the window */ @InterfaceStability.Unstable -public interface Window<M, K, WV> { +public interface Window<M, K, WV> extends Serializable { /** * Set the early triggers for this {@link Window}. http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java index 50391ff..4805a0e 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java @@ -21,6 +21,8 @@ package org.apache.samza.operators.windows; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.operators.functions.FoldLeftFunction; +import org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.operators.functions.SupplierFunction; import org.apache.samza.operators.triggers.TimeTrigger; import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.triggers.Triggers; @@ -30,8 +32,6 @@ import org.apache.samza.serializers.Serde; import java.time.Duration; import java.util.Collection; -import java.util.function.Function; -import java.util.function.Supplier; /** * APIs for creating different types of {@link Window}s. @@ -84,7 +84,7 @@ import java.util.function.Supplier; * and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants * of the window types above. 
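The `AnyTrigger` change earlier in this diff stores a defensive copy in a concrete `ArrayList` and exposes it through `Collections.unmodifiableList`. One plausible reason, given that `Trigger` now extends `Serializable`: the `List` interface does not extend `Serializable`, and some implementations (for example, the view returned by `ArrayList.subList`) are not serializable, so copying into an `ArrayList` keeps the composite trigger serializable whatever list the caller passed in. This is a minimal sketch with hypothetical stand-in classes (`TimeTriggerSketch`, `AnyTriggerSketch`), not Samza's actual types.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CompositeTriggerDemo {

  // Hypothetical stand-in modeled on the diff: Trigger now extends Serializable.
  public interface Trigger<M> extends Serializable { }

  public static final class TimeTriggerSketch<M> implements Trigger<M> {
    private final long durationMs;
    public TimeTriggerSketch(long durationMs) { this.durationMs = durationMs; }
    public long getDurationMs() { return durationMs; }
  }

  public static final class AnyTriggerSketch<M> implements Trigger<M> {
    // Defensive copy into a concrete, serializable ArrayList.
    private final ArrayList<Trigger<M>> triggers;

    public AnyTriggerSketch(List<Trigger<M>> triggers) {
      this.triggers = new ArrayList<>(triggers);
    }

    // Read-only view so callers cannot mutate the internal list.
    public List<Trigger<M>> getTriggers() {
      return Collections.unmodifiableList(triggers);
    }
  }

  // Writes obj with Java serialization and reads a fresh copy back.
  @SuppressWarnings("unchecked")
  public static <T> T roundTrip(T obj) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }

  public static void main(String[] args) {
    List<Trigger<String>> input = new ArrayList<>();
    input.add(new TimeTriggerSketch<>(1000L));
    input.add(new TimeTriggerSketch<>(5000L));
    // subList() returns a view that is not itself Serializable; the constructor copies it.
    AnyTriggerSketch<String> any = new AnyTriggerSketch<>(input.subList(0, 2));
    input.clear(); // the copy inside AnyTriggerSketch is unaffected
    AnyTriggerSketch<String> copy = roundTrip(any);
    System.out.println(copy.getTriggers().size()); // prints 2
  }
}
```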
* - * <p> The value for the window can be updated incrementally by providing an {@code initialValue} {@link Supplier} + * <p> The value for the window can be updated incrementally by providing an {@code initialValue} {@link SupplierFunction} * and an aggregating {@link FoldLeftFunction}. The initial value supplier is invoked every time a new window is * created. The aggregating function is invoked for each incoming message for the window. If these are not provided, * the emitted {@link WindowPane} will contain a collection of messages in the window. @@ -105,8 +105,8 @@ public final class Windows { * * <pre> {@code * MessageStream<UserClick> stream = ...; - * Function<UserClick, String> keyFn = ...; - * Supplier<Integer> initialValue = () -> 0; + * MapFunction<UserClick, String> keyFn = ...; + * SupplierFunction<Integer> initialValue = () -> 0; * FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c); * MessageStream<WindowPane<String, Integer>> windowedStream = stream.window( * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), maxAggregator)); @@ -125,16 +125,15 @@ public final class Windows { * @param <K> the type of the key in the {@link Window} * @return the created {@link Window} function. */ - public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(Function<? super M, ? extends K> keyFn, Duration interval, - Supplier<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator, Serde<K> keySerde, + public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(MapFunction<? super M, ? extends K> keyFn, Duration interval, + SupplierFunction<? extends WV> initialValue, FoldLeftFunction<? 
super M, WV> aggregator, Serde<K> keySerde, Serde<WV> windowValueSerde) { Trigger<M> defaultTrigger = new TimeTrigger<>(interval); - return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator, - (Function<M, K>) keyFn, null, WindowType.TUMBLING, keySerde, windowValueSerde, null); + return new WindowInternal<>(defaultTrigger, (SupplierFunction<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator, + (MapFunction<M, K>) keyFn, null, WindowType.TUMBLING, keySerde, windowValueSerde, null); } - /** * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping * processing time based windows using the provided keyFn. @@ -157,12 +156,12 @@ public final class Windows { * @param <K> the type of the key in the {@link Window} * @return the created {@link Window} function */ - public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(Function<M, K> keyFn, Duration interval, + public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(MapFunction<M, K> keyFn, Duration interval, Serde<K> keySerde, Serde<M> msgSerde) { Trigger<M> defaultTrigger = new TimeTrigger<>(interval); - return new WindowInternal<>(defaultTrigger, null, null, keyFn, null, - WindowType.TUMBLING, keySerde, null, msgSerde); + return new WindowInternal<>(defaultTrigger, null, null, keyFn, null, WindowType.TUMBLING, + keySerde, null, msgSerde); } /** @@ -173,7 +172,7 @@ public final class Windows { * * <pre> {@code * MessageStream<String> stream = ...; - * Supplier<Integer> initialValue = () -> 0; + * SupplierFunction<Integer> initialValue = () -> 0; * FoldLeftFunction<String, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c); * MessageStream<WindowPane<Void, Integer>> windowedStream = stream.window( * Windows.tumblingWindow(Duration.ofSeconds(10), maxAggregator)); @@ -189,10 +188,10 @@ public final class Windows { * @param <WV> the type of the {@link WindowPane} output value * @return the 
created {@link Window} function */ - public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration interval, Supplier<? extends WV> initialValue, + public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration interval, SupplierFunction<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator, Serde<WV> windowValueSerde) { Trigger<M> defaultTrigger = new TimeTrigger<>(interval); - return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator, + return new WindowInternal<>(defaultTrigger, (SupplierFunction<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator, null, null, WindowType.TUMBLING, null, windowValueSerde, null); } @@ -221,9 +220,8 @@ public final class Windows { */ public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration duration, Serde<M> msgSerde) { Trigger<M> defaultTrigger = new TimeTrigger<>(duration); - - return new WindowInternal<>(defaultTrigger, null, null, null, - null, WindowType.TUMBLING, null, null, msgSerde); + return new WindowInternal<>(defaultTrigger, null, null, null, null, + WindowType.TUMBLING, null, null, msgSerde); } /** @@ -238,7 +236,7 @@ public final class Windows { * * <pre> {@code * MessageStream<UserClick> stream = ...; - * Supplier<Integer> initialValue = () -> 0; + * SupplierFunction<Integer> initialValue = () -> 0; * FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c); * Function<UserClick, String> userIdExtractor = m -> m.getUserId()..; * MessageStream<WindowPane<String, Integer>> windowedStream = stream.window( @@ -258,12 +256,12 @@ public final class Windows { * @param <WV> the type of the output value in the {@link WindowPane} * @return the created {@link Window} function */ - public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<? super M, ? extends K> keyFn, - Duration sessionGap, Supplier<? extends WV> initialValue, FoldLeftFunction<? 
super M, WV> aggregator, + public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(MapFunction<? super M, ? extends K> keyFn, + Duration sessionGap, SupplierFunction<? extends WV> initialValue, FoldLeftFunction<? super M, WV> aggregator, Serde<K> keySerde, Serde<WV> windowValueSerde) { Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap); - return new WindowInternal<>(defaultTrigger, (Supplier<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator, - (Function<M, K>) keyFn, null, WindowType.SESSION, keySerde, windowValueSerde, null); + return new WindowInternal<>(defaultTrigger, (SupplierFunction<WV>) initialValue, (FoldLeftFunction<M, WV>) aggregator, + (MapFunction<M, K>) keyFn, null, WindowType.SESSION, keySerde, windowValueSerde, null); } /** @@ -278,7 +276,7 @@ public final class Windows { * * <pre> {@code * MessageStream<UserClick> stream = ...; - * Supplier<Integer> initialValue = () -> 0; + * SupplierFunction<Integer> initialValue = () -> 0; * FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseIntField(m), c); * Function<UserClick, String> userIdExtractor = m -> m.getUserId()..; * MessageStream<WindowPane<String>, Collection<M>> windowedStream = stream.window( @@ -294,11 +292,10 @@ public final class Windows { * @param <K> the type of the key in the {@link Window} * @return the created {@link Window} function */ - public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(Function<? super M, ? extends K> keyFn, + public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(MapFunction<? super M, ? 
extends K> keyFn, Duration sessionGap, Serde<K> keySerde, Serde<M> msgSerde) { - Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap); - return new WindowInternal<>(defaultTrigger, null, null, (Function<M, K>) keyFn, + return new WindowInternal<>(defaultTrigger, null, null, (MapFunction<M, K>) keyFn, null, WindowType.SESSION, keySerde, null, msgSerde); } } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java index bc71872..ff19aba 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java @@ -18,14 +18,14 @@ */ package org.apache.samza.operators.windows.internal; import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.operators.functions.SupplierFunction; import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.windows.AccumulationMode; import org.apache.samza.operators.windows.Window; import org.apache.samza.serializers.Serde; -import java.util.function.Function; -import java.util.function.Supplier; /** * Internal representation of a {@link Window}. 
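The `Windows` factory methods now accept `MapFunction` and `SupplierFunction` instead of `java.util.function.Function` and `Supplier`. The JDK interfaces do not extend `Serializable`, so a lambda targeted at them cannot be written with Java serialization, which would block serializing a window spec as part of the DAG. The sketch below checks this directly; `MapFunction` here is a simplified hypothetical stand-in for Samza's interface, and `isSerializable` is an illustrative helper.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

public class FunctionSerializabilityDemo {

  // Hypothetical stand-in for Samza's MapFunction (the real one also extends
  // InitableFunction and ClosableFunction, omitted here).
  @FunctionalInterface
  public interface MapFunction<M, OM> extends Serializable {
    OM apply(M message);
  }

  // Returns true if Java serialization of obj succeeds, false if the object
  // (or something it references) is not serializable.
  public static boolean isSerializable(Object obj) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(obj);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Function<String, Integer> jdkKeyFn = String::length;          // JDK interface: not Serializable
    MapFunction<String, Integer> samzaStyleKeyFn = String::length; // target type extends Serializable
    System.out.println(isSerializable(jdkKeyFn));        // prints false
    System.out.println(isSerializable(samzaStyleKeyFn)); // prints true
  }
}
```

The same method reference yields a serializable object or not depending only on its target type, which is why the signatures, and not just the call sites, had to change.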
This specifies default, early and late triggers for the {@link Window} @@ -45,7 +45,7 @@ public final class WindowInternal<M, WK, WV> implements Window<M, WK, WV> { /** * The supplier of initial value to be used for windowed aggregations */ - private final Supplier<WV> initializer; + private final SupplierFunction<WV> initializer; /* * The function that is applied each time a {@link MessageEnvelope} is added to this window. @@ -55,28 +55,32 @@ public final class WindowInternal<M, WK, WV> implements Window<M, WK, WV> { /* * The function that extracts the key from a {@link MessageEnvelope} */ - private final Function<M, WK> keyExtractor; + private final MapFunction<M, WK> keyExtractor; /* * The function that extracts the event time from a {@link MessageEnvelope} */ - private final Function<M, Long> eventTimeExtractor; + private final MapFunction<M, Long> eventTimeExtractor; /** * The type of this window. Tumbling and Session windows are supported for now. */ private final WindowType windowType; - private final Serde<WK> keySerde; - private final Serde<WV> windowValSerde; - private final Serde<M> msgSerde; - private Trigger<M> earlyTrigger; private Trigger<M> lateTrigger; private AccumulationMode mode; - public WindowInternal(Trigger<M> defaultTrigger, Supplier<WV> initializer, FoldLeftFunction<M, WV> foldLeftFunction, - Function<M, WK> keyExtractor, Function<M, Long> eventTimeExtractor, WindowType windowType, Serde<WK> keySerde, + /** + * The following {@link Serde}s are serialized by the ExecutionPlanner when generating the store configs, and deserialized + * once during startup in SamzaContainer. 
They don't need to be deserialized here on a per-task basis + */ + private transient final Serde<WK> keySerde; + private transient final Serde<WV> windowValSerde; + private transient final Serde<M> msgSerde; + + public WindowInternal(Trigger<M> defaultTrigger, SupplierFunction<WV> initializer, FoldLeftFunction<M, WV> foldLeftFunction, + MapFunction<M, WK> keyExtractor, MapFunction<M, Long> eventTimeExtractor, WindowType windowType, Serde<WK> keySerde, Serde<WV> windowValueSerde, Serde<M> msgSerde) { this.defaultTrigger = defaultTrigger; this.initializer = initializer; @@ -121,7 +125,7 @@ public final class WindowInternal<M, WK, WV> implements Window<M, WK, WV> { return lateTrigger; } - public Supplier<WV> getInitializer() { + public SupplierFunction<WV> getInitializer() { return initializer; } @@ -129,11 +133,11 @@ public final class WindowInternal<M, WK, WV> implements Window<M, WK, WV> { return foldLeftFunction; } - public Function<M, WK> getKeyExtractor() { + public MapFunction<M, WK> getKeyExtractor() { return keyExtractor; } - public Function<M, Long> getEventTimeExtractor() { + public MapFunction<M, Long> getEventTimeExtractor() { return eventTimeExtractor; } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java b/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java index d70746c..d49518c 100644 --- a/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java +++ b/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java @@ -68,7 +68,7 @@ public class SerializableSerde<T extends Serializable> implements Serde<T> { ois = new ObjectInputStream(bis); return (T) ois.readObject(); } catch (IOException | ClassNotFoundException e) { - throw new SamzaException("Error 
reading from input stream."); + throw new SamzaException("Error reading from input stream.", e); } finally { try { if (ois != null) { http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java index ce67d8d..cd86426 100644 --- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java +++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java @@ -19,6 +19,7 @@ package org.apache.samza.system; +import java.io.Serializable; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -33,7 +34,7 @@ import java.util.Map; * * It is immutable by design. */ -public class StreamSpec { +public class StreamSpec implements Serializable { private static final int DEFAULT_PARTITION_COUNT = 1; http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java index 95cc266..e9ca9f7 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartition.java @@ -60,7 +60,7 @@ public class SystemStreamPartition extends SystemStream implements Comparable<Sy public Partition getPartition() { return partition; } - + public SystemStream getSystemStream() { return new SystemStream(system, stream); } @@ -69,7 +69,7 @@ public class SystemStreamPartition extends SystemStream implements Comparable<Sy public int hashCode() { return hash; } - + private int computeHashCode() { final int prime = 
31; int result = super.hashCode(); http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/main/java/org/apache/samza/table/TableSpec.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java index 68043f9..ba57c2f 100644 --- a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java +++ b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java @@ -18,6 +18,7 @@ */ package org.apache.samza.table; +import java.io.Serializable; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -41,12 +42,17 @@ import org.apache.samza.serializers.KVSerde; * It is immutable by design. */ @InterfaceStability.Unstable -public class TableSpec { +public class TableSpec implements Serializable { private final String id; - private final KVSerde serde; private final String tableProviderFactoryClassName; - private final Map<String, String> config = new HashMap<>(); + + /** + * The following fields are serialized by the ExecutionPlanner when generating the configs for a table, and deserialized + * once during startup in SamzaContainer. 
They don't need to be deserialized here on a per-task basis + */ + private transient final KVSerde serde; + private transient final Map<String, String> config = new HashMap<>(); /** * Default constructor http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java index 4184c9d..19cce6f 100644 --- a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java +++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java @@ -27,7 +27,7 @@ import static org.junit.Assert.assertEquals; public class TestWindowPane { @Test public void testConstructor() { - WindowPane<String, Integer> wndOutput = new WindowPane(new WindowKey("testMsg", null), 10, AccumulationMode.DISCARDING, FiringType.EARLY); + WindowPane<String, Integer> wndOutput = new WindowPane<>(new WindowKey<>("testMsg", null), 10, AccumulationMode.DISCARDING, FiringType.EARLY); assertEquals(wndOutput.getKey().getKey(), "testMsg"); assertEquals(wndOutput.getMessage(), Integer.valueOf(10)); } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java index e2c122a..9d8bd5f 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java @@ -34,7 +34,7 @@ import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.ClusterManagerConfig; 
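Marking the `Serde` fields of `WindowInternal` and the `serde`/`config` fields of `TableSpec` as `transient` means Java serialization skips them. When a `Serializable` class is deserialized, its own constructor and field initializers do not run, so these fields come back as `null` even though they are `final` and initialized inline. The diff's comments indicate this is intentional, because the ExecutionPlanner has already written the serdes into configs before the graph is serialized. The sketch below shows the effect with a hypothetical `SpecSketch` class; the config key is made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class TransientFieldDemo {

  // Hypothetical class modeled on the TableSpec change.
  public static final class SpecSketch implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String id;                                               // serialized
    private transient final Map<String, String> config = new HashMap<>(); // skipped

    public SpecSketch(String id, Map<String, String> config) {
      this.id = id;
      this.config.putAll(config);
    }

    public String getId() { return id; }
    public Map<String, String> getConfig() { return config; }
  }

  // Writes obj with Java serialization and reads a fresh copy back.
  @SuppressWarnings("unchecked")
  public static <T> T roundTrip(T obj) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> cfg = new HashMap<>();
    cfg.put("stores.t1.factory", "example.Factory"); // hypothetical config entry
    SpecSketch copy = roundTrip(new SpecSketch("t1", cfg));
    System.out.println(copy.getId());     // prints t1
    System.out.println(copy.getConfig()); // prints null: the transient field was not restored
  }
}
```

Any code path that reads such a field after deserialization would therefore need a null check or a re-initialization hook such as `readObject`.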
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.system.StreamSpec;
@@ -61,18 +61,18 @@ public class ExecutionPlanner {
     this.streamManager = streamManager;
   }

-  public ExecutionPlan plan(StreamGraphImpl streamGraph) throws Exception {
+  public ExecutionPlan plan(OperatorSpecGraph specGraph) throws Exception {
     validateConfig();

     // create physical job graph based on stream graph
-    JobGraph jobGraph = createJobGraph(streamGraph);
+    JobGraph jobGraph = createJobGraph(specGraph);

     // fetch the external streams partition info
     updateExistingPartitions(jobGraph, streamManager);

     if (!jobGraph.getIntermediateStreamEdges().isEmpty()) {
       // figure out the partitions for internal streams
-      calculatePartitions(streamGraph, jobGraph);
+      calculatePartitions(jobGraph);
     }

     return jobGraph;
@@ -91,12 +91,12 @@ public class ExecutionPlanner {
   /**
    * Create the physical graph from StreamGraph
    */
-  /* package private */ JobGraph createJobGraph(StreamGraphImpl streamGraph) {
-    JobGraph jobGraph = new JobGraph(config);
-    Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInputOperators().keySet());
-    Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutputStreams().keySet());
+  /* package private */ JobGraph createJobGraph(OperatorSpecGraph specGraph) {
+    JobGraph jobGraph = new JobGraph(config, specGraph);
+    Set<StreamSpec> sourceStreams = new HashSet<>(specGraph.getInputOperators().keySet());
+    Set<StreamSpec> sinkStreams = new HashSet<>(specGraph.getOutputStreams().keySet());
     Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
-    Set<TableSpec> tables = new HashSet<>(streamGraph.getTables().keySet());
+    Set<TableSpec> tables = new HashSet<>(specGraph.getTables().keySet());
intStreams.retainAll(sinkStreams); sourceStreams.removeAll(intStreams); sinkStreams.removeAll(intStreams); @@ -104,7 +104,7 @@ public class ExecutionPlanner { // For this phase, we have a single job node for the whole dag String jobName = config.get(JobConfig.JOB_NAME()); String jobId = config.get(JobConfig.JOB_ID(), "1"); - JobNode node = jobGraph.getOrCreateJobNode(jobName, jobId, streamGraph); + JobNode node = jobGraph.getOrCreateJobNode(jobName, jobId); // add sources sourceStreams.forEach(spec -> jobGraph.addSource(spec, node)); @@ -126,9 +126,9 @@ public class ExecutionPlanner { /** * Figure out the number of partitions of all streams */ - /* package private */ void calculatePartitions(StreamGraphImpl streamGraph, JobGraph jobGraph) { + /* package private */ void calculatePartitions(JobGraph jobGraph) { // calculate the partitions for the input streams of join operators - calculateJoinInputPartitions(streamGraph, jobGraph); + calculateJoinInputPartitions(jobGraph); // calculate the partitions for the rest of intermediate streams calculateIntStreamPartitions(jobGraph, config); @@ -172,7 +172,7 @@ public class ExecutionPlanner { /** * Calculate the partitions for the input streams of join operators */ - /* package private */ static void calculateJoinInputPartitions(StreamGraphImpl streamGraph, JobGraph jobGraph) { + /* package private */ static void calculateJoinInputPartitions(JobGraph jobGraph) { // mapping from a source stream to all join specs reachable from it Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create(); // reverse mapping of the above @@ -182,7 +182,7 @@ public class ExecutionPlanner { // The visited set keeps track of the join specs that have been already inserted in the queue before Set<OperatorSpec> visited = new HashSet<>(); - streamGraph.getInputOperators().entrySet().forEach(entry -> { + jobGraph.getSpecGraph().getInputOperators().entrySet().forEach(entry -> { StreamEdge streamEdge = 
jobGraph.getOrCreateStreamEdge(entry.getKey()); // Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java index abd3ce7..843db85 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java @@ -34,7 +34,7 @@ import java.util.stream.Collectors; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; -import org.apache.samza.operators.StreamGraphImpl; +import org.apache.samza.operators.OperatorSpecGraph; import org.apache.samza.system.StreamSpec; import org.apache.samza.table.TableSpec; import org.slf4j.Logger; @@ -60,13 +60,15 @@ import org.slf4j.LoggerFactory; private final Set<TableSpec> tables = new HashSet<>(); private final Config config; private final JobGraphJsonGenerator jsonGenerator = new JobGraphJsonGenerator(); + private final OperatorSpecGraph specGraph; /** * The JobGraph is only constructed by the {@link ExecutionPlanner}. 
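In `createJobGraph` above, the stream roles are derived with plain set arithmetic on the spec graph's inputs and outputs: a stream that is both consumed and produced by the graph is intermediate, and it is removed from both the source and sink sets. The sketch below reproduces that logic with string stream IDs standing in for `StreamSpec`; the class, method, and stream names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class StreamClassificationDemo {

  // Hypothetical holder for the three stream roles.
  public static final class Classified {
    public final Set<String> sources;
    public final Set<String> sinks;
    public final Set<String> intermediates;

    Classified(Set<String> sources, Set<String> sinks, Set<String> intermediates) {
      this.sources = sources;
      this.sinks = sinks;
      this.intermediates = intermediates;
    }
  }

  // Mirrors the set arithmetic in createJobGraph, with stream IDs instead of StreamSpecs.
  public static Classified classify(Set<String> inputStreams, Set<String> outputStreams) {
    Set<String> sources = new HashSet<>(inputStreams);
    Set<String> sinks = new HashSet<>(outputStreams);
    Set<String> intStreams = new HashSet<>(sources);
    intStreams.retainAll(sinks);   // both consumed and produced: intermediate
    sources.removeAll(intStreams); // consumed only: external source
    sinks.removeAll(intStreams);   // produced only: external sink
    return new Classified(sources, sinks, intStreams);
  }

  public static void main(String[] args) {
    Set<String> inputs = new HashSet<>(Arrays.asList("PageViews", "PageViewsByMember"));
    Set<String> outputs = new HashSet<>(Arrays.asList("PageViewsByMember", "PageViewCounts"));
    Classified c = classify(inputs, outputs);
    System.out.println(c.sources);       // prints [PageViews]
    System.out.println(c.sinks);         // prints [PageViewCounts]
    System.out.println(c.intermediates); // prints [PageViewsByMember]
  }
}
```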
    * @param config Config
    */
-  JobGraph(Config config) {
+  JobGraph(Config config, OperatorSpecGraph specGraph) {
     this.config = config;
+    this.specGraph = specGraph;
   }

   @Override
@@ -107,6 +109,10 @@ import org.slf4j.LoggerFactory;
     return new ApplicationConfig(config);
   }

+  public OperatorSpecGraph getSpecGraph() {
+    return specGraph;
+  }
+
   /**
    * Add a source stream to a {@link JobNode}
    * @param input source stream
@@ -152,11 +158,11 @@ import org.slf4j.LoggerFactory;
    * @param jobId id of the job
    * @return
    */
-  JobNode getOrCreateJobNode(String jobName, String jobId, StreamGraphImpl streamGraph) {
+  JobNode getOrCreateJobNode(String jobName, String jobId) {
     String nodeId = JobNode.createId(jobName, jobId);
     JobNode node = nodes.get(nodeId);
     if (node == null) {
-      node = new JobNode(jobName, jobId, streamGraph, config);
+      node = new JobNode(jobName, jobId, specGraph, config);
       nodes.put(nodeId, node);
     }
     return node;
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 48d2219..298042b 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -170,7 +170,7 @@ import org.codehaus.jackson.map.ObjectMapper;
   private OperatorGraphJson buildOperatorGraphJson(JobNode jobNode) {
     OperatorGraphJson opGraph = new OperatorGraphJson();
     opGraph.inputStreams = new ArrayList<>();
-    jobNode.getStreamGraph().getInputOperators().forEach((streamSpec, operatorSpec) -> {
+    jobNode.getSpecGraph().getInputOperators().forEach((streamSpec, operatorSpec) -> {
       StreamJson inputJson = new StreamJson();
       opGraph.inputStreams.add(inputJson);
inputJson.streamId = streamSpec.getId(); @@ -181,7 +181,7 @@ import org.codehaus.jackson.map.ObjectMapper; }); opGraph.outputStreams = new ArrayList<>(); - jobNode.getStreamGraph().getOutputStreams().keySet().forEach(streamSpec -> { + jobNode.getSpecGraph().getOutputStreams().keySet().forEach(streamSpec -> { StreamJson outputJson = new StreamJson(); outputJson.streamId = streamSpec.getId(); opGraph.outputStreams.add(outputJson); http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/execution/JobNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java index 8abd463..db44d9f 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java @@ -39,7 +39,7 @@ import org.apache.samza.config.StorageConfig; import org.apache.samza.config.StreamConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.config.TaskConfigJava; -import org.apache.samza.operators.StreamGraphImpl; +import org.apache.samza.operators.OperatorSpecGraph; import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; @@ -73,22 +73,22 @@ public class JobNode { private final String jobName; private final String jobId; private final String id; - private final StreamGraphImpl streamGraph; + private final OperatorSpecGraph specGraph; private final List<StreamEdge> inEdges = new ArrayList<>(); private final List<StreamEdge> outEdges = new ArrayList<>(); private final List<TableSpec> tables = new ArrayList<>(); private final Config config; - JobNode(String jobName, String jobId, StreamGraphImpl streamGraph, Config config) { + JobNode(String jobName, String jobId, OperatorSpecGraph specGraph, 
Config config) { this.jobName = jobName; this.jobId = jobId; this.id = createId(jobName, jobId); - this.streamGraph = streamGraph; + this.specGraph = specGraph; this.config = config; } - public StreamGraphImpl getStreamGraph() { - return streamGraph; + public OperatorSpecGraph getSpecGraph() { + return this.specGraph; } public String getId() { @@ -154,7 +154,7 @@ public class JobNode { } // set triggering interval if a window or join is defined - if (streamGraph.hasWindowOrJoins()) { + if (specGraph.hasWindowOrJoins()) { if ("-1".equals(config.get(TaskConfig.WINDOW_MS(), "-1"))) { long triggerInterval = computeTriggerInterval(); log.info("Using triggering interval: {} for jobName: {}", triggerInterval, jobName); @@ -163,7 +163,7 @@ public class JobNode { } } - streamGraph.getAllOperatorSpecs().forEach(opSpec -> { + specGraph.getAllOperatorSpecs().forEach(opSpec -> { if (opSpec instanceof StatefulOperatorSpec) { ((StatefulOperatorSpec) opSpec).getStoreDescriptors() .forEach(sd -> configs.putAll(sd.getStorageConfigs())); @@ -228,14 +228,14 @@ public class JobNode { // collect all key and msg serde instances for streams Map<String, Serde> streamKeySerdes = new HashMap<>(); Map<String, Serde> streamMsgSerdes = new HashMap<>(); - Map<StreamSpec, InputOperatorSpec> inputOperators = streamGraph.getInputOperators(); + Map<StreamSpec, InputOperatorSpec> inputOperators = specGraph.getInputOperators(); inEdges.forEach(edge -> { String streamId = edge.getStreamSpec().getId(); InputOperatorSpec inputOperatorSpec = inputOperators.get(edge.getStreamSpec()); streamKeySerdes.put(streamId, inputOperatorSpec.getKeySerde()); streamMsgSerdes.put(streamId, inputOperatorSpec.getValueSerde()); }); - Map<StreamSpec, OutputStreamImpl> outputStreams = streamGraph.getOutputStreams(); + Map<StreamSpec, OutputStreamImpl> outputStreams = specGraph.getOutputStreams(); outEdges.forEach(edge -> { String streamId = edge.getStreamSpec().getId(); OutputStreamImpl outputStream = 
outputStreams.get(edge.getStreamSpec()); @@ -246,7 +246,7 @@ public class JobNode { // collect all key and msg serde instances for stores Map<String, Serde> storeKeySerdes = new HashMap<>(); Map<String, Serde> storeMsgSerdes = new HashMap<>(); - streamGraph.getAllOperatorSpecs().forEach(opSpec -> { + specGraph.getAllOperatorSpecs().forEach(opSpec -> { if (opSpec instanceof StatefulOperatorSpec) { ((StatefulOperatorSpec) opSpec).getStoreDescriptors().forEach(storeDescriptor -> { storeKeySerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getKeySerde()); @@ -320,8 +320,8 @@ public class JobNode { * Computes the triggering interval to use during the execution of this {@link JobNode} */ private long computeTriggerInterval() { - // Obtain the operator specs from the streamGraph - Collection<OperatorSpec> operatorSpecs = streamGraph.getAllOperatorSpecs(); + // Obtain the operator specs from the specGraph + Collection<OperatorSpec> operatorSpecs = specGraph.getAllOperatorSpecs(); // Filter out window operators, and obtain a list of their triggering interval values List<Long> windowTimerIntervals = operatorSpecs.stream() http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java index 1681f30..6922c76 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java @@ -21,7 +21,6 @@ package org.apache.samza.operators; import java.time.Duration; import java.util.Collection; -import java.util.function.Function; import org.apache.samza.SamzaException; import org.apache.samza.operators.functions.FilterFunction; @@ -64,16 +63,16 @@ import 
org.apache.samza.table.TableSpec; */ public class MessageStreamImpl<M> implements MessageStream<M> { /** - * The {@link StreamGraphImpl} that contains this {@link MessageStreamImpl} + * The {@link StreamGraphSpec} that contains this {@link MessageStreamImpl} */ - private final StreamGraphImpl graph; + private final StreamGraphSpec graph; /** * The {@link OperatorSpec} associated with this {@link MessageStreamImpl} */ private final OperatorSpec operatorSpec; - public MessageStreamImpl(StreamGraphImpl graph, OperatorSpec<?, M> operatorSpec) { + public MessageStreamImpl(StreamGraphSpec graph, OperatorSpec<?, M> operatorSpec) { this.graph = graph; this.operatorSpec = operatorSpec; } @@ -81,7 +80,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> { @Override public <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFn) { String opId = this.graph.getNextOpId(OpCode.MAP); - OperatorSpec<M, TM> op = OperatorSpecs.createMapOperatorSpec(mapFn, opId); + StreamOperatorSpec<M, TM> op = OperatorSpecs.createMapOperatorSpec(mapFn, opId); this.operatorSpec.registerNextOperatorSpec(op); return new MessageStreamImpl<>(this.graph, op); } @@ -89,7 +88,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> { @Override public MessageStream<M> filter(FilterFunction<? super M> filterFn) { String opId = this.graph.getNextOpId(OpCode.FILTER); - OperatorSpec<M, M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, opId); + StreamOperatorSpec<M, M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, opId); this.operatorSpec.registerNextOperatorSpec(op); return new MessageStreamImpl<>(this.graph, op); } @@ -97,7 +96,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> { @Override public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? 
extends TM> flatMapFn) { String opId = this.graph.getNextOpId(OpCode.FLAT_MAP); - OperatorSpec<M, TM> op = OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, opId); + StreamOperatorSpec<M, TM> op = OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, opId); this.operatorSpec.registerNextOperatorSpec(op); return new MessageStreamImpl<>(this.graph, op); } @@ -112,15 +111,15 @@ public class MessageStreamImpl<M> implements MessageStream<M> { @Override public void sendTo(OutputStream<M> outputStream) { String opId = this.graph.getNextOpId(OpCode.SEND_TO); - OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec((OutputStreamImpl<M>) outputStream, opId); + OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec( + (OutputStreamImpl<M>) outputStream, opId); this.operatorSpec.registerNextOperatorSpec(op); } @Override public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window, String userDefinedId) { String opId = this.graph.getNextOpId(OpCode.WINDOW, userDefinedId); - OperatorSpec<M, WindowPane<K, WV>> op = OperatorSpecs.createWindowOperatorSpec( - (WindowInternal<M, K, WV>) window, opId); + OperatorSpec<M, WindowPane<K, WV>> op = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window, opId); this.operatorSpec.registerNextOperatorSpec(op); return new MessageStreamImpl<>(this.graph, op); } @@ -131,24 +130,24 @@ public class MessageStreamImpl<M> implements MessageStream<M> { Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, Duration ttl, String userDefinedId) { if (otherStream.equals(this)) throw new SamzaException("Cannot join a MessageStream with itself."); - OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec(); String opId = this.graph.getNextOpId(OpCode.JOIN, userDefinedId); - JoinOperatorSpec<K, M, OM, JM> joinOpSpec = - OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, (JoinFunction<K, M, OM, JM>) joinFn, - keySerde, messageSerde, 
otherMessageSerde, ttl.toMillis(), opId); - - this.operatorSpec.registerNextOperatorSpec(joinOpSpec); - otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) joinOpSpec); + OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec(); + JoinOperatorSpec<K, M, OM, JM> op = + OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, (JoinFunction<K, M, OM, JM>) joinFn, keySerde, + messageSerde, otherMessageSerde, ttl.toMillis(), opId); + this.operatorSpec.registerNextOperatorSpec(op); + otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) op); - return new MessageStreamImpl<>(this.graph, joinOpSpec); + return new MessageStreamImpl<>(this.graph, op); } @Override public <K, R extends KV, JM> MessageStream<JM> join(Table<R> table, StreamTableJoinFunction<? extends K, ? super M, ? super R, ? extends JM> joinFn) { + String opId = this.graph.getNextOpId(OpCode.JOIN); TableSpec tableSpec = ((TableImpl) table).getTableSpec(); StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec = OperatorSpecs.createStreamTableJoinOperatorSpec( - tableSpec, (StreamTableJoinFunction<K, M, R, JM>) joinFn, this.graph.getNextOpId(OpCode.JOIN)); + tableSpec, (StreamTableJoinFunction<K, M, R, JM>) joinFn, opId); this.operatorSpec.registerNextOperatorSpec(joinOpSpec); return new MessageStreamImpl<>(this.graph, joinOpSpec); } @@ -157,46 +156,38 @@ public class MessageStreamImpl<M> implements MessageStream<M> { public MessageStream<M> merge(Collection<? extends MessageStream<? 
extends M>> otherStreams) { if (otherStreams.isEmpty()) return this; String opId = this.graph.getNextOpId(OpCode.MERGE); - StreamOperatorSpec<M, M> opSpec = OperatorSpecs.createMergeOperatorSpec(opId); - this.operatorSpec.registerNextOperatorSpec(opSpec); - otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).getOperatorSpec().registerNextOperatorSpec(opSpec)); - return new MessageStreamImpl<>(this.graph, opSpec); + StreamOperatorSpec<M, M> op = OperatorSpecs.createMergeOperatorSpec(opId); + this.operatorSpec.registerNextOperatorSpec(op); + otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).getOperatorSpec().registerNextOperatorSpec(op)); + return new MessageStreamImpl<>(this.graph, op); } @Override - public <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor, - Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String userDefinedId) { + public <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> keyExtractor, + MapFunction<? super M, ? 
extends V> valueExtractor, KVSerde<K, V> serde, String userDefinedId) { String opId = this.graph.getNextOpId(OpCode.PARTITION_BY, userDefinedId); IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.graph.getIntermediateStream(opId, serde); if (!intermediateStream.isKeyed()) { // this can only happen when the default serde partitionBy variant is being used throw new SamzaException("partitionBy can not be used with a default serde that is not a KVSerde."); } - PartitionByOperatorSpec<M, K, V> partitionByOperatorSpec = - OperatorSpecs.createPartitionByOperatorSpec( - intermediateStream.getOutputStream(), keyExtractor, valueExtractor, opId); + PartitionByOperatorSpec<M, K, V> partitionByOperatorSpec = OperatorSpecs.createPartitionByOperatorSpec( + intermediateStream.getOutputStream(), keyExtractor, valueExtractor, opId); this.operatorSpec.registerNextOperatorSpec(partitionByOperatorSpec); return intermediateStream; } @Override - public <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor, - Function<? super M, ? extends V> valueExtractor, String userDefinedId) { + public <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> keyExtractor, + MapFunction<? super M, ? extends V> valueExtractor, String userDefinedId) { return partitionBy(keyExtractor, valueExtractor, null, userDefinedId); } - /** - * Get the {@link OperatorSpec} associated with this {@link MessageStreamImpl}. - * @return the {@link OperatorSpec} associated with this {@link MessageStreamImpl}. 
- */ - protected OperatorSpec<?, M> getOperatorSpec() { - return this.operatorSpec; - } - @Override public <K, V> void sendTo(Table<KV<K, V>> table) { - SendToTableOperatorSpec<K, V> op = OperatorSpecs.createSendToTableOperatorSpec( - this.operatorSpec, ((TableImpl) table).getTableSpec(), this.graph.getNextOpId(OpCode.SEND_TO)); + String opId = this.graph.getNextOpId(OpCode.SEND_TO); + SendToTableOperatorSpec<K, V> op = + OperatorSpecs.createSendToTableOperatorSpec(((TableImpl) table).getTableSpec(), opId); this.operatorSpec.registerNextOperatorSpec(op); } @@ -215,4 +206,12 @@ public class MessageStreamImpl<M> implements MessageStream<M> { return broadcast(null, userDefinedId); } + /** + * Get the {@link OperatorSpec} associated with this {@link MessageStreamImpl}. + * @return the {@link OperatorSpec} associated with this {@link MessageStreamImpl}. + */ + protected OperatorSpec<?, M> getOperatorSpec() { + return this.operatorSpec; + } + } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java new file mode 100644 index 0000000..ba51c7c --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.samza.operators.spec.InputOperatorSpec; +import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.operators.spec.OutputStreamImpl; +import org.apache.samza.serializers.SerializableSerde; +import org.apache.samza.system.StreamSpec; +import org.apache.samza.table.TableSpec; + + +/** + * Defines the serialized format of {@link StreamGraphSpec}. This class encapsulates all getter methods to get the {@link OperatorSpec} + * initialized in the {@link StreamGraphSpec} and constructs the corresponding serialized instances of {@link OperatorSpec}. + * The {@link StreamGraphSpec} and {@link OperatorSpec} instances included in this class are considered immutable and read-only. + * The instance of {@link OperatorSpecGraph} should only be used at runtime to construct {@link org.apache.samza.task.StreamOperatorTask}. + */ +public class OperatorSpecGraph implements Serializable { + // We use a LHM for deterministic order in initializing and closing operators.
+ private final Map<StreamSpec, InputOperatorSpec> inputOperators; + private final Map<StreamSpec, OutputStreamImpl> outputStreams; + private final Map<TableSpec, TableImpl> tables; + private final Set<OperatorSpec> allOpSpecs; + private final boolean hasWindowOrJoins; + + // The following objects are transient since they are recreatable. + private transient final SerializableSerde<OperatorSpecGraph> opSpecGraphSerde = new SerializableSerde<>(); + private transient final byte[] serializedOpSpecGraph; + + OperatorSpecGraph(StreamGraphSpec graphSpec) { + this.inputOperators = graphSpec.getInputOperators(); + this.outputStreams = graphSpec.getOutputStreams(); + this.tables = graphSpec.getTables(); + this.allOpSpecs = Collections.unmodifiableSet(this.findAllOperatorSpecs()); + this.hasWindowOrJoins = checkWindowOrJoins(); + this.serializedOpSpecGraph = opSpecGraphSerde.toBytes(this); + } + + public Map<StreamSpec, InputOperatorSpec> getInputOperators() { + return inputOperators; + } + + public Map<StreamSpec, OutputStreamImpl> getOutputStreams() { + return outputStreams; + } + + public Map<TableSpec, TableImpl> getTables() { + return tables; + } + + /** + * Get all {@link OperatorSpec}s available in this {@link StreamGraphSpec} + * + * @return all available {@link OperatorSpec}s + */ + public Collection<OperatorSpec> getAllOperatorSpecs() { + return allOpSpecs; + } + + /** + * Returns <tt>true</tt> iff this {@link StreamGraphSpec} contains a join or a window operator + * + * @return <tt>true</tt> iff this {@link StreamGraphSpec} contains a join or a window operator + */ + public boolean hasWindowOrJoins() { + return hasWindowOrJoins; + } + + /** + * Returns a deserialized {@link OperatorSpecGraph} as a copy of this instance of {@link OperatorSpecGraph}. + * This is used to create a per-task instance of {@link OperatorSpecGraph} when instantiating tasks.
+ * + * @return a copy of this {@link OperatorSpecGraph} object via deserialization + */ + public OperatorSpecGraph clone() { + if (opSpecGraphSerde == null) { + throw new IllegalStateException("Cannot clone from an already deserialized OperatorSpecGraph."); + } + return opSpecGraphSerde.fromBytes(serializedOpSpecGraph); + } + + private HashSet<OperatorSpec> findAllOperatorSpecs() { + Collection<InputOperatorSpec> inputOperatorSpecs = this.inputOperators.values(); + HashSet<OperatorSpec> operatorSpecs = new HashSet<>(); + for (InputOperatorSpec inputOperatorSpec : inputOperatorSpecs) { + operatorSpecs.add(inputOperatorSpec); + doGetOperatorSpecs(inputOperatorSpec, operatorSpecs); + } + return operatorSpecs; + } + + private void doGetOperatorSpecs(OperatorSpec operatorSpec, Set<OperatorSpec> specs) { + Collection<OperatorSpec> registeredOperatorSpecs = operatorSpec.getRegisteredOperatorSpecs(); + for (OperatorSpec registeredOperatorSpec : registeredOperatorSpecs) { + specs.add(registeredOperatorSpec); + doGetOperatorSpecs(registeredOperatorSpec, specs); + } + } + + private boolean checkWindowOrJoins() { + Set<OperatorSpec> windowOrJoinSpecs = allOpSpecs.stream() + .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.WINDOW || spec.getOpCode() == OperatorSpec.OpCode.JOIN) + .collect(Collectors.toSet()); + + return windowOrJoinSpecs.size() != 0; + } + +}
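The cloning strategy in OperatorSpecGraph above (serialize the whole graph once in the constructor, deserialize those bytes to give each task its own copy, and treat a null transient field as the marker of an already deserialized copy) can be illustrated with plain Java serialization. The sketch below is hypothetical and is not the Samza implementation: SpecGraph, SerializableMapFn, toBytes, fromBytes and demoGraph are made-up stand-ins for OperatorSpecGraph, the high-level API function interfaces and SerializableSerde, and it assumes an ObjectOutputStream/ObjectInputStream round trip. It also shows why a user function type has to extend Serializable: a Java lambda can only be serialized when its target type is serializable, which java.util.function.Function is not (presumably one reason partitionBy now takes MapFunction instead).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class SpecGraphCloneSketch {

  /** Hypothetical stand-in for a serializable user function such as MapFunction. */
  @FunctionalInterface
  interface SerializableMapFn<M, R> extends Serializable {
    R apply(M message);
  }

  /** Hypothetical stand-in for OperatorSpecGraph: a chain of user functions. */
  static final class SpecGraph implements Serializable {
    private static final long serialVersionUID = 1L;

    private final ArrayList<SerializableMapFn<String, String>> chain;
    // Skipped by Java serialization, so it is null in every deserialized copy.
    private final transient byte[] serializedGraph;

    SpecGraph(List<SerializableMapFn<String, String>> fns) {
      this.chain = new ArrayList<>(fns);
      // Serialize once, after the serializable state has been assigned.
      this.serializedGraph = toBytes(this);
    }

    /** Returns an independent copy by deserializing the bytes captured at construction. */
    SpecGraph copy() {
      if (serializedGraph == null) {
        throw new IllegalStateException("Cannot clone from an already deserialized SpecGraph.");
      }
      return (SpecGraph) fromBytes(serializedGraph);
    }

    /** Applies every function in the chain, in order. */
    String process(String message) {
      String result = message;
      for (SerializableMapFn<String, String> fn : chain) {
        result = fn.apply(result);
      }
      return result;
    }
  }

  static byte[] toBytes(Object obj) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static Object fromBytes(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  /** Builds a small graph whose lambdas are serializable because of their target type. */
  static SpecGraph demoGraph() {
    List<SerializableMapFn<String, String>> fns = new ArrayList<>();
    fns.add(s -> s.trim());
    fns.add(s -> s.toUpperCase());
    return new SpecGraph(fns);
  }

  public static void main(String[] args) {
    SpecGraph original = demoGraph();
    SpecGraph perTaskCopy = original.copy();
    System.out.println(perTaskCopy.process("  hello ")); // prints "HELLO"
  }
}
```

Calling copy() on perTaskCopy throws IllegalStateException, mirroring the guard in OperatorSpecGraph#clone(). A consequence of this design is that every variable captured by a user lambda must itself be serializable; otherwise the constructor's toBytes call fails with a NotSerializableException, which this sketch wraps in an UncheckedIOException.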