I looked again at Chris's beam-samza-runner implementation. It seems LocalApplicationRunner.run() should be asynchronous too. The current implementation uses a latch to wait for the StreamProcessors to finish, which seems unnecessary; instead, we can provide a waitUntilFinish() counterpart for the user. I created https://issues.apache.org/jira/browse/SAMZA-1252 to track it.
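For reference, the non-blocking run() plus waitUntilFinish() split could look roughly like the sketch below. This is a hand-rolled illustration with hypothetical names (LocalRunnerSketch, processorWork), not the actual SAMZA-1252 patch:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: run() returns immediately, and the latch moves
// behind an explicit waitUntilFinish(), mirroring the split proposed
// in SAMZA-1252. Not the real Samza API.
public class LocalRunnerSketch {
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    // Non-blocking: start the processors on a background thread and return.
    public void run(Runnable processorWork) {
        Thread t = new Thread(() -> {
            try {
                processorWork.run();       // stand-in for the StreamProcessors
            } finally {
                shutdownLatch.countDown(); // signal completion
            }
        });
        t.setDaemon(true);
        t.start();
    }

    // Blocking counterpart: callers that launch from main() opt in here.
    public boolean waitUntilFinish(long timeout, TimeUnit unit)
            throws InterruptedException {
        return shutdownLatch.await(timeout, unit);
    }
}
```

With this split, run() stays non-blocking everywhere (matching the YARN behavior discussed later in the thread), and standalone users who want main() to block simply call waitUntilFinish().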
Thanks, Xinyu On Fri, Apr 28, 2017 at 5:55 PM, xinyu liu <xinyuliu...@gmail.com> wrote: > Right, option #2 seems redundant for defining streams after further > discussion here. StreamSpec itself is flexible enough to achieve both > static and programmatic specification of the stream. Agree it's not > convenient for now (pretty obvious after looking at your bsr > beam.runners.samza.wrapper), and we should provide similar predefined > convenient wrappers for users to create the StreamSpec. In your case > something like BoundedStreamSpec.file(....) which will generate the system > and serialize the data as you did. > > We're still thinking the callback proposed in #2 can be useful for > requirement #6: injecting other user objects at run time, such as stores > and metrics. To simplify the user understanding further, I think we might > hide the ApplicationRunner and expose the StreamApplication instead, which > will make requirement #3 not user facing. So the API becomes like: > > StreamApplication app = StreamApplication.local(config) > .init (env -> { > env.registerStore("my-store", new MyStoreFactory()); > env.registerMetricsReporter("my-reporter", new > MyMetricsReporterFactory()); > }) > .withLifeCycleListener(myListener); > > app.input(BoundedStreamSpec.create("/sample/input.txt")) > .map(...) > .window(...) > > app.run(); > > For requirement #5, I added a .withLifeCycleListener() in the API, which can > trigger the callbacks with life cycle events. > > For #4: distribution of the jars will be what we have today using the Yarn > localization with a remote store like artifactory or http server. We > discussed where to put the graph serialization. The current thinking is to > define a general interface which can be backed by a remote store, like Kafka, > artifactory or http server. For Kafka, it's straightforward but we will > have the size limit per message or need to cut it up ourselves.
For the other two, we need to > investigate whether we can easily upload jars to our artifactory and > localize them with Yarn. Any opinions on this? > > Thanks, > Xinyu > > On Fri, Apr 28, 2017 at 11:34 AM, Chris Pettitt < > cpett...@linkedin.com.invalid> wrote: > >> Your proposal for #1 looks good. >> >> I'm not quite sure how to reconcile the proposals for #1 and #2. In #1 you add >> the stream spec straight onto the runner while in #2 you do it in a >> callback. If it is either-or, #1 looks a lot better for my purposes. >> >> For #4 what mechanism are you using to distribute the JARs? Can you use >> the >> same mechanism to distribute the serialized graph? >> >> On Fri, Apr 28, 2017 at 12:14 AM, xinyu liu <xinyuliu...@gmail.com> >> wrote: >> >> > btw, I will get to SAMZA-1246 as soon as possible. >> > >> > Thanks, >> > Xinyu >> > >> > On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu <xinyuliu...@gmail.com> >> wrote: >> > >> > > Let me try to capture the updated requirements: >> > > >> > > 1. Set up input streams outside StreamGraph, and treat graph building >> as >> > a >> > > library (*Fluent, Beam*). >> > > >> > > 2. Improve ease of use for ApplicationRunner to avoid complex >> > > configurations such as zkCoordinator, zkCoordinationService. >> > (*Standalone*). >> > > Provide some programmatic way to tweak them in the API. >> > > >> > > 3. Clean up ApplicationRunner into a single interface (*Fluent*). We >> can >> > > have one or more implementations but they're hidden from the users. >> > > >> > > 4. Separate StreamGraph from runtime environment so it can be >> serialized >> > (*Beam, >> > > Yarn*) >> > > >> > > 5. Better life cycle management of application, parity with >> > > StreamProcessor (*Standalone, Beam*). Status should include the exception >> in >> > > case of failure (tracked in SAMZA-1246). >> > > >> > > 6. Support injecting user-defined objects into ApplicationRunner. 
>> > > >> > > Prateek and I iterated on the ApplicationRunner API based on these >> > > requirements. To support #1, we can set up input streams on the runner >> > > level, which returns the MessageStream and allows graph building >> > > afterwards. The code looks like below: >> > > >> > > ApplicationRunner runner = ApplicationRunner.local(); >> > > runner.input(streamSpec) >> > > .map(..) >> > > .window(...) >> > > runner.run(); >> > > >> > > StreamSpec is the building block for setting up streams here. It can >> be >> > > set up in different ways: >> > > >> > > - Direct creation of stream spec, like runner.input(new >> StreamSpec(id, >> > > system, stream)) >> > > - Load by streamId from env or config, like >> > runner.input(runner.env(). >> > > getStreamSpec(id)) >> > > - Canned Spec which generates the StreamSpec with id, system and >> stream >> > > to minimize the configuration. For example, CollectionSpec.create(new >> > > ArrayList[]{1,2,3,4}), which will auto generate the system and stream >> in >> > > the spec. >> > > >> > > To support #2, we need to be able to set up StreamSpec-related objects >> > and >> > > factories programmatically in env. Suppose we have the following >> before >> > > runner.input(...): >> > > >> > > runner.setup(env /* a writable interface of env*/ -> { >> > > env.setStreamSpec(streamId, streamSpec); >> > > env.setSystem(systemName, systemFactory); >> > > }) >> > > >> > > runner.setup(...) also provides setup for stores and other runtime >> stuff >> > > needed for the execution. The setup should be able to be serialized to >> > config. >> > > For #6, I haven't figured out a good way to inject user-defined >> objects >> > > here yet. >> > > >> > > With this API, we should be able to also support #4. For remote >> > > runner.run(), the operator user classes/lambdas in the StreamGraph need to >> > > be serialized. 
As of today, the existing option is to serialize to a >> stream, >> > > either the coordinator stream or the pipeline control stream, which >> will >> > > have the size limit per message. Do you see RPC as an option? >> > > >> > > For this version of the API, it seems we don't need the StreamApplication >> > wrapper >> > > or to expose the StreamGraph. Do you think we are on the right >> > path? >> > > >> > > Thanks, >> > > Xinyu >> > > >> > > >> > > On Thu, Apr 27, 2017 at 6:09 AM, Chris Pettitt < >> > > cpett...@linkedin.com.invalid> wrote: >> > > >> > >> That should have been: >> > >> >> > >> For #1, Beam doesn't have a hard requirement... >> > >> >> > >> On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt < >> cpett...@linkedin.com> >> > >> wrote: >> > >> >> > >> > For #1, I doesn't have a hard requirement for any change from >> Samza. A >> > >> > very nice to have would be to allow the input systems to be set up >> at >> > >> the >> > >> > same time as the rest of the StreamGraph. An even nicer to have >> would >> > >> be to >> > >> > do away with the callback based approach and treat graph building >> as a >> > >> > library, a la Beam and Flink. >> > >> > >> > >> > For the moment I've worked around the two pass requirement (once >> for >> > >> > config, once for StreamGraph) by introducing an IR layer between >> Beam >> > >> and >> > >> > the Samza Fluent translation. The IR layer is convenient >> independent >> > of >> > >> > this problem because it makes it easier to switch between the >> Fluent >> > and >> > >> > low-level APIs. >> > >> > >> > >> > >> > >> > For #4, if we had parity with StreamProcessor for lifecycle we'd >> be in >> > >> > great shape. One additional issue with the status call that I may >> not >> > >> have >> > >> > mentioned is that it provides you no way to get at the cause of >> > failure. >> > >> > The StreamProcessor API does allow this via the callback. >> > >> > >> > >> > >> > >> > Re. 
#2 and #3, I'm a big fan of getting rid of the extra >> configuration >> > >> > indirection you currently have to jump through (this is also >> related >> > to >> > >> > system consumer configuration from #1). It makes it much easier to >> > >> discover >> > >> > what the configurable parameters are too, if we provide some >> > >> programmatic >> > >> > way to tweak them in the API - which can turn into config under the >> > >> hood. >> > >> > >> > >> > On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu <xinyuliu...@gmail.com> >> > >> wrote: >> > >> > >> > >> >> Let me give a shot to summarize the requirements for >> > ApplicationRunner >> > >> we >> > >> >> have discussed so far: >> > >> >> >> > >> >> - Support environment for passing in user-defined objects (streams >> > >> >> potentially) into ApplicationRunner (*Beam*) >> > >> >> >> > >> >> - Improve ease of use for ApplicationRunner to avoid complex >> > >> >> configurations >> > >> >> such as zkCoordinator, zkCoordinationService. (*Standalone*) >> > >> >> >> > >> >> - Clean up ApplicationRunner into a single interface (*Fluent*). >> We >> > can >> > >> >> have one or more implementations but it's hidden from the users. >> > >> >> >> > >> >> - Separate StreamGraph from environment so it can be serialized >> > >> (*Beam, >> > >> >> Yarn*) >> > >> >> >> > >> >> - Better life cycle management of application, including >> > >> >> start/stop/stats (*Standalone, >> > >> >> Beam*) >> > >> >> >> > >> >> >> > >> >> One way to address 2 and 3 is to provide pre-packaged runners using >> > >> static >> > >> >> factory methods, and the return type will be the ApplicationRunner >> > >> >> interface. So we can have: >> > >> >> >> > >> >> ApplicationRunner runner = ApplicationRunner.zk() / >> > >> >> ApplicationRunner.local() >> > >> >> / ApplicationRunner.remote() / ApplicationRunner.test(). >> > >> >> >> > >> >> Internally we will package the right configs and run-time >> environment >> > >> with >> > >> >> the runner. 
For example, ApplicationRunner.zk() will define all >> the >> > >> >> configs >> > >> >> needed for zk coordination. >> > >> >> >> > >> >> To support 1 and 4, can we pass in a lambda function into the >> runner, >> > and >> > >> >> then we can run the stream graph? Like the following: >> > >> >> >> > >> >> ApplicationRunner.zk().env(config -> >> > environment).run(streamGraph); >> > >> >> >> > >> >> Then we need a way to pass the environment into the StreamGraph. >> This >> > >> can >> > >> >> be done by either adding an extra parameter to each operator, or >> > having a >> > >> >> getEnv() function in the MessageStream, which seems to be pretty >> > hacky. >> > >> >> >> > >> >> What do you think? >> > >> >> >> > >> >> Thanks, >> > >> >> Xinyu >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari < >> > >> >> pmaheshw...@linkedin.com.invalid> wrote: >> > >> >> >> > >> >> > Thanks for putting this together Yi! >> > >> >> > >> > >> >> > I agree with Jake, it does seem like there are a few too many >> > moving >> > >> >> parts >> > >> >> > here. That said, the problem being solved is pretty broad, so >> let >> > me >> > >> >> try to >> > >> >> > summarize my current understanding of the requirements. Please >> > >> correct >> > >> >> me >> > >> >> > if I'm wrong or missing something. >> > >> >> > >> > >> >> > ApplicationRunner and JobRunner first, ignoring test environment >> > for >> > >> the >> > >> >> > moment. >> > >> >> > ApplicationRunner: >> > >> >> > 1. Create execution plan: Same in Standalone and Yarn >> > >> >> > 2. Create intermediate streams: Same logic but different leader >> > >> election >> > >> >> > (ZK-based or pre-configured in standalone, AM in Yarn). >> > >> >> > 3. Run jobs: In JVM in standalone. Submit to the cluster in >> Yarn. >> > >> >> > >> > >> >> > JobRunner: >> > >> >> > 1. Run the StreamProcessors: Same process in Standalone & Test. 
>> > >> Remote >> >> host >> > >> >> > in Yarn. >> > >> >> > >> > >> >> > To get a single ApplicationRunner implementation, like Jake >> > >> suggested, >> > >> >> we >> > >> >> > need to make leader election and JobRunner implementation >> > pluggable. >> > >> >> > There's still the question of whether ApplicationRunner#run API >> > >> should >> > >> >> be >> > >> >> > blocking or non-blocking. It has to be non-blocking in YARN. We >> > want >> > >> it >> > >> >> to >> > >> >> > be blocking in standalone, but seems like the main reason is >> ease >> > of >> > >> use >> > >> >> > when launched from main(). I'd prefer making it consistently >> > >> non-blocking >> > >> >> > instead, esp. since in embedded standalone mode (where the >> > processor >> > >> is >> > >> >> > running in another container) a blocking API would not be >> > >> user-friendly >> > >> >> > either. If not, we can add both run and runBlocking. >> > >> >> > >> > >> >> > Coming to RuntimeEnvironment, which is the least clear to me so >> > far: >> > >> >> > 1. I don't think RuntimeEnvironment should be responsible for >> > >> providing >> > >> >> > StreamSpecs for streamIds - they can be obtained with a >> config/util >> > >> >> class. >> > >> >> > The StreamProcessor should only know about logical streamIds and >> > the >> > >> >> > streamId <-> actual stream mapping should happen within the >> > >> >> > SystemProducer/Consumer/Admins provided by the >> RuntimeEnvironment. >> > >> >> > 2. There are also other components that the user might be >> interested >> > in >> > >> >> > providing implementations of in embedded Standalone mode (i.e., >> not >> > >> >> just in >> > >> >> > tests) - MetricsRegistry and JMXServer come to mind. >> > >> >> > 3. Most importantly, it's not clear to me who creates and >> manages >> > the >> > >> >> > RuntimeEnvironment. 
It seems like it should be the >> > ApplicationRunner >> or >> >> the >> > >> >> > user because of (2) above and because StreamManager also needs >> > >> access to >> > >> >> > SystemAdmins for creating intermediate streams which users might >> > >> want to >> > >> >> > mock. But it also needs to be passed down to the >> StreamProcessor - >> > >> how >> > >> >> > would this work on Yarn? >> > >> >> > >> > >> >> > I think we should figure out how to integrate RuntimeEnvironment >> > with >> > >> >> > ApplicationRunner before we can make a call on one vs. multiple >> > >> >> > ApplicationRunner implementations. If we do keep >> > >> LocalApplicationRunner >> > >> >> and >> > >> >> > RemoteApplicationRunner (and TestApplicationRunner) separate, agree >> with >> > >> Jake >> > >> >> > that we should remove the JobRunners and roll them up into the >> > >> >> respective >> > >> >> > ApplicationRunners. >> > >> >> > >> > >> >> > - Prateek >> > >> >> > >> > >> >> > On Thu, Apr 20, 2017 at 10:06 AM, Jacob Maes < >> jacob.m...@gmail.com >> > > >> > >> >> wrote: >> > >> >> > >> > >> >> > > Thanks for the SEP! >> > >> >> > > >> > >> >> > > +1 on introducing these new components >> > >> >> > > -1 on the current definition of their roles (see Design >> feedback >> > >> >> below) >> > >> >> > > >> > >> >> > > *Design* >> > >> >> > > >> > >> >> > > - If LocalJobRunner and RemoteJobRunner handle the >> different >> > >> >> methods >> > >> >> > of >> > >> >> > > launching a Job, what additional value do the different >> types >> > of >> > >> >> > > ApplicationRunner and RuntimeEnvironment provide? It seems >> > like >> > >> a >> > >> >> red >> > >> >> > > flag >> > >> >> > > that all 3 would need to change from environment to >> > >> environment. It >> > >> >> > > indicates that they don't have proper modularity. 
The >> > >> >> > > call-sequence-figures >> > >> >> > > support this; LocalApplicationRunner and >> > RemoteApplicationRunner >> > >> >> make >> > >> >> > > the >> > >> >> > > same calls and the diagram only varies after >> jobRunner.start() >> > >> >> > > - As far as I can tell, the only difference between Local >> and >> > >> >> Remote >> > >> >> > > ApplicationRunner is that one is blocking and the other is >> > >> >> > > non-blocking. If >> > >> >> > > that's all they're for then either the names should be >> changed >> > >> to >> > >> >> > > reflect >> > >> >> > > this, or they should be combined into one ApplicationRunner >> > and >> > >> >> just >> > >> >> > > expose >> > >> >> > > separate methods for run() and runBlocking() >> > >> >> > > - There isn't much detail on why the main() methods for >> > >> >> Local/Remote >> > >> >> > > have such different implementations, how they receive the >> > >> >> Application >> > >> >> > > (direct vs config), and concretely how the deployment >> scripts, >> > >> if >> > >> >> any, >> > >> >> > > should interact with them. >> > >> >> > > >> > >> >> > > >> > >> >> > > *Style* >> > >> >> > > >> > >> >> > > - nit: None of the 11 uses of the word "actual" in the doc >> are >> > >> >> > > *actually* >> > >> >> > > needed. :-) >> > >> >> > > - nit: Colors of the runtime blocks in the diagrams are >> > >> >> unconventional >> > >> >> > > and a little distracting. Reminds me of nai won bao. Now >> I'm >> > >> >> hungry. >> > >> >> > :-) >> > >> >> > > - Prefer the name "ExecutionEnvironment" over >> > >> "RuntimeEnvironment". >> > >> >> > The >> > >> >> > > term "execution environment" is used >> > >> >> > > - The code comparisons for the ApplicationRunners are not >> > >> >> > apples-apples. >> > >> >> > > The local runner example is an application that USES the >> local >> > >> >> runner. >> > >> >> > > The >> > >> >> > > remote runner example is just the runner code itself. 
>> So, >> it's >> >> not >> > > readily apparent that we're comparing the main() methods >> and >> not >> >> the >> > > application itself. >> > >> >> > > >> > >> >> > > >> > >> >> > > On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan <nickpa...@gmail.com> >> > >> wrote: >> > >> >> > > >> > >> >> > > > Made some updates to clarify the role and functions of >> > >> >> > RuntimeEnvironment >> > >> >> > > > in SEP-2. >> > >> >> > > > >> > >> >> > > > On Fri, Apr 14, 2017 at 9:30 AM, Yi Pan < >> nickpa...@gmail.com> >> > >> >> wrote: >> > >> >> > > > >> > >> >> > > > > Hi, everyone, >> > >> >> > > > > >> > >> >> > > > > In light of new features such as fluent API and standalone >> that >> > >> >> > > introduce >> > >> >> > > > > new deployment / application launch models in Samza, I >> created >> > >> a >> > >> >> new >> > >> >> > > > SEP-2 >> > >> >> > > > > to address the new use cases. SEP-2 link: >> > >> >> > > > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-2%3A+ApplicationRunner+Design >> > >> >> > > > > >> > >> >> > > > > Please take a look and give feedback! >> > >> >> > > > > >> > >> >> > > > > Thanks! >> > >> >> > > > > >> > >> >> > > > > -Yi >> > >> >> > > > >
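To make the pre-packaged runner idea from up-thread concrete (ApplicationRunner.zk() / .local() / .remote() returning a single interface), here is a minimal sketch of the static-factory pattern. The interface name, factory methods, and config keys below are all hypothetical, not the real Samza API:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one runner interface whose static factories
// pre-package the coordination configs, so users never touch
// zkCoordinator/zkCoordinationService settings directly.
interface RunnerSketch {
    Map<String, String> config();

    static RunnerSketch zk() {
        // Package the ZK-coordination configs behind the factory method.
        Map<String, String> cfg = new HashMap<>();
        cfg.put("job.coordination", "zk");                        // illustrative key
        cfg.put("job.coordination.zk.connect", "localhost:2181"); // illustrative key
        return () -> Collections.unmodifiableMap(cfg);
    }

    static RunnerSketch local() {
        // No external coordinator needed for the local case.
        Map<String, String> cfg = new HashMap<>();
        cfg.put("job.coordination", "local");
        return () -> Collections.unmodifiableMap(cfg);
    }
}
```

Callers would only ever see the single interface; swapping zk() for local() changes the packaged configs and run-time environment but not the calling code, which is the ease-of-use point made for requirements 2 and 3.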