SAMZA-1789: unify ApplicationDescriptor and ApplicationRunner for high- and low-level APIs in YARN and standalone environment
This is the initial PR for SEP-13. Highlighted changes:
- Define StreamApplication and TaskApplication with a describe(ApplicationDescriptor) API to define the processing logic of a stream application
- The objects instantiated and registered with the ApplicationDescriptor in the describe() method should be serializable
- Define ApplicationRunner to have a mandatory constructor parameter of ApplicationDescriptor
- Define ProcessorLifecycleListenerFactory to allow users to inject local logic and instantiate local objects in the processors of an application

Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com>
Author: Yi Pan (Data Infrastructure) <yi...@yipan-mn1.linkedin.biz>
Author: Yi Pan (Data Infrastructure) <yi...@yipan-ld2.linkedin.biz>
Author: Prateek Maheshwari <pmaheshw...@linkedin.com>
Author: Prateek Maheshwari <prate...@utexas.edu>
Author: prateekm <prate...@utexas.edu>

Reviewers: Prateek Maheshwari <pmaheshw...@apache.org>, Cameron Lee <ca...@linkedin.com>

Closes #606 from nickpan47/app-runtime-with-processor-callbacks and squashes the following commits:

3e60d44a [Yi Pan (Data Infrastructure)] SAMZA-1789: final revision on ApplicationDescriptor and ApplicationRunner APIs
bdb5b0fc [Yi Pan (Data Infrastructure)] SAMZA-1789: ApplicationRunner and ApplicationDescriptor final revision
66af5b70 [Yi Pan (Data Infrastructure)] SAMZA-1789: addressing Cameron's review comments.
ec4bb1dc [Yi Pan (Data Infrastructure)] SAMZA-1789: merge with fix for SAMZA-1836
9c89c63d [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
91fcd73a [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
34ffda8a [Yi Pan (Data Infrastructure)] SAMZA-1789: disabling tests due to SAMZA-1836
02076c85 [Yi Pan (Data Infrastructure)] SAMZA-1789: fixed the modifier for the mandatory constructor of ApplicationRunner; Disabled three tests due to wrong configure for test systems
222abf21 [Yi Pan (Data Infrastructure)] SAMZA-1789: added a constructor to StreamProcessor to take a StreamProcessorListenerFactory
7a73992a [Yi Pan (Data Infrastructure)] SAMZA-1789: fixing checkstyle and javadoc errors
9997b98b [Yi Pan (Data Infrastructure)] SAMZA-1789: renamed all ApplicationDescriptor classes with full-spelling of Application
f4b3d43a [Yi Pan (Data Infrastructure)] SAMZA-1789: Fxing TaskApplication examples and some checkstyle errors
f2969f8d [Yi Pan (Data Infrastructure)] SAMZA-1789: fixed ApplicationDescriptor to use InputDescriptor and OutputDescriptor; addressed Prateek's comments.
f04404cc [Yi Pan (Data Infrastructure)] SAMZA-1789: move createStreams out of the loop in prepareJobs
33753f72 [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
12c09af0 [Yi Pan (Data Infrastructure)] SAMZA-1789: Fix a merging error (with SAMZA-1813)
a072118d [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
e7af6932 [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
8d4d3ffd [Yi Pan (Data Infrastructure)] Merge with master
055bd91e [Yi Pan (Data Infrastructure)] SAMZA-1789: fix unit test with ThreadJobFactory
247dcff4 [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
1621c4d0 [Yi Pan (Data Infrastructure)] SAMZA-1789: a few more fixes to address Cameron's reviews
6e446fe6 [Yi Pan (Data Infrastructure)] SAMZA-1789: address Cameron's review comments.
4382d45d [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
3b2f04d5 [Yi Pan (Data Infrastructure)] SAMZA-1789: moved all impl classes from samza-api to samza-core.
db96da83 [Yi Pan (Data Infrastructure)] SAMZA-1789: WIP - revision to address review feedbacks.
01433717 [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
a82708bb [Yi Pan (Data Infrastructure)] SAMZA-1789: unify ApplicationDescriptor and ApplicationRunner for high- and low-level APIs in YARN and standalone environment
c4bb0dce [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
f20cdcda [Yi Pan (Data Infrastructure)] WIP: adding unit tests. Pending update on StreamProcessorLifecycleListener, LocalContainerRunner, and SamzaContainerListener
973eb526 [Yi Pan (Data Infrastructure)] WIP: compiles, still working on LocalContainerRunner refactor
fb1bc49e [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-spec-with-app-runtime-Jul-16-18
30a4e5f0 [Yi Pan (Data Infrastructure)] WIP: application runner refactor - proto-type for SEP-13
95577b74 [Yi Pan (Data Infrastructure)] WIP: trying to figure out the two interface classes for spec: a) spec builder in init(); b) spec reader in all other lifecycle methods
42782d81 [Yi Pan (Data Infrastructure)] Merge branch 'prateek-remove-app-runner-stream-spec' into app-spec-with-app-runtime-Jul-16-18
d43e9231 [Yi Pan (Data Infrastructure)] WIP: proto-type with ApplicationRunnable and no ApplicationRunner exposed to user
f1cb8f0e [Yi Pan (Data Infrastructure)] Merge branch 'master' into single-app-api-May-21-18
7e71dc7e [Yi Pan (Data Infrastructure)] Merge with master
85619301 [Prateek Maheshwari] Merge branch 'master' into stream-spec-cleanup
7d7aa508 [Prateek Maheshwari] Updated with Cameron and Daniel's feedback.
8e6fc2da [prateekm] Remove all usages of StreamSpec and ApplicationRunner from the operator spec and impl layers.
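The commit message describes the new API shape: an application only implements `describe(...)` against a typed descriptor, a runner builds the application from that descriptor, and a factory supplies per-processor lifecycle hooks. The following is a rough, self-contained Java sketch of that pattern only, assuming nothing beyond the JDK. Every type in it (`SketchDescriptor`, `SketchStreamDescriptor`, `SketchApp`, `SketchLifecycleListener`, `RecordingDescriptor`, `describeByClassName`) is hypothetical and heavily simplified; it is not the Samza API added in this commit, which also covers input/output descriptors, serializable functions and real processor lifecycles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified stand-ins for the describe()/descriptor pattern; NOT the Samza API.
public class DescribePatternSketch {

  // Callbacks around one processor's start/stop (hypothetical).
  interface SketchLifecycleListener {
    void beforeStart();
    void afterStop();
  }

  // Self-bounded descriptor: shared fluent setters return the concrete descriptor type S.
  interface SketchDescriptor<S extends SketchDescriptor<S>> {
    S withLifecycleListenerFactory(Supplier<SketchLifecycleListener> factory);
  }

  // High-level flavor of the descriptor (hypothetical).
  interface SketchStreamDescriptor extends SketchDescriptor<SketchStreamDescriptor> {
    SketchStreamDescriptor withInput(String streamId);
  }

  // Every application only implements describe() against its descriptor type.
  interface SketchApp<S extends SketchDescriptor<S>> {
    void describe(S appDesc);
  }

  interface SketchStreamApp extends SketchApp<SketchStreamDescriptor> { }

  // Runner-side descriptor that simply records what the application declared.
  static class RecordingDescriptor implements SketchStreamDescriptor {
    final List<String> inputs = new ArrayList<>();
    Supplier<SketchLifecycleListener> listenerFactory;

    @Override
    public SketchStreamDescriptor withInput(String streamId) {
      inputs.add(streamId);
      return this;
    }

    @Override
    public SketchStreamDescriptor withLifecycleListenerFactory(Supplier<SketchLifecycleListener> factory) {
      listenerFactory = factory;
      return this;
    }
  }

  // User application: public class with a public no-arg constructor, so it can be created by name.
  public static class PageViewFilterApp implements SketchStreamApp {
    static final List<String> EVENTS = new ArrayList<>();

    public PageViewFilterApp() { }

    @Override
    public void describe(SketchStreamDescriptor appDesc) {
      appDesc.withInput("pageViewEvent")
          .withLifecycleListenerFactory(() -> new SketchLifecycleListener() {
            @Override public void beforeStart() { EVENTS.add("beforeStart"); }
            @Override public void afterStop() { EVENTS.add("afterStop"); }
          });
    }
  }

  // Runner side: instantiate by fully-qualified class name via the no-arg constructor, then describe.
  static RecordingDescriptor describeByClassName(String className) {
    try {
      Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
      @SuppressWarnings("unchecked")
      SketchApp<SketchStreamDescriptor> app = (SketchApp<SketchStreamDescriptor>) instance;
      RecordingDescriptor desc = new RecordingDescriptor();
      app.describe(desc);
      return desc;
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + className, e);
    }
  }

  public static void main(String[] args) {
    RecordingDescriptor desc = describeByClassName(PageViewFilterApp.class.getName());
    // Simulate one processor: create a fresh listener from the factory and fire both callbacks.
    SketchLifecycleListener listener = desc.listenerFactory.get();
    listener.beforeStart();
    listener.afterStop();
    System.out.println(desc.inputs + " " + PageViewFilterApp.EVENTS); // prints "[pageViewEvent] [beforeStart, afterStop]"
  }
}
```

The self-bounded parameter `S extends SketchDescriptor<S>` plays the role of `S` in `ApplicationDescriptor<S extends ApplicationDescriptor>` from the diff (which uses a raw bound): shared setters such as `withProcessorLifecycleListenerFactory` can return the concrete descriptor type, so calls chain. The reflective no-arg construction is one plausible reading of why the javadoc asks for a fully-qualified class name and a default constructor; how Samza itself instantiates applications is not shown in this section.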
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/abf49eaa Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/abf49eaa Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/abf49eaa Branch: refs/heads/master Commit: abf49eaaa59c258255e7436ae323d999e6cba51b Parents: b0b2922 Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Authored: Thu Sep 6 23:35:59 2018 -0700 Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Committed: Thu Sep 6 23:35:59 2018 -0700 ---------------------------------------------------------------------- .../application/ApplicationDescriptor.java | 80 +++ .../samza/application/SamzaApplication.java | 40 ++ .../samza/application/StreamApplication.java | 75 +-- .../StreamApplicationDescriptor.java | 113 ++++ .../samza/application/TaskApplication.java | 86 +++ .../application/TaskApplicationDescriptor.java | 64 ++ .../java/org/apache/samza/config/Config.java | 3 +- .../samza/metrics/MetricsReporterFactory.java | 5 +- .../apache/samza/operators/MessageStream.java | 9 +- .../org/apache/samza/operators/StreamGraph.java | 120 ---- .../operators/functions/ClosableFunction.java | 7 +- .../operators/functions/InitableFunction.java | 6 +- .../operators/functions/StreamExpander.java | 16 +- .../apache/samza/runtime/ApplicationRunner.java | 92 +-- .../samza/runtime/ApplicationRunners.java | 82 +++ .../apache/samza/runtime/ProcessorContext.java | 31 + .../runtime/ProcessorLifecycleListener.java | 55 ++ .../ProcessorLifecycleListenerFactory.java | 40 ++ .../samza/task/AsyncStreamTaskFactory.java | 10 +- .../apache/samza/task/StreamTaskFactory.java | 6 +- .../java/org/apache/samza/task/TaskFactory.java | 38 ++ .../samza/runtime/TestApplicationRunners.java | 88 +++ .../application/ApplicationDescriptorImpl.java | 179 ++++++ .../application/ApplicationDescriptorUtil.java | 51 ++ .../samza/application/ApplicationUtil.java | 63 ++ .../application/LegacyTaskApplication.java 
| 37 ++ .../StreamApplicationDescriptorImpl.java | 381 ++++++++++++ .../TaskApplicationDescriptorImpl.java | 129 ++++ .../samza/container/SamzaContainerListener.java | 22 +- .../samza/execution/ExecutionPlanner.java | 7 +- .../org/apache/samza/execution/JobGraph.java | 6 - .../org/apache/samza/execution/JobPlanner.java | 188 ++++++ .../apache/samza/execution/LocalJobPlanner.java | 134 +++++ .../samza/execution/RemoteJobPlanner.java | 96 +++ .../samza/operators/MessageStreamImpl.java | 57 +- .../samza/operators/OperatorSpecGraph.java | 26 +- .../apache/samza/operators/StreamGraphSpec.java | 336 ----------- .../samza/operators/spec/OperatorSpec.java | 2 +- .../stream/IntermediateMessageStreamImpl.java | 6 +- .../apache/samza/processor/StreamProcessor.java | 122 ++-- .../StreamProcessorLifecycleListener.java | 49 -- .../runtime/AbstractApplicationRunner.java | 135 ----- .../samza/runtime/ApplicationRunnerMain.java | 42 +- .../samza/runtime/LocalApplicationRunner.java | 355 ++++------- .../samza/runtime/LocalContainerRunner.java | 56 +- .../samza/runtime/RemoteApplicationRunner.java | 123 ++-- .../apache/samza/task/StreamOperatorTask.java | 5 +- .../org/apache/samza/task/TaskFactoryUtil.java | 137 ++--- .../apache/samza/container/SamzaContainer.scala | 16 +- .../scala/org/apache/samza/job/JobRunner.scala | 2 - .../samza/job/local/ThreadJobFactory.scala | 48 +- .../application/MockStreamApplication.java | 29 + .../samza/application/TestApplicationUtil.java | 96 +++ .../TestStreamApplicationDescriptorImpl.java | 584 +++++++++++++++++++ .../TestTaskApplicationDescriptorImpl.java | 144 +++++ .../samza/execution/TestExecutionPlanner.java | 192 +++--- .../execution/TestJobGraphJsonGenerator.java | 120 ++-- .../org/apache/samza/execution/TestJobNode.java | 53 +- .../samza/execution/TestLocalJobPlanner.java | 211 +++++++ .../samza/execution/TestRemoteJobPlanner.java | 88 +++ .../samza/operators/TestJoinOperator.java | 103 ++-- .../samza/operators/TestMessageStreamImpl.java | 
29 +- .../samza/operators/TestOperatorSpecGraph.java | 19 +- .../samza/operators/TestStreamGraphSpec.java | 506 ---------------- .../operators/impl/TestOperatorImplGraph.java | 190 +++--- .../operators/impl/TestWindowOperator.java | 147 ++--- .../spec/TestPartitionByOperatorSpec.java | 70 ++- .../samza/processor/TestStreamProcessor.java | 139 +++-- .../runtime/TestApplicationRunnerMain.java | 47 +- .../runtime/TestLocalApplicationRunner.java | 311 +++------- .../runtime/TestRemoteApplicationRunner.java | 35 +- .../apache/samza/task/MockAsyncStreamTask.java | 31 + .../org/apache/samza/task/MockStreamTask.java | 31 + .../apache/samza/task/TestTaskFactoryUtil.java | 215 ++----- .../samza/testUtils/TestAsyncStreamTask.java | 35 -- .../samza/testUtils/TestStreamApplication.java | 33 -- .../apache/samza/testUtils/TestStreamTask.java | 34 -- .../samza/container/TestSamzaContainer.scala | 76 ++- .../samza/sql/runner/SamzaSqlApplication.java | 13 +- .../sql/runner/SamzaSqlApplicationRunner.java | 53 +- .../samza/sql/translator/JoinTranslator.java | 2 +- .../samza/sql/translator/QueryTranslator.java | 27 +- .../samza/sql/translator/ScanTranslator.java | 8 +- .../samza/sql/translator/TranslatorContext.java | 19 +- .../apache/samza/sql/e2e/TestSamzaSqlTable.java | 8 +- .../runner/TestSamzaSqlApplicationRunner.java | 2 - .../sql/translator/TestFilterTranslator.java | 6 +- .../sql/translator/TestJoinTranslator.java | 16 +- .../sql/translator/TestProjectTranslator.java | 14 +- .../sql/translator/TestQueryTranslator.java | 162 +++-- .../example/AppWithGlobalConfigExample.java | 25 +- .../apache/samza/example/BroadcastExample.java | 22 +- .../samza/example/KeyValueStoreExample.java | 19 +- .../org/apache/samza/example/MergeExample.java | 18 +- .../samza/example/OrderShipmentJoinExample.java | 19 +- .../samza/example/PageViewCounterExample.java | 15 +- .../samza/example/RepartitionExample.java | 19 +- .../samza/example/TaskApplicationExample.java | 77 +++ 
.../org/apache/samza/example/WindowExample.java | 18 +- .../samza/system/mock/MockSystemConsumer.java | 4 +- .../apache/samza/test/framework/TestRunner.java | 41 +- .../integration/LocalApplicationRunnerMain.java | 21 +- .../TestStandaloneIntegrationApplication.java | 9 +- .../processor/TestZkStreamProcessorBase.java | 20 +- .../EndOfStreamIntegrationTest.java | 37 +- .../WatermarkIntegrationTest.java | 62 +- .../test/framework/BroadcastAssertApp.java | 7 +- .../StreamApplicationIntegrationTest.java | 9 +- ...StreamApplicationIntegrationTestHarness.java | 42 +- .../samza/test/framework/TestTimerApp.java | 7 +- .../apache/samza/test/framework/TimerTest.java | 18 +- .../test/operator/RepartitionJoinWindowApp.java | 25 +- .../test/operator/RepartitionWindowApp.java | 20 +- .../samza/test/operator/SessionWindowApp.java | 17 +- .../operator/TestRepartitionJoinWindowApp.java | 30 +- .../test/operator/TestRepartitionWindowApp.java | 10 +- .../samza/test/operator/TumblingWindowApp.java | 16 +- .../test/processor/TestStreamApplication.java | 82 +-- .../test/processor/TestStreamProcessor.java | 18 +- .../processor/TestZkLocalApplicationRunner.java | 317 +++++----- .../apache/samza/test/table/TestLocalTable.java | 39 +- .../table/TestLocalTableWithSideInputs.java | 13 +- .../samza/test/table/TestRemoteTable.java | 27 +- .../benchmark/SystemConsumerWithSamzaBench.java | 14 +- 124 files changed, 5280 insertions(+), 3631 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java new file mode 100644 index 0000000..178fdee --- /dev/null +++ 
b/samza-api/src/main/java/org/apache/samza/application/ApplicationDescriptor.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.application; + +import java.util.Map; +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsReporterFactory; +import org.apache.samza.operators.ContextManager; +import org.apache.samza.runtime.ProcessorLifecycleListenerFactory; + + +/** + * The interface class to describe the configuration, input and output streams, and processing logic in a {@link SamzaApplication}. + * <p> + * Sub-classes {@link StreamApplicationDescriptor} and {@link TaskApplicationDescriptor} are specific interfaces for applications + * written in high-level {@link StreamApplication} and low-level {@link TaskApplication} APIs, respectively. + * + * @param <S> sub-class of user application descriptor. 
+ */ +@InterfaceStability.Evolving +public interface ApplicationDescriptor<S extends ApplicationDescriptor> { + + /** + * Gets the {@link Config} of the application. + * @return config of the application + */ + Config getConfig(); + + /** + * Sets the {@link ContextManager} for this application. + * <p> + * Setting the {@link ContextManager} is optional. The provided {@link ContextManager} can be used to build the shared + * context between the operator functions within a task instance. + * + * TODO: this should be replaced by the shared context factory when SAMZA-1714 is fixed. + * + * @param contextManager the {@link ContextManager} to use for the application + * @return type {@code S} of {@link ApplicationDescriptor} with {@code contextManager} set as its {@link ContextManager} + */ + S withContextManager(ContextManager contextManager); + + /** + * Sets the {@link ProcessorLifecycleListenerFactory} for this application. + * + * <p>Setting a {@link ProcessorLifecycleListenerFactory} is optional for a user application. It allows users to + * plug in optional code to be invoked in different stages before/after the main processing logic is started/stopped in + * the application.
+ * + * @param listenerFactory the user-implemented {@link ProcessorLifecycleListenerFactory} that creates lifecycle listeners + * with callback methods before and after the start/stop of each StreamProcessor in the application + * @return type {@code S} of {@link ApplicationDescriptor} with {@code listenerFactory} set as its {@link ProcessorLifecycleListenerFactory} + */ + S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory); + + /** + * Sets the customized {@link MetricsReporterFactory}s for the application. + * + * @param reporterFactories the map of customized {@link MetricsReporterFactory}s to be used + * @return type {@code S} of {@link ApplicationDescriptor} with {@code reporterFactories} set as its {@link MetricsReporterFactory}s + */ + S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java b/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java new file mode 100644 index 0000000..7606be8 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.application; + +import org.apache.samza.annotation.InterfaceStability; + + +/** + * The base interface for all user-implemented applications in Samza. + * <p> + * The main processing logic of the user application should be implemented in {@link SamzaApplication#describe(ApplicationDescriptor)} + * method. Sub-classes {@link StreamApplication} and {@link TaskApplication} are specific interfaces for applications + * written in high-level DAG and low-level task APIs, respectively. + */ +@InterfaceStability.Evolving +public interface SamzaApplication<S extends ApplicationDescriptor> { + + /** + * Describes the user processing logic via {@link ApplicationDescriptor} + * + * @param appDesc the {@link ApplicationDescriptor} object to describe user application logic + */ + void describe(S appDesc); +} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java index 0b2142b..a83cb37 100644 --- a/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java +++ b/samza-api/src/main/java/org/apache/samza/application/StreamApplication.java @@ -19,30 +19,27 @@ package org.apache.samza.application; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.config.Config; -import 
org.apache.samza.operators.ContextManager; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.functions.InitableFunction; -import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskContext; /** - * Describes and initializes the transforms for processing message streams and generating results. + * Describes and initializes the transforms for processing message streams and generating results in the high-level API. * <p> * The following example removes page views older than 1 hour from the input stream: * <pre>{@code - * public class PageViewCounter implements StreamApplication { - * public void init(StreamGraph graph, Config config) { - * MessageStream<PageViewEvent> pageViewEvents = - * graph.getInputStream("pageViewEvents", (k, m) -> (PageViewEvent) m); - * OutputStream<String, PageViewEvent, PageViewEvent> recentPageViewEvents = - * graph.getOutputStream("recentPageViewEvents", m -> m.memberId, m -> m); + * public class PageViewFilter implements StreamApplication { + * public void describe(StreamApplicationDescriptor appDesc) { + * KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking"); + * KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor = + * trackingSystem.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class)); + * + * KafkaOutputDescriptor<PageViewEvent> outputStreamDescriptor = + * trackingSystem.getOutputDescriptor("recentPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class)); + * + * MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor); + * OutputStream<PageViewEvent> recentPageViewEvents = appDesc.getOutputStream(outputStreamDescriptor); * * pageViewEvents * .filter(m -> m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis()) - * 
.sendTo(filteredPageViewEvents); + * .sendTo(recentPageViewEvents); * } * }
* }</pre> @@ -52,46 +49,28 @@ import org.apache.samza.task.TaskContext; * public static void main(String[] args) { * CommandLine cmdLine = new CommandLine(); * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - * PageViewCounter app = new PageViewCounter(); - * LocalApplicationRunner runner = new LocalApplicationRunner(config); - * runner.run(app); + * PageViewFilter app = new PageViewFilter(); + * ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config); + * runner.run(); * runner.waitForFinish(); * } * }</pre> * * <p> - * Implementation Notes: Currently StreamApplications are wrapped in a {@link StreamTask} during execution. - * A new StreamApplication instance will be created and initialized with a user-defined {@link StreamGraph} - * when planning the execution. The {@link StreamGraph} and the functions implemented for transforms are required to - * be serializable. The execution planner will generate a serialized DAG which will be deserialized in each {@link StreamTask} - * instance used for processing incoming messages. Execution is synchronous and thread-safe within each {@link StreamTask}. + * Implementation Notes: Currently {@link StreamApplication}s are wrapped in a {@link org.apache.samza.task.StreamTask} + * during execution. The execution planner will generate a serialized DAG which will be deserialized in each + * {@link org.apache.samza.task.StreamTask} instance used for processing incoming messages. Execution is synchronous + * and thread-safe within each {@link org.apache.samza.task.StreamTask}. * * <p> + * A {@link StreamApplication} implementation must have a proper fully-qualified class name and a default constructor + * with no parameters to ensure successful instantiation in both local and remote environments. * Functions implemented for transforms in StreamApplications ({@link org.apache.samza.operators.functions.MapFunction}, * {@link org.apache.samza.operators.functions.FilterFunction} for e.g.) 
are initable and closable. They are initialized - * before messages are delivered to them and closed after their execution when the {@link StreamTask} instance is closed. - * See {@link InitableFunction} and {@link org.apache.samza.operators.functions.ClosableFunction}. + * before messages are delivered to them and closed after their execution when the {@link org.apache.samza.task.StreamTask} + * instance is closed. See {@link org.apache.samza.operators.functions.InitableFunction} and {@link org.apache.samza.operators.functions.ClosableFunction}. + * Function implementations are required to be {@link java.io.Serializable}. */ -@InterfaceStability.Unstable -public interface StreamApplication { - - /** - * Describes and initializes the transforms for processing message streams and generating results. - * <p> - * The {@link StreamGraph} provides access to input and output streams. Input {@link MessageStream}s can be - * transformed into other {@link MessageStream}s or sent to an {@link OutputStream} using the {@link MessageStream} - * operators. - * <p> - * Most operators accept custom functions for doing the transformations. These functions are {@link InitableFunction}s - * and are provided the {@link Config} and {@link TaskContext} during their own initialization. The config and the - * context can be used, for example, to create custom metrics or access durable state stores. - * <p> - * A shared context between {@link InitableFunction}s for different operators within a task instance can be set - * up by providing a {@link ContextManager} using {@link StreamGraph#withContextManager}. 
- * - * @param graph the {@link StreamGraph} to get input/output streams from - * @param config the configuration for the application - */ - void init(StreamGraph graph, Config config); - +@InterfaceStability.Evolving +public interface StreamApplication extends SamzaApplication<StreamApplicationDescriptor> { } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/application/StreamApplicationDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/application/StreamApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/StreamApplicationDescriptor.java new file mode 100644 index 0000000..5a3de49 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/application/StreamApplicationDescriptor.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.application; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.TableDescriptor; +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.table.Table; + + +/** + * The interface class to describe a {@link SamzaApplication} in high-level API in Samza. + */ +@InterfaceStability.Evolving +public interface StreamApplicationDescriptor extends ApplicationDescriptor<StreamApplicationDescriptor> { + + /** + * Sets the default SystemDescriptor to use for intermediate streams. This is equivalent to setting + * {@code job.default.system} and its properties in configuration. + * <p> + * If the default system descriptor is set, it must be set <b>before</b> creating any input/output/intermediate streams. + * <p> + * If an input/output stream is created with a stream-level Serde, they will be used, else the serde specified + * for the {@code job.default.system} in configuration will be used. + * <p> + * Providing an incompatible message type for the intermediate streams that use the default serde will result in + * {@link ClassCastException}s at runtime. + * + * @param defaultSystemDescriptor the default system descriptor to use + * @return type {@code S} of {@link ApplicationDescriptor} with {@code defaultSystemDescriptor} set as its default system + */ + StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor); + + /** + * Gets the input {@link MessageStream} corresponding to the {@code inputDescriptor}. 
+ * <p> + * A {@code MessageStream<KV<K, V>>}, obtained by calling this method with a descriptor with a {@code KVSerde<K, V>}, + * can receive messages of type {@code KV<K, V>}. An input {@code MessageStream<M>}, obtained using a descriptor with + * any other {@code Serde<M>}, can receive messages of type M; the key in the incoming message is ignored. + * <p> + * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used for the descriptor if the + * {@code SystemConsumer} deserializes the incoming messages itself, and no further deserialization is required from + * the framework. + * <p> + * Multiple invocations of this method with the same {@code inputDescriptor} will throw an + * {@link IllegalStateException}. + * + * @param inputDescriptor the descriptor for the stream + * @param <M> the type of messages in the input {@link MessageStream} + * @return the input {@link MessageStream} + * @throws IllegalStateException when invoked multiple times with the same {@code inputDescriptor} + */ + <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor); + + /** + * Gets the {@link OutputStream} corresponding to the {@code outputDescriptor}. + * <p> + * An {@code OutputStream<KV<K, V>>}, obtained by calling this method with a descriptor with a {@code KVSerde<K, V>}, + * can send messages of type {@code KV<K, V>}. An {@code OutputStream<M>}, obtained using a descriptor with any + * other {@code Serde<M>}, can send messages of type M without a key. + * <p> + * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used for the descriptor if the + * {@code SystemProducer} serializes the outgoing messages itself, and no prior serialization is required from + * the framework. + * <p> + * When sending messages to an {@code OutputStream<KV<K, V>>}, messages are partitioned using their serialized key. + * When sending messages to any other {@code OutputStream<M>}, messages are partitioned using a null partition key.
+ * <p> + * Multiple invocations of this method with the same {@code outputDescriptor} will throw an + * {@link IllegalStateException}. + * + * @param outputDescriptor the descriptor for the stream + * @param <M> the type of messages in the {@link OutputStream} + * @return the {@link OutputStream} + * @throws IllegalStateException when invoked multiple times with the same {@code outputDescriptor} + */ + <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor); + + /** + * Gets the {@link Table} corresponding to the {@link TableDescriptor}. + * <p> + * Multiple invocations of this method with the same {@link TableDescriptor} will throw an + * {@link IllegalStateException}. + * + * @param tableDescriptor the {@link TableDescriptor} + * @param <K> the type of the key + * @param <V> the type of the value + * @return the {@link Table} corresponding to the {@code tableDescriptor} + * @throws IllegalStateException when invoked multiple times with the same {@link TableDescriptor} + */ + <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java b/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java new file mode 100644 index 0000000..424634d --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/application/TaskApplication.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.application; + +import org.apache.samza.annotation.InterfaceStability; + + +/** + * Describes and initializes the transforms for processing message streams and generating results using the low-level API. Your + * application is expected to implement this interface. + * <p> + * The following example removes page views older than 1 hour from the input stream: + * <pre>{@code + * public class PageViewFilter implements TaskApplication { + * public void describe(TaskApplicationDescriptor appDesc) { + * KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor(PageViewTask.SYSTEM); + * KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor = + * trackingSystem.getInputDescriptor(PageViewTask.TASK_INPUT, new JsonSerdeV2<>(PageViewEvent.class)); + * + * KafkaOutputDescriptor<PageViewEvent> outputStreamDescriptor = + * trackingSystem.getOutputDescriptor(PageViewTask.TASK_OUTPUT, new JsonSerdeV2<>(PageViewEvent.class)); + * + * appDesc.addInputStream(inputStreamDescriptor); + * appDesc.addOutputStream(outputStreamDescriptor); + * appDesc.setTaskFactory((StreamTaskFactory) () -> new PageViewTask()); + * } + * } + * + * public class PageViewTask implements StreamTask { + * static final String TASK_INPUT = "pageViewEvents"; + * static final String TASK_OUTPUT = "recentPageViewEvents"; + * static final String SYSTEM = "kafka"; + * + * public void
process(IncomingMessageEnvelope message, MessageCollector collector, + * TaskCoordinator coordinator) { + * PageViewEvent m = (PageViewEvent) message.getValue(); + * if (m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis()) { + * collector.send(new OutgoingMessageEnvelope(new SystemStream(SYSTEM, TASK_OUTPUT), + * message.getKey(), message.getKey(), m)); + * } + * } + * } + * }</pre> + * + * <p> + * The example above can be run using an ApplicationRunner: + * <pre>{@code + * public static void main(String[] args) { + * CommandLine cmdLine = new CommandLine(); + * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + * PageViewFilter app = new PageViewFilter(); + * ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config); + * runner.run(); + * runner.waitForFinish(); + * } + * }</pre> + * + * <p> + * Implementation Notes: {@link TaskApplication} allows users to instantiate {@link org.apache.samza.task.StreamTask} or + * {@link org.apache.samza.task.AsyncStreamTask} when describing the processing logic. A new {@link TaskApplicationDescriptor} + * instance will be created and described by the user-defined {@link TaskApplication} when planning the execution. + * The {@link org.apache.samza.task.TaskFactory} is required to be serializable. + * + * <p> + * The user-implemented {@link TaskApplication} class must have a proper fully-qualified class name and + * a default constructor with no parameters to ensure successful instantiation in both local and remote environments.
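The recency check inside `PageViewTask.process` can be pulled out into a pure predicate so it is testable without a running task. The sketch below assumes a minimal, hypothetical `PageViewEvent` exposing `getCreationTime()` in epoch milliseconds (the example does not show its definition), and it takes "now" as a parameter instead of calling `System.currentTimeMillis()`. `RecentPageViewFilter` is an illustrative name, not a Samza class.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the PageViewEvent type used in the example;
// only the creation time (epoch milliseconds) matters for the filter.
final class PageViewEvent {
  private final String pageId;
  private final long creationTime;

  PageViewEvent(String pageId, long creationTime) {
    this.pageId = pageId;
    this.creationTime = creationTime;
  }

  String getPageId() {
    return pageId;
  }

  long getCreationTime() {
    return creationTime;
  }
}

// Illustrative helper (not a Samza class) holding the same condition as PageViewTask.process.
final class RecentPageViewFilter {
  private final Duration maxAge;

  RecentPageViewFilter(Duration maxAge) {
    this.maxAge = maxAge;
  }

  // Strictly newer than (now - maxAge), matching the '>' comparison in the task.
  boolean isRecent(PageViewEvent event, long nowMillis) {
    return event.getCreationTime() > nowMillis - maxAge.toMillis();
  }

  // Returns the events the task would forward to the output stream, in input order.
  List<PageViewEvent> keepRecent(List<PageViewEvent> events, long nowMillis) {
    List<PageViewEvent> kept = new ArrayList<>();
    for (PageViewEvent event : events) {
      if (isRecent(event, nowMillis)) {
        kept.add(event);
      }
    }
    return kept;
  }
}
```

Injecting the clock keeps tests deterministic around the one-hour boundary, where an event exactly one hour old is dropped because the comparison is strict.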
+ */ +@InterfaceStability.Evolving +public interface TaskApplication extends SamzaApplication<TaskApplicationDescriptor> { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/application/TaskApplicationDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/application/TaskApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/TaskApplicationDescriptor.java new file mode 100644 index 0000000..0226bb5 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/application/TaskApplicationDescriptor.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.application; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.TableDescriptor; +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; +import org.apache.samza.task.TaskFactory; + + +/** + * The interface for describing a {@link SamzaApplication} that uses a low-level API task for processing.
+ */ +@InterfaceStability.Evolving +public interface TaskApplicationDescriptor extends ApplicationDescriptor<TaskApplicationDescriptor> { + + /** + * Sets the {@link TaskFactory} for the user application. {@link TaskFactory#createInstance()} creates the task instance + * that implements the main processing logic of the user application. + * + * @param factory the {@link TaskFactory} including the low-level task processing logic. The only allowed task factory + * classes are {@link org.apache.samza.task.StreamTaskFactory} and {@link org.apache.samza.task.AsyncStreamTaskFactory}. + */ + void setTaskFactory(TaskFactory factory); + + /** + * Adds the input stream to the application. + * + * @param isd the {@link InputDescriptor} of the input stream + */ + void addInputStream(InputDescriptor isd); + + /** + * Adds the output stream to the application. + * + * @param osd the {@link OutputDescriptor} of the output stream + */ + void addOutputStream(OutputDescriptor osd); + + /** + * Adds the {@link TableDescriptor} used in the application. + * + * @param table the {@link TableDescriptor} of the table + */ + void addTable(TableDescriptor table); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/config/Config.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/config/Config.java b/samza-api/src/main/java/org/apache/samza/config/Config.java index d4164c6..68f085c 100644 --- a/samza-api/src/main/java/org/apache/samza/config/Config.java +++ b/samza-api/src/main/java/org/apache/samza/config/Config.java @@ -19,6 +19,7 @@ package org.apache.samza.config; +import java.io.Serializable; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Arrays; @@ -32,7 +33,7 @@ import java.util.regex.Pattern; /** * Store and retrieve named, typed values as configuration for classes implementing this interface.
*/ -public abstract class Config implements Map<String, String> { +public abstract class Config implements Map<String, String>, Serializable { public static final String SENSITIVE_PREFIX = "sensitive."; public static final String SENSITIVE_MASK = "********"; http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporterFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporterFactory.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporterFactory.java index 7807222..b9934e5 100644 --- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporterFactory.java +++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsReporterFactory.java @@ -19,11 +19,12 @@ package org.apache.samza.metrics; +import java.io.Serializable; import org.apache.samza.config.Config; /** * Build a {@link org.apache.samza.metrics.MetricsReporter} */ -public interface MetricsReporterFactory { - MetricsReporter getMetricsReporter(String name, String containerName, Config config); +public interface MetricsReporterFactory extends Serializable { + MetricsReporter getMetricsReporter(String name, String processorId, Config config); } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java index e3a61c4..a7935d3 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java +++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java @@ -39,11 +39,12 @@ import org.apache.samza.table.Table; /** * A stream of messages that can be transformed into another {@link MessageStream}. 
* <p> - * A {@link MessageStream} corresponding to an input stream can be obtained using {@link StreamGraph#getInputStream}. + * A {@link MessageStream} corresponding to an input stream can be obtained using + * {@link org.apache.samza.application.StreamApplicationDescriptor#getInputStream}. * * @param <M> the type of messages in this stream */ -@InterfaceStability.Unstable +@InterfaceStability.Evolving public interface MessageStream<M> { /** @@ -213,8 +214,8 @@ public interface MessageStream<M> { /** * Re-partitions this {@link MessageStream} using keys from the {@code keyExtractor} by creating a new - * intermediate stream on the default system provided via {@link StreamGraph#setDefaultSystem}. This intermediate - * stream is both an output and input to the job. + * intermediate stream on the default system provided via {@link org.apache.samza.application.StreamApplicationDescriptor#withDefaultSystem}. + * This intermediate stream is both an output and input to the job. * <p> * Uses the provided {@link KVSerde} for serialization of keys and values. If the provided {@code serde} is null, * uses the key and message serde configured for the job's default system. http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java deleted file mode 100644 index ec6e4b7..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - -import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; -import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; -import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; -import org.apache.samza.table.Table; - - -/** - * Provides access to {@link MessageStream}s and {@link OutputStream}s used to describe application logic. - */ -@InterfaceStability.Unstable -public interface StreamGraph { - - /** - * Sets the default SystemDescriptor to use for intermediate streams. This is equivalent to setting - * {@code job.default.system} and its properties in configuration. - * <p> - * If the default system descriptor is set, it must be set <b>before</b> creating any intermediate streams. - * <p> - * If the intermediate stream is created with a stream-level Serde, they will be used, else the serde specified - * for the {@code job.default.system} in configuration will be used. - * <p> - * Providing an incompatible message type for the intermediate streams that use the default serde will result in - * {@link ClassCastException}s at runtime. 
- * - * @param defaultSystemDescriptor the default system descriptor to use - */ - void setDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor); - - /** - * Gets the input {@link MessageStream} corresponding to the {@code inputDescriptor}. - * <p> - * A {@code MessageStream<KV<K, V>}, obtained by calling this method with a descriptor with a {@code KVSerde<K, V>}, - * can receive messages of type {@code KV<K, V>}. An input {@code MessageStream<M>}, obtained using a descriptor with - * any other {@code Serde<M>}, can receive messages of type M - the key in the incoming message is ignored. - * <p> - * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used for the descriptor if the - * {@code SystemConsumer} deserializes the incoming messages itself, and no further deserialization is required from - * the framework. - * <p> - * Multiple invocations of this method with the same {@code inputDescriptor} will throw an - * {@link IllegalStateException}. - * - * @param inputDescriptor the descriptor for the stream - * @param <M> the type of messages in the input {@link MessageStream} - * @return the input {@link MessageStream} - * @throws IllegalStateException when invoked multiple times with the same {@code inputDescriptor} - */ - <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor); - - /** - * Gets the {@link OutputStream} corresponding to the {@code outputDescriptor}. - * <p> - * An {@code OutputStream<KV<K, V>>}, obtained by calling this method with a descriptor with a {@code KVSerde<K, V>}, - * can send messages of type {@code KV<K, V>}. An {@code OutputStream<M>}, obtained using a descriptor with any - * other {@code Serde<M>}, can send messages of type M without a key. - * <p> - * A {@code KVSerde<NoOpSerde, NoOpSerde>} or {@code NoOpSerde} may be used for the descriptor if the - * {@code SystemProducer} serializes the outgoing messages itself, and no prior serialization is required from - * the framework. 
- * <p> - * When sending messages to an {@code OutputStream<KV<K, V>>}, messages are partitioned using their serialized key. - * When sending messages to any other {@code OutputStream<M>}, messages are partitioned using a null partition key. - * <p> - * Multiple invocations of this method with the same {@code outputDescriptor} will throw an - * {@link IllegalStateException}. - * - * @param outputDescriptor the descriptor for the stream - * @param <M> the type of messages in the {@link OutputStream} - * @return the {@link OutputStream} - * @throws IllegalStateException when invoked multiple times with the same {@code outputDescriptor} - */ - <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor); - - /** - * Gets the {@link Table} corresponding to the {@link TableDescriptor}. - * <p> - * Multiple invocations of this method with the same {@link TableDescriptor} will throw an - * {@link IllegalStateException}. - * - * @param tableDescriptor the {@link TableDescriptor} - * @param <K> the type of the key - * @param <V> the type of the value - * @return the {@link Table} corresponding to the {@code tableDescriptor} - * @throws IllegalStateException when invoked multiple times with the same {@link TableDescriptor} - */ - <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor); - - /** - * Sets the {@link ContextManager} for this {@link StreamGraph}. 
- * <p> - * The provided {@link ContextManager} can be used to setup shared context between the operator functions - * within a task instance - * - * @param contextManager the {@link ContextManager} to use for the {@link StreamGraph} - * @return the {@link StreamGraph} with {@code contextManager} set as its {@link ContextManager} - */ - StreamGraph withContextManager(ContextManager contextManager); - -} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java index faf9fc5..12823cc 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java @@ -21,17 +21,18 @@ package org.apache.samza.operators.functions; import org.apache.samza.annotation.InterfaceStability; + /** * A function that can be closed after its execution. * * <p> Implement {@link #close()} to free resources used during the execution of the function, clean up state etc. * * <p> Order of finalization: {@link ClosableFunction}s are closed in the reverse topological order of operators in the - * {@link org.apache.samza.operators.StreamGraph}. For any two operators A and B in the graph, if operator B consumes results - * from operator A, then operator B is guaranteed to be closed before operator A. + * {@link org.apache.samza.application.StreamApplicationDescriptor}. For any two operators A and B in the graph, if operator B + * consumes results from operator A, then operator B is guaranteed to be closed before operator A. 
* */ -@InterfaceStability.Unstable +@InterfaceStability.Evolving public interface ClosableFunction { /** * Frees any resource acquired by the operators in {@link InitableFunction} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java index 6651819..8a5d83b 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java @@ -27,11 +27,11 @@ import org.apache.samza.task.TaskContext; * A function that can be initialized before execution. * * <p> Order of initialization: {@link InitableFunction}s are invoked in the topological order of operators in the - * {@link org.apache.samza.operators.StreamGraph}. For any two operators A and B in the graph, if operator B consumes results - * from operator A, then operator A is guaranteed to be initialized before operator B. + * {@link org.apache.samza.application.StreamApplicationDescriptor}. For any two operators A and B in the graph, if operator B + * consumes results from operator A, then operator A is guaranteed to be initialized before operator B. 
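The initialization and close ordering described above amounts to a topological sort of the operator graph and its reverse. The sketch below is a minimal illustration, assuming a hypothetical string-keyed operator DAG in which an edge A -> B means "B consumes results from A". It uses Kahn's algorithm; `OperatorLifecycleOrder` is an illustrative name, and this is not Samza's internal implementation, which this diff does not show.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical operator DAG: an edge producer -> consumer means "consumer reads producer's output".
final class OperatorLifecycleOrder {
  private final Map<String, List<String>> consumers = new LinkedHashMap<>();

  void addOperator(String id) {
    consumers.putIfAbsent(id, new ArrayList<>());
  }

  void addEdge(String producer, String consumer) {
    addOperator(producer);
    addOperator(consumer);
    consumers.get(producer).add(consumer);
  }

  // Kahn's algorithm: an operator is emitted only after every operator it consumes from,
  // i.e. the documented InitableFunction order.
  List<String> initOrder() {
    Map<String, Integer> inDegree = new LinkedHashMap<>();
    for (String id : consumers.keySet()) {
      inDegree.put(id, 0);
    }
    for (List<String> downstream : consumers.values()) {
      for (String c : downstream) {
        inDegree.put(c, inDegree.get(c) + 1);
      }
    }
    Deque<String> ready = new ArrayDeque<>();
    for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
      if (e.getValue() == 0) {
        ready.add(e.getKey());
      }
    }
    List<String> order = new ArrayList<>();
    while (!ready.isEmpty()) {
      String id = ready.poll();
      order.add(id);
      for (String c : consumers.get(id)) {
        int remaining = inDegree.get(c) - 1;
        inDegree.put(c, remaining);
        if (remaining == 0) {
          ready.add(c);
        }
      }
    }
    if (order.size() != consumers.size()) {
      throw new IllegalStateException("operator graph contains a cycle");
    }
    return order;
  }

  // Reverse topological order: each consumer is closed before the operators it reads from,
  // i.e. the documented ClosableFunction order.
  List<String> closeOrder() {
    List<String> order = initOrder();
    Collections.reverse(order);
    return order;
  }
}
```

Deriving the close order by reversing the init order guarantees the two documented rules stay consistent with each other.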
* */ -@InterfaceStability.Unstable +@InterfaceStability.Evolving public interface InitableFunction { /** http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java index 085a98d..7bbf601 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamExpander.java @@ -19,13 +19,13 @@ package org.apache.samza.operators.functions; import java.io.Serializable; +import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; /** - * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamGraph}, - * and returns a new {@link MessageStream} with the combined results. Called when {@link StreamGraph#getInputStream} + * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamApplicationDescriptor}, + * and returns a new {@link MessageStream} with the combined results. Called when {@link StreamApplicationDescriptor#getInputStream} * is being used to get a {@link MessageStream} using an {@link InputDescriptor} from an expanding system descriptor. 
* <p> * This is provided by default by expanding system descriptor implementations and can not be overridden @@ -36,23 +36,23 @@ import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; public interface StreamExpander<OM> extends Serializable { /** - * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamGraph}, + * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamApplicationDescriptor}, * and returns a new {@link MessageStream} with the combined results. Called when the {@link InputDescriptor} - * is being used to get an {@link MessageStream} using {@link StreamGraph#getInputStream}. + * is being used to get a {@link MessageStream} using {@link StreamApplicationDescriptor#getInputStream}. * <p> * Notes for system implementers: * <p> * Take care to avoid infinite recursion in the implementation; e.g., by ensuring that it doesn't call - * {@link StreamGraph#getInputStream} with an {@link InputDescriptor} from an expanding system descriptor + * {@link StreamApplicationDescriptor#getInputStream} with an {@link InputDescriptor} from an expanding system descriptor * (like this one) again. * <p> * It's the {@link StreamExpander}'s responsibility to propagate any properties, including serde, from the * user-provided {@link InputDescriptor} to the expanded input descriptors.
* - * @param streamGraph the {@link StreamGraph} to register the expanded sub-DAG of operators on + * @param streamAppDesc the {@link StreamApplicationDescriptor} to register the expanded sub-DAG of operators on * @param inputDescriptor the {@link InputDescriptor} to be expanded * @return the {@link MessageStream} containing the combined results of the sub-DAG of operators */ - MessageStream<OM> apply(StreamGraph streamGraph, InputDescriptor inputDescriptor); + MessageStream<OM> apply(StreamApplicationDescriptor streamAppDesc, InputDescriptor inputDescriptor); } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java index 45abb5d..59543c0 100644 --- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java +++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java @@ -20,98 +20,43 @@ package org.apache.samza.runtime; import java.time.Duration; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; -import org.apache.samza.config.ConfigException; import org.apache.samza.job.ApplicationStatus; -import java.lang.reflect.Constructor; - /** - * The primary means of managing execution of the {@link org.apache.samza.application.StreamApplication} at runtime. + * The primary means of managing execution of the {@link org.apache.samza.application.SamzaApplication} at runtime. 
+ * + * <p> + * Implementation Notes: implementations of {@link ApplicationRunner} must have a public constructor with the + * signature {@code (SamzaApplication, Config)}, which {@link ApplicationRunners} uses to instantiate them. */ -@InterfaceStability.Unstable -public abstract class ApplicationRunner { - - private static final String RUNNER_CONFIG = "app.runner.class"; - private static final String DEFAULT_RUNNER_CLASS = "org.apache.samza.runtime.RemoteApplicationRunner"; - - protected final Config config; - - /** - * Static method to load the {@link ApplicationRunner} - * - * @param config configuration passed in to initialize the Samza processes - * @return the configure-driven {@link ApplicationRunner} to run the user-defined stream applications - */ - public static ApplicationRunner fromConfig(Config config) { - try { - Class<?> runnerClass = Class.forName(config.get(RUNNER_CONFIG, DEFAULT_RUNNER_CLASS)); - if (ApplicationRunner.class.isAssignableFrom(runnerClass)) { - Constructor<?> constructor = runnerClass.getConstructor(Config.class); // *sigh* - return (ApplicationRunner) constructor.newInstance(config); - } - } catch (Exception e) { - throw new ConfigException(String.format("Problem in loading ApplicationRunner class %s", config.get( - RUNNER_CONFIG)), e); - } - throw new ConfigException(String.format( - "Class %s does not extend ApplicationRunner properly", - config.get(RUNNER_CONFIG))); - } - - - public ApplicationRunner(Config config) { - if (config == null) { - throw new NullPointerException("Parameter 'config' cannot be null."); - } - - this.config = config; - } - - /** - * Deploy and run the Samza jobs to execute {@link org.apache.samza.task.StreamTask}. - * It is non-blocking so it doesn't wait for the application running. - * This method assumes you task.class is specified in the configs. - * - * NOTE. this interface will most likely change in the future.
- */ - @InterfaceStability.Evolving - public abstract void runTask(); - +@InterfaceStability.Evolving +public interface ApplicationRunner { /** - * Deploy and run the Samza jobs to execute {@link StreamApplication}. + * Deploy and run the Samza jobs to execute {@link org.apache.samza.application.SamzaApplication}. * It is non-blocking so it doesn't wait for the application running. - * - * @param streamApp the user-defined {@link StreamApplication} object */ - public abstract void run(StreamApplication streamApp); + void run(); /** - * Kill the Samza jobs represented by {@link StreamApplication} + * Kill the Samza jobs represented by {@link org.apache.samza.application.SamzaApplication}. * It is non-blocking so it doesn't wait for the application stopping. - * - * @param streamApp the user-defined {@link StreamApplication} object */ - public abstract void kill(StreamApplication streamApp); + void kill(); /** - * Get the collective status of the Samza jobs represented by {@link StreamApplication}. - * Returns {@link ApplicationRunner} running if all jobs are running. + * Get the collective status of the Samza jobs represented by {@link org.apache.samza.application.SamzaApplication}. + * Returns an {@link ApplicationStatus} object. * - * @param streamApp the user-defined {@link StreamApplication} object - * @return the status of the application + * @return the current status of an instance of {@link org.apache.samza.application.SamzaApplication} */ - public abstract ApplicationStatus status(StreamApplication streamApp); + ApplicationStatus status(); /** * Waits until the application finishes. */ - public void waitForFinish() { - throw new UnsupportedOperationException(getClass().getName() + " does not support waitForFinish."); - } + void waitForFinish(); /** * Waits for {@code timeout} duration for the application to finish.
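The constructor contract and the non-blocking `run()` / timed `waitForFinish(Duration)` semantics can be illustrated without Samza on the classpath. The sketch below uses hypothetical stand-ins (`DemoApp` for `SamzaApplication`, a `Map<String, String>` for `Config`, `DemoStatus` for `ApplicationStatus`, `DemoRunner` for `ApplicationRunner`) and mirrors the reflective lookup that `ApplicationRunners.getApplicationRunner` performs on `app.runner.class`. The `CountDownLatch`-based waiting is an assumption for illustration, not how any Samza runner is implemented.

```java
import java.lang.reflect.Constructor;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for SamzaApplication.
interface DemoApp { }

// Hypothetical stand-in for ApplicationStatus.
enum DemoStatus { NEW, RUNNING, KILLED }

// Hypothetical stand-in mirroring the shape of the new ApplicationRunner interface.
interface DemoRunner {
  void run();
  void kill();
  DemoStatus status();
  void waitForFinish();
  boolean waitForFinish(Duration timeout);
}

// Toy runner honoring the public (app, config) constructor contract; it runs nothing real.
class InMemoryRunner implements DemoRunner {
  private final DemoApp app;
  private final Map<String, String> config;
  private final CountDownLatch finished = new CountDownLatch(1);
  private volatile DemoStatus status = DemoStatus.NEW;

  public InMemoryRunner(DemoApp app, Map<String, String> config) {
    this.app = app;
    this.config = config;
  }

  @Override
  public void run() {
    // Non-blocking: records the state change and returns immediately.
    status = DemoStatus.RUNNING;
  }

  @Override
  public void kill() {
    status = DemoStatus.KILLED;
    finished.countDown();
  }

  @Override
  public DemoStatus status() {
    return status;
  }

  @Override
  public void waitForFinish() {
    try {
      finished.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  @Override
  public boolean waitForFinish(Duration timeout) {
    try {
      // true if the application finished within the timeout, false otherwise
      return finished.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}

final class DemoRunners {
  static final String APP_RUNNER_CFG = "app.runner.class";

  private DemoRunners() { }

  // Resolve the class name from config, check the type, then call the (app, config) constructor.
  static DemoRunner getRunner(DemoApp app, Map<String, String> config) {
    String className = config.getOrDefault(APP_RUNNER_CFG, InMemoryRunner.class.getName());
    try {
      Class<?> runnerClass = Class.forName(className);
      if (!DemoRunner.class.isAssignableFrom(runnerClass)) {
        throw new IllegalArgumentException("Class " + className + " does not implement DemoRunner");
      }
      Constructor<?> ctor = runnerClass.getConstructor(DemoApp.class, Map.class);
      return (DemoRunner) ctor.newInstance(app, config);
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Could not load runner class " + className, e);
    }
  }
}
```

Because the runner is located by class name, a missing or mismatched `(app, config)` constructor only surfaces at runtime as a reflection failure, which is why the interface documents the constructor signature explicitly.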
@@ -120,7 +65,6 @@ public abstract class ApplicationRunner { * @return true - application finished before timeout * false - otherwise */ - public boolean waitForFinish(Duration timeout) { - throw new UnsupportedOperationException(getClass().getName() + " does not support timed waitForFinish."); - } + boolean waitForFinish(Duration timeout); + } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunners.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunners.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunners.java new file mode 100644 index 0000000..cd1d06b --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunners.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.runtime; + +import java.lang.reflect.Constructor; +import org.apache.samza.application.SamzaApplication; +import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigException; + + +/** + * Creates {@link ApplicationRunner} instances based on configuration and a user-implemented {@link SamzaApplication}. + * + * <p> This class is typically used in a main() method to create an instance of {@link ApplicationRunner}, as in the example + * below: + * <pre>{@code + * public static void main(String[] args) { + * CommandLine cmdLine = new CommandLine(); + * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + * PageViewCounter app = new PageViewCounter(); + * ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config); + * runner.run(); + * runner.waitForFinish(); + * } + * }</pre> + */ +public class ApplicationRunners { + + private static final String APP_RUNNER_CFG = "app.runner.class"; + private static final String DEFAULT_APP_RUNNER = "org.apache.samza.runtime.RemoteApplicationRunner"; + + private ApplicationRunners() { + + } + + /** + * Gets the {@link ApplicationRunner} that runs the {@code userApp}. + * + * @param userApp the user application object + * @param config the configuration for this application + * @return the {@link ApplicationRunner} object that will run the {@code userApp} + */ + public static final ApplicationRunner getApplicationRunner(SamzaApplication userApp, Config config) { + String appRunnerClassName = getAppRunnerClass(config); + try { + Class<?> runnerClass = Class.forName(appRunnerClassName); + if (!ApplicationRunner.class.isAssignableFrom(runnerClass)) { + throw new ConfigException( + String.format("Class %s does not implement ApplicationRunner", appRunnerClassName)); + } + // implementations must expose a public (SamzaApplication, Config) constructor + Constructor<?> constructor = runnerClass.getConstructor(SamzaApplication.class, Config.class); + return (ApplicationRunner) constructor.newInstance(userApp, config); + } catch
(ConfigException ce) { + // this is thrown due to invalid app.runner.class configuration + throw ce; + } catch (Exception e) { + // other types of exception during class loading and construction of new instance + throw new ConfigException(String.format("Could not load ApplicationRunner class %s", appRunnerClassName), e); + } + } + + private static String getAppRunnerClass(Config config) { + return config.getOrDefault(APP_RUNNER_CFG, DEFAULT_APP_RUNNER); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/runtime/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ProcessorContext.java b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorContext.java new file mode 100644 index 0000000..4e9f5ba --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorContext.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.runtime; + +import org.apache.samza.annotation.InterfaceStability; + + +/** + * The context for a StreamProcessor. 
A placeholder interface for the general context of a Samza application. + * + * TODO: pending change with SAMZA-1714 + */ +@InterfaceStability.Unstable +public interface ProcessorContext { +} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/runtime/ProcessorLifecycleListener.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ProcessorLifecycleListener.java b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorLifecycleListener.java new file mode 100644 index 0000000..f4e49ed --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorLifecycleListener.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.runtime; + +import org.apache.samza.annotation.InterfaceStability; + + +/** + * This interface defines methods that are invoked at different stages of a StreamProcessor's lifecycle in the local + * process (i.e. a standalone process, or a container process in a YARN NodeManager). + * + * <p> + * Users can implement this interface to instantiate and release shared objects in the local process.
+ */ +@InterfaceStability.Evolving +public interface ProcessorLifecycleListener { + /** + * User-defined initialization, invoked before a StreamProcessor is started. + */ + default void beforeStart() {} + + /** + * User-defined callback, invoked after a StreamProcessor is started. + * + */ + default void afterStart() {} + + /** + * User-defined callback, invoked after a StreamProcessor stops successfully. + */ + default void afterStop() {} + + /** + * User-defined callback, invoked after a StreamProcessor stops because of a failure. + * + * @param t the error that caused the StreamProcessor to stop. + */ + default void afterFailure(Throwable t) {} +} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/runtime/ProcessorLifecycleListenerFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ProcessorLifecycleListenerFactory.java b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorLifecycleListenerFactory.java new file mode 100644 index 0000000..eec7b70 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorLifecycleListenerFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.runtime; + +import java.io.Serializable; +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.config.Config; + + +/** + * This interface defines the factory method that creates an instance of {@link ProcessorLifecycleListener}. + */ +@InterfaceStability.Evolving +public interface ProcessorLifecycleListenerFactory extends Serializable { + /** + * Create an instance of {@link ProcessorLifecycleListener} for the StreamProcessor. + * + * @param pContext the context of the corresponding StreamProcessor. Note that {@link ProcessorContext} is only a + * placeholder until a proper general context is implemented in SAMZA-1714 + * @param config the configuration of the corresponding StreamProcessor + * @return the {@link ProcessorLifecycleListener} callback object for the StreamProcessor + */ + ProcessorLifecycleListener createInstance(ProcessorContext pContext, Config config); +} http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java index e5ce9c4..1879ce8 100644 --- a/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java +++ b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTaskFactory.java @@ -19,10 +19,14 @@ package org.apache.samza.task; +import org.apache.samza.annotation.InterfaceStability; + + /** * Build {@link AsyncStreamTask} instances. - * Implementations should return a new instance for each {@link #createInstance()} invocation.
+ * <p> + * Implementations should return a new instance of {@link AsyncStreamTask} for each {@link #createInstance()} invocation. */ -public interface AsyncStreamTaskFactory { - AsyncStreamTask createInstance(); +@InterfaceStability.Stable +public interface AsyncStreamTaskFactory extends TaskFactory<AsyncStreamTask> { } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/task/StreamTaskFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/StreamTaskFactory.java b/samza-api/src/main/java/org/apache/samza/task/StreamTaskFactory.java index 52adef6..8588a0d 100644 --- a/samza-api/src/main/java/org/apache/samza/task/StreamTaskFactory.java +++ b/samza-api/src/main/java/org/apache/samza/task/StreamTaskFactory.java @@ -23,9 +23,9 @@ import org.apache.samza.annotation.InterfaceStability; /** * Build {@link StreamTask} instances. - * Implementations should return a new instance for each {@link #createInstance()} invocation. + * <p> + * Implementations should return a new instance of {@link StreamTask} for each {@link #createInstance()} invocation. */ @InterfaceStability.Stable -public interface StreamTaskFactory { - StreamTask createInstance(); +public interface StreamTaskFactory extends TaskFactory<StreamTask> { } http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/main/java/org/apache/samza/task/TaskFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskFactory.java b/samza-api/src/main/java/org/apache/samza/task/TaskFactory.java new file mode 100644 index 0000000..8443d20 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/task/TaskFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.task; + +import java.io.Serializable; +import org.apache.samza.annotation.InterfaceStability; + + +/** + * The common interface for all task factories (i.e. {@link StreamTaskFactory} and {@link AsyncStreamTaskFactory}). + * + * @param <T> the type of task instances created by this factory + */ +@InterfaceStability.Stable +public interface TaskFactory<T> extends Serializable { + /** + * Creates a new task instance. + * + * @return a new task of type {@code T} + */ + T createInstance(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java b/samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java new file mode 100644 index 0000000..5829cf7 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.runtime; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.application.SamzaApplication; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.job.ApplicationStatus; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + + +/** + * Unit test for {@link ApplicationRunners} + */ +public class TestApplicationRunners { + + @Test + public void testGetAppRunner() { + Map<String, String> configMap = new HashMap<>(); + configMap.put("app.runner.class", MockApplicationRunner.class.getName()); + Config config = new MapConfig(configMap); + StreamApplication app = mock(StreamApplication.class); + ApplicationRunner appRunner = ApplicationRunners.getApplicationRunner(app, config); + assertTrue(appRunner instanceof MockApplicationRunner); + } + + /** + * Test class for {@link ApplicationRunners} unit test + */ + public static class MockApplicationRunner implements ApplicationRunner { + private final SamzaApplication userApp; + private final Config config; + + public MockApplicationRunner(SamzaApplication userApp, Config config) { + this.userApp = userApp; + this.config = config; + } + + @Override + public void run() { + + } + + @Override + public void 
kill() { + + } + + @Override + public ApplicationStatus status() { + return null; + } + + @Override + public void waitForFinish() { + + } + + @Override + public boolean waitForFinish(Duration timeout) { + return false; + } + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java new file mode 100644 index 0000000..9679136 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.application; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsReporterFactory; +import org.apache.samza.operators.ContextManager; +import org.apache.samza.operators.TableDescriptor; +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.runtime.ProcessorLifecycleListener; +import org.apache.samza.runtime.ProcessorLifecycleListenerFactory; +import org.apache.samza.task.TaskContext; + + +/** + * This is the base class that implements interface {@link ApplicationDescriptor}. + * <p> + * This base class contains the common objects that are used by both high-level and low-level API applications, such as + * {@link Config}, {@link ContextManager}, and {@link ProcessorLifecycleListenerFactory}. + * + * @param <S> the type of {@link ApplicationDescriptor} interface this implements. It has to be either + * {@link StreamApplicationDescriptor} or {@link TaskApplicationDescriptor} + */ +public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor> + implements ApplicationDescriptor<S> { + + final Config config; + private final Class<? 
extends SamzaApplication> appClass; + private final Map<String, MetricsReporterFactory> reporterFactories = new LinkedHashMap<>(); + + // Default to no-op functions in ContextManager + // TODO: this should be replaced by shared context factory defined in SAMZA-1714 + ContextManager contextManager = new ContextManager() { + @Override + public void init(Config config, TaskContext context) { + } + + @Override + public void close() { + } + }; + + // Default to no-op ProcessorLifecycleListenerFactory + ProcessorLifecycleListenerFactory listenerFactory = (pcontext, cfg) -> new ProcessorLifecycleListener() { }; + + ApplicationDescriptorImpl(SamzaApplication app, Config config) { + this.config = config; + this.appClass = app.getClass(); + } + + @Override + public Config getConfig() { + return config; + } + + @Override + public S withContextManager(ContextManager contextManager) { + this.contextManager = contextManager; + return (S) this; + } + + @Override + public S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory) { + this.listenerFactory = listenerFactory; + return (S) this; + } + + @Override + public S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories) { + this.reporterFactories.clear(); + this.reporterFactories.putAll(reporterFactories); + return (S) this; + } + + /** + * Get the application class + * + * @return an implementation of {@link SamzaApplication} + */ + public Class<? 
extends SamzaApplication> getAppClass() { + return appClass; + } + + /** + * Get the {@link ContextManager} associated with this application + * + * @return the {@link ContextManager} for this application + */ + public ContextManager getContextManager() { + return contextManager; + } + + /** + * Get the {@link ProcessorLifecycleListenerFactory} associated with this application + * + * @return the {@link ProcessorLifecycleListenerFactory} in this application + */ + public ProcessorLifecycleListenerFactory getProcessorLifecycleListenerFactory() { + return listenerFactory; + } + + /** + * Get the {@link MetricsReporterFactory}s used in the application + * + * @return the map of {@link MetricsReporterFactory}s + */ + public Map<String, MetricsReporterFactory> getMetricsReporterFactories() { + return Collections.unmodifiableMap(reporterFactories); + } + + /** + * Get the default {@link SystemDescriptor} in this application + * + * @return the default {@link SystemDescriptor} + */ + public Optional<SystemDescriptor> getDefaultSystemDescriptor() { + // default is not set + return Optional.empty(); + } + + /** + * Get all the {@link InputDescriptor}s to this application + * + * @return an immutable map of streamId to {@link InputDescriptor} + */ + public abstract Map<String, InputDescriptor> getInputDescriptors(); + + /** + * Get all the {@link OutputDescriptor}s from this application + * + * @return an immutable map of streamId to {@link OutputDescriptor} + */ + public abstract Map<String, OutputDescriptor> getOutputDescriptors(); + + /** + * Get all the broadcast streamIds from this application + * + * @return an immutable set of streamIds + */ + public abstract Set<String> getBroadcastStreams(); + + /** + * Get all the {@link TableDescriptor}s in this application + * + * @return an immutable set of {@link TableDescriptor}s + */ + public abstract Set<TableDescriptor> getTableDescriptors(); + + /** + * Get all the unique {@link SystemDescriptor}s in this application + * + 
* @return an immutable set of {@link SystemDescriptor}s + */ + public abstract Set<SystemDescriptor> getSystemDescriptors(); + +} \ No newline at end of file
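`ApplicationRunners.getApplicationRunner` resolves the runner class from `app.runner.class` (falling back to `RemoteApplicationRunner`), checks that it implements `ApplicationRunner`, and invokes its public `(SamzaApplication, Config)` constructor. The sketch below reproduces that reflective pattern outside Samza so it can run standalone; `App`, `Runner`, `EchoRunner`, `RunnerLoader` and the `Map`-based config are hypothetical stand-ins, not Samza API.

```java
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for SamzaApplication and ApplicationRunner; not the real org.apache.samza types.
interface App {}

interface Runner {
  void run();
}

public class RunnerLoader {
  public static final String RUNNER_CFG = "app.runner.class";

  // A runner exposing the (App, Map) constructor that the loader requires.
  public static class EchoRunner implements Runner {
    final App app;
    final Map<String, String> config;

    public EchoRunner(App app, Map<String, String> config) {
      this.app = app;
      this.config = config;
    }

    @Override
    public void run() {}
  }

  /** Loads the runner class named in config and invokes its public (App, Map) constructor. */
  public static Runner load(App app, Map<String, String> config, String defaultClass) {
    String className = config.getOrDefault(RUNNER_CFG, defaultClass);
    Class<?> cls;
    try {
      cls = Class.forName(className);
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("Could not load runner class " + className, e);
    }
    // Reject classes that do not implement Runner before attempting to instantiate them.
    if (!Runner.class.isAssignableFrom(cls)) {
      throw new IllegalArgumentException(className + " does not implement Runner");
    }
    try {
      // NoSuchMethodException here means the class lacks the required public constructor.
      Constructor<?> ctor = cls.getConstructor(App.class, Map.class);
      return (Runner) ctor.newInstance(app, config);
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Could not construct runner " + className, e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(RUNNER_CFG, EchoRunner.class.getName());
    Runner runner = load(new App() {}, config, "does.not.Exist");
    System.out.println(runner.getClass().getSimpleName()); // EchoRunner
  }
}
```

The Samza code rethrows its own `ConfigException` and wraps every other failure in one; the sketch uses `IllegalArgumentException` instead. The part that carries over is the contract: a runner class without a public two-argument constructor fails at `getConstructor`, not at compile time.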
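`ProcessorLifecycleListener` gives every callback a no-op default, so an implementation overrides only the hooks it needs, while `ProcessorLifecycleListenerFactory` is `Serializable` so it can be registered in `describe()` and instantiated inside each processor. A minimal sketch, assuming simplified stand-ins (`LifecycleListener`, and a `LifecycleListenerFactory` that takes a `Map` in place of `ProcessorContext` and `Config`) plus a hypothetical `runProcessor` driver that only imitates the callback order; it is not how `StreamProcessor` actually schedules these calls.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in shaped like ProcessorLifecycleListener: every hook defaults to a no-op.
interface LifecycleListener {
  default void beforeStart() {}
  default void afterStart() {}
  default void afterStop() {}
  default void afterFailure(Throwable t) {}
}

// Hypothetical stand-in for ProcessorLifecycleListenerFactory; a Map replaces ProcessorContext and Config.
interface LifecycleListenerFactory extends Serializable {
  LifecycleListener createInstance(Map<String, String> config);
}

public class ListenerDemo {
  // Opens a (hypothetical) process-wide resource before start and releases it on stop or failure.
  public static class SharedResourceListener implements LifecycleListener {
    public final List<String> events = new ArrayList<>();
    private AutoCloseable sharedClient;

    @Override
    public void beforeStart() {
      sharedClient = () -> events.add("client closed");
      events.add("client opened");
    }

    @Override
    public void afterStart() {
      events.add("started");
    }

    @Override
    public void afterStop() {
      closeClient();
      events.add("stopped");
    }

    @Override
    public void afterFailure(Throwable t) {
      closeClient();
      events.add("failed: " + t.getMessage());
    }

    private void closeClient() {
      if (sharedClient == null) {
        return;
      }
      try {
        sharedClient.close();
      } catch (Exception e) {
        throw new IllegalStateException(e);
      } finally {
        sharedClient = null;
      }
    }
  }

  // Hypothetical driver that only imitates the callback order; not StreamProcessor's real logic.
  public static void runProcessor(LifecycleListener listener, Runnable work) {
    listener.beforeStart();
    listener.afterStart();
    try {
      work.run();
    } catch (RuntimeException e) {
      listener.afterFailure(e);
      return;
    }
    listener.afterStop();
  }

  public static void main(String[] args) {
    // The factory lambda is serializable because its target interface extends Serializable.
    LifecycleListenerFactory factory = config -> new SharedResourceListener();
    SharedResourceListener listener =
        (SharedResourceListener) factory.createInstance(Collections.emptyMap());
    runProcessor(listener, () -> { });
    System.out.println(listener.events); // [client opened, started, client closed, stopped]
  }
}
```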
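`TaskFactory<T>` now extends `Serializable`, and `StreamTaskFactory` and `AsyncStreamTaskFactory` simply fix `T`. One consequence is that a lambda or method reference targeting such an interface is itself serializable, so it can travel with the application description. A sketch under stated assumptions: `Task`, `GenericTaskFactory`, `SimpleTaskFactory` and `UpperCaseTask` are hypothetical stand-ins, and Java serialization is used only to demonstrate the property, not because Samza necessarily transports factories this way.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Locale;

// Hypothetical stand-ins shaped like StreamTask, TaskFactory<T> and StreamTaskFactory.
interface Task {
  String process(String message);
}

interface GenericTaskFactory<T> extends Serializable {
  // Should return a new task instance on every call.
  T createInstance();
}

interface SimpleTaskFactory extends GenericTaskFactory<Task> {
}

public class TaskFactoryDemo {
  public static class UpperCaseTask implements Task {
    @Override
    public String process(String message) {
      return message.toUpperCase(Locale.ROOT);
    }
  }

  // Serializes and deserializes an object with Java serialization to show the factory survives the trip.
  @SuppressWarnings("unchecked")
  public static <T> T roundTrip(T obj) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }

  public static void main(String[] args) {
    // The method reference is serializable because its target interface extends Serializable.
    SimpleTaskFactory factory = UpperCaseTask::new;
    SimpleTaskFactory copy = roundTrip(factory);
    Task first = copy.createInstance();
    Task second = copy.createInstance();
    System.out.println(first != second);            // true: a fresh task per call
    System.out.println(first.process("page-view")); // PAGE-VIEW
  }
}
```

Assigning the same method reference to a plain `java.util.function.Supplier<Task>` would fail at `writeObject` with `NotSerializableException`, which is the situation the `Serializable` bound on the factory interfaces guards against.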
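`ApplicationDescriptorImpl<S>` returns `(S) this` from its `with*` methods so that chained calls keep the concrete descriptor type (`StreamApplicationDescriptor` or `TaskApplicationDescriptor`). The sketch below shows the same self-typed builder idea with hypothetical `Descriptor`, `StreamDescriptor`, `DescriptorBase` and `StreamDescriptorImpl` types; it uses an F-bounded `S extends Descriptor<S>` where the Samza code uses a raw `ApplicationDescriptor` bound, and the reporter factory class name is a placeholder string.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified analogue of ApplicationDescriptor<S>.
interface Descriptor<S extends Descriptor<S>> {
  S withReporter(String name, String factoryClass);
}

// Hypothetical analogue of StreamApplicationDescriptor with one API-specific method.
interface StreamDescriptor extends Descriptor<StreamDescriptor> {
  StreamDescriptor withInput(String streamId);
}

// Shared state and fluent setters, analogous to ApplicationDescriptorImpl<S>.
abstract class DescriptorBase<S extends Descriptor<S>> implements Descriptor<S> {
  private final Map<String, String> reporters = new LinkedHashMap<>();

  @SuppressWarnings("unchecked")
  @Override
  public S withReporter(String name, String factoryClass) {
    reporters.put(name, factoryClass);
    // Unchecked cast: correct only if each concrete subclass also implements S.
    return (S) this;
  }

  public Map<String, String> getReporters() {
    return Collections.unmodifiableMap(reporters);
  }
}

public class StreamDescriptorImpl extends DescriptorBase<StreamDescriptor> implements StreamDescriptor {
  private final Set<String> inputs = new LinkedHashSet<>();

  @Override
  public StreamDescriptor withInput(String streamId) {
    inputs.add(streamId);
    return this;
  }

  public Set<String> getInputs() {
    return Collections.unmodifiableSet(inputs);
  }

  public static void main(String[] args) {
    StreamDescriptorImpl desc = new StreamDescriptorImpl();
    // withReporter() returns StreamDescriptor, so the stream-specific withInput() chains without a cast.
    desc.withReporter("jmx", "com.example.HypotheticalReporterFactory").withInput("page-views");
    System.out.println(desc.getInputs() + " " + desc.getReporters().keySet()); // [page-views] [jmx]
  }
}
```

The unchecked cast is the price of sharing setter code in one base class; the F-bound lets the compiler confirm that `StreamDescriptor` is a legal `S`, but only the subclass declaration `implements StreamDescriptor` makes the cast actually safe.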