This is what I want, and not 999,999 teardowns for 1,000,000 setups until there is an unexpected crash (= a bug).
On 19 Feb 2018 at 18:57, "Reuven Lax" <re...@google.com> wrote:

On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

2018-02-19 15:57 GMT+01:00 Reuven Lax <re...@google.com>:

On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

[Romain] @Reuven: in practice it is created by a pool of 256, but that leads to the same pattern; the teardown is just an "if (iCreatedThem) releaseThem();".

[Reuven] How do you control "256"? Even if you have a pool of 256 workers, nothing in Beam guarantees how many threads and DoFns are created per worker. In theory the runner might decide to create 1000 threads on each worker.

[Romain] No, it was the other way around: in this case on AWS you can get 256 instances at once but not 512 (which would be 2x256). So when you compute the distribution you allocate to some fn the role of owning the instance lookup and release.

[Reuven] I still don't understand. Let's be more precise. If you write the following code:

    pCollection.apply(ParDo.of(new MyDoFn()));

there is no way to control how many instances of MyDoFn are created. The runner might decide to create a million instances of this class across your worker pool, which means that you will get a million Setup and Teardown calls.

[Romain] Anyway, this was just an example of an external resource you must release. The real topic is that Beam should define, as soon as possible, a guaranteed generic lifecycle to let users embrace its programming model.

@Eugene:
1. The wait logic is about passing the value, which is not always possible (roughly 15% of cases, by my raw estimate).
2. SDF: I'll try to detail why I mention SDF more here.

Concretely, Beam exposes a portable API (included in the SDK core). This API defines a *container* API and therefore implies bean lifecycles.
I'll not detail them all, but just use sources and dofn (not SDF) to illustrate the idea I'm trying to develop.

A. Source

A source computes a partition plan with 2 primitives: estimateSize and split. As a user you can expect both to be called on the same bean instance to avoid paying the same connection cost(s) twice. Concretely:

    connect()
    try {
      estimateSize()
      split()
    } finally {
      disconnect()
    }

This is not guaranteed by the API, so you must do:

    connect()
    try {
      estimateSize()
    } finally {
      disconnect()
    }
    connect()
    try {
      split()
    } finally {
      disconnect()
    }

plus a workaround with an internal estimated size, since this primitive is often called in split but you don't want to connect twice in the second phase.

Why do you need that? Simply because you want to define an API to implement sources which initializes the source bean and destroys it. I insist: this is a very, very basic concern for such an API. However, Beam doesn't embrace it and doesn't assume it, so building any API on top of Beam is very painful today, and direct Beam users hit the exact same issues - check how the IOs are implemented, with static utilities creating short-lived connections that prevent reusing an existing connection within a single method (https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862).

The same logic applies to the reader which is then created.

B. DoFn & SDF

As a fn dev you expect the same from the Beam runtime: init(); try { while (...) process(); } finally { destroy(); }, and that it is executed on the exact same instance, to be able to be stateful at that level for expensive connections/operations/flow state handling.
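The connect-once contract described above can be sketched in plain Java (hypothetical ConnectedSource class, no Beam dependency): one connection brackets both primitives on the same instance, and counters make the single connect/disconnect pair observable.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical source bean illustrating the desired lifecycle:
// connect() once, then estimateSize() and split() on the SAME instance,
// then disconnect() exactly once, even on failure.
class ConnectedSource {
    int connects = 0;
    int disconnects = 0;

    void connect() { connects++; }
    void disconnect() { disconnects++; }

    long estimateSize() { return 1024L; }

    List<String> split(int desired) {
        // split reuses the already-open connection instead of reconnecting
        return Arrays.asList("shard-0", "shard-1");
    }
}

public class SourceLifecycle {
    public static void main(String[] args) {
        ConnectedSource source = new ConnectedSource();
        source.connect();
        try {
            long size = source.estimateSize();
            List<String> shards = source.split(2);
            System.out.println(size + " " + shards.size());
        } finally {
            source.disconnect();
        }
        // one connection pays for both primitives
        System.out.println(source.connects + " " + source.disconnects);
    }
}
```

Without the guarantee, each primitive has to open and close its own connection, which is the double-connect pattern shown above.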
As you mentioned with the million example, this sequence should happen for each single instance, so 1M times in your example.

Now, why did I mention SDF several times? Because SDF is a generalisation of both cases (source and dofn). Therefore it creates way more instances and requires a much stricter/more explicit definition of the exact lifecycle and of which instance does what. Since Beam handles the full lifecycle of the bean instances, it must provide init/destroy hooks (setup/teardown) which can be stateful.

Take the JDBC example which was mentioned earlier. Today, because of the teardown issue, it uses bundles. Since bundle size is not defined - and will not be with SDF - it must use a pool to be able to reuse a connection instance and not wreck performance. Now, with SDF and the split increase, how do you size the pool? Generally in batch you use a single connection per thread to avoid consuming all database connections. With a pool you have 2 choices: 1. use a pool of 1; 2. use a slightly bigger pool, but multiplied by the number of beans you will likely double or triple the connection count and make the execution fail with "no more connection available". If you picked 1 (a pool of 1), then you still need a reliable teardown per pool instance (close(), generally) to ensure you release the pool and don't leak the connection information in the JVM. In all cases you come back to the init()/destroy() lifecycle, even if you fake it by getting connections per bundle.

Just to make it obvious: the SDF mentions are only because SDF amplifies all the current issues caused by the loose definition of the bean lifecycles, nothing else.
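The pool-of-1 option can be sketched as follows (hypothetical FakeConnection and JdbcFn classes, no real JDBC driver): the connection is opened in a setup hook, reused across bundles, and released in a teardown hook. A global open-counter makes the leak visible if teardown is skipped.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for a real JDBC connection; counts opens/closes so a
// leak is observable without a database.
class FakeConnection {
    static final AtomicInteger OPEN = new AtomicInteger();
    FakeConnection() { OPEN.incrementAndGet(); }
    void close() { OPEN.decrementAndGet(); }
    String query(String sql) { return "row-for:" + sql; }
}

// DoFn-like bean with the init()/destroy() lifecycle discussed above:
// one connection per instance (a "pool of 1"), reused by every bundle.
class JdbcFn {
    private FakeConnection connection;

    void setup() { connection = new FakeConnection(); }                  // @Setup
    String processElement(String sql) { return connection.query(sql); } // @ProcessElement
    void teardown() { connection.close(); }                              // @Teardown
}

public class PoolOfOneDemo {
    public static void main(String[] args) {
        JdbcFn fn = new JdbcFn();
        fn.setup();
        try {
            // many bundles/elements share the single connection
            System.out.println(fn.processElement("select 1"));
            System.out.println(fn.processElement("select 2"));
        } finally {
            fn.teardown(); // without this guarantee the connection leaks
        }
        System.out.println("open connections: " + FakeConnection.OPEN.get());
    }
}
```

If teardown() is never called, FakeConnection.OPEN stays above zero, which is exactly the "leaked connection information in the JVM" failure mode described above.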
Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>

2018-02-18 22:32 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:

[Eugene] The kind of whole-transform lifecycle you're mentioning can be accomplished using the Wait transform, as I suggested in the thread above, and I believe it should become the canonical way to do that.

(I would like to reiterate one more time, as the main author of most design documents related to SDF and of its implementation in the Java direct and Dataflow runners, that SDF is fully unrelated to the topic of cleanup - I'm very confused as to why it keeps coming up.)

On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

[Romain] I kind of agree, except that transforms lack a lifecycle too. My understanding is that SDF could be a way to unify it and clean up the API.

Otherwise, how do you normalize - with a single API - the lifecycle of transforms?

On 18 Feb 2018 at 21:32, Ben Chambers <bchamb...@apache.org> wrote:

[Ben] Are you sure that focusing on the cleanup of specific DoFns is appropriate? In many cases where cleanup is necessary, it is around an entire composite PTransform. I think there have been discussions/proposals around a more methodical "cleanup" option, but those haven't been implemented, to the best of my knowledge.

For instance, consider the steps of a FileIO:
1. Write to a bunch (N shards) of temporary files.
2. When all temporary files are complete, attempt a bulk copy to put them in the final destination.
3. Clean up all the temporary files.

(This is often desirable because it minimizes the chance of seeing partial/incomplete results in the final destination.)

In the above, you'd want step 1 to execute on many workers, likely using a ParDo (say N different workers). The move step should only happen once, so on one worker. This means it will be a different DoFn, likely with some work done to ensure it runs on one worker.

In such a case, cleanup / @TearDown of the DoFn is not enough. We need an API for a PTransform to schedule some cleanup work for when the transform is "done". In batch this is relatively straightforward, but it doesn't exist. This is the source of some problems, such as the BigQuery sink leaving files around that have failed to import into BigQuery.

In streaming this is less straightforward -- do you want to wait until the end of the pipeline? Or until the end of the window? In practice, you just want to wait until you know nobody will need the resource anymore.

This led to some discussions around a "cleanup" API, where you could have a transform that outputs resource objects. Each resource object would have logic for cleaning it up, and there would be something that indicated what parts of the pipeline needed that resource and what kind of temporal lifetime those objects had. As soon as that part of the pipeline had advanced far enough that it would no longer need the resources, they would get cleaned up. This can be done at pipeline shutdown, or incrementally during a streaming pipeline, etc.

Would something like this be a better fit for your use case? If not, why is handling teardown within a single DoFn sufficient?

On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

[Romain] Yes, 1M. Let me try to explain by simplifying the overall execution. Each instance - one fn, so likely in a thread of a worker - has its lifecycle. Caricaturally: "new" and garbage collection.

In practice, "new" is often an unsafe allocation (deserialization), but that doesn't matter here.

What I want is for any "new" to be followed by a setup before any process or startBundle, and, the last time Beam holds the instance before it is GC-ed and after the last finishBundle, for teardown to be called.

It is as simple as that. This way there is no need to combine fns in a way that makes a fn not self-contained in order to implement basic transforms.

On 18 Feb 2018 at 20:07, Reuven Lax <re...@google.com> wrote:

On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

On 18 Feb 2018 at 19:28, Ben Chambers <bchamb...@apache.org> wrote:

[Ben] It feels like this thread may be a bit off-track. Rather than focusing on the semantics of the existing methods -- which have been noted to meet many existing use cases -- it would be helpful to focus more on the reason you are looking for something with different semantics.

Some possibilities (I'm not sure which one you are trying to do):

1. Clean up some external, global resource that was initialized once during the startup of the pipeline.
If this is the case, how are you ensuring it was really only initialized once (and not once per worker, per thread, per instance, etc.)? How do you know when the pipeline should release it? If the answer is "when it reaches step X", then what about a streaming pipeline?

[Romain] When the dofn is no longer needed logically, i.e. when the batch is done or the stream is stopped (manually or by a JVM shutdown).

[Reuven] I'm really not following what this means.

Let's say that a pipeline is running 1000 workers, and each worker is running 1000 threads (each running a copy of the same DoFn). How many cleanups do you want (do you want 1000 * 1000 = 1M cleanups), and when do you want them called? When the entire pipeline is shut down? When an individual worker is about to shut down (which may be temporary - it may be about to start back up)? Something else?

[Ben] 2. Finalize some resources that are used within some region of the pipeline. While the DoFn lifecycle methods are not a good fit for this (they are focused on managing resources within the DoFn), you could model this on how FileIO finalizes the files that it produced. For instance:
a) A ParDo generates "resource IDs" (or some token that stores information about resources).
b) "Require Deterministic Input" (to prevent retries from changing resource IDs).
c) A ParDo that initializes the resources.
d) Pipeline segments that use the resources, and eventually output the fact that they're done.
e) "Require Deterministic Input".
f) A ParDo that frees the resources.

By making the use of the resource part of the data, it is possible to "checkpoint" which resources may be in use or have been finished, using the deterministic-input requirement. This is important to ensuring everything is actually cleaned up.

[Romain] I need that, but generic and not case by case, to industrialize some API on top of Beam.

[Ben] 3. Some other use case that I may be missing? If it is this case, could you elaborate on what you are trying to accomplish? That would help me understand both the problems with the existing options and possibly what could be done to help.

[Romain] I understand there are workarounds for almost all cases, but that means each transform is different in its lifecycle handling. I dislike that a lot at scale and as a user, since you can't put any unified practice on top of Beam; it also makes Beam very hard to integrate, or to use to build higher-level libraries or software.

This is why I tried not to start the workaround discussions and just stay at the API level.
-- Ben

On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:

[Eugene] "Machine state" is overly low-level, because many of the possible reasons can happen on a perfectly fine machine. If you'd like to rephrase it to "it will be called except in various situations where it's logically impossible or impractical to guarantee that it's called", that's fine. Or you can list some of the examples above.

[Romain] Sounds ok to me.

[Eugene] The main point for the user is: you *will* see non-preventable situations where it couldn't be called - it's not just intergalactic crashes - so if the logic is very important (e.g. cleaning up a large amount of temporary files, shutting down a large number of VMs you started, etc.), you have to express it using one of the other methods that have stricter guarantees (which obviously come at a cost, e.g. no pass-by-reference).

[Romain] FinishBundle has the exact same guarantee, sadly, so I'm not sure which other method you speak of. Concretely, if you make it really unreliable - this is what "best effort" sounds like to me - then users can't use it to clean anything; but if you make it "can happen, but it is unexpected and means something happened", then it is fine to have a manual - or automatic, if fancy - recovery procedure. This is where it makes all the difference and impacts the developers and ops (all users, basically).

On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

[Romain] Agree, Eugene, except that "best effort" means exactly that. It is also often used to say "at will", and this is what triggered this thread.

I'm fine using "except if the machine state prevents it", but "best effort" is too open and can be very badly and wrongly perceived by users (as it was by me).

2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:

[Eugene] It will not be called if it's impossible to call it: in the example situation you have (intergalactic crash), and in a number of more common cases: e.g. if the worker container has crashed (e.g. user code in a different thread called a C library over JNI and it segfaulted), a JVM bug, a crash due to user code OOM, if the worker has lost network connectivity (then it may be called but it won't be able to do anything useful), if this is running on a preemptible VM and it was preempted by the underlying cluster manager without notice, or if the worker was too busy with other stuff (e.g. calling other Teardown functions) until the preemption timeout elapsed, if the underlying hardware simply failed (which happens quite often at scale), and in many other conditions.

"Best effort" is the commonly used term to describe such behavior. Please feel free to file bugs for cases where you observed a runner not call Teardown in a situation where it was possible to call it but the runner made insufficient effort.

On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:

On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

On 18 Feb 2018 at 00:23, Kenneth Knowles <k...@google.com> wrote:

On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

[Kenn] If you give an example of a high-level need (e.g. "I'm trying to write an IO for system $x and it requires the following initialization, the following cleanup logic, and the following processing in between"), I'll be better able to help you.

[Romain] Take a simple example of a transform requiring a connection. Using bundles is a perf killer since bundle size is not controlled.
Using teardown doesn't allow you to release the connection, since it is a best-effort thing. Not releasing the connection makes you pay a lot - AWS ;) - or prevents you from launching other processing - concurrency limits.

[Kenn] For this example @Teardown is an exact fit. If things die so badly that @Teardown is not called, then nothing else can be called to close the connection either. What AWS service are you thinking of that stays open for a long time when everything at the other end has died?

[Romain] You assume connections are kind of stateless, but some (proprietary) protocols require closing exchanges which are not only "I'm leaving".

For AWS, I was thinking about starting some services - machines - on the fly at pipeline startup and closing them at the end. If teardown is not called, you leak machines and money. You can say it can be done another way... as can the full pipeline ;).

I don't want to be picky, but if Beam can't handle its components' lifecycle, it can't be used at scale for generic pipelines and stays bound to some particular IOs.

What prevents enforcing teardown - ignoring the interstellar crash case, which can't be handled by any human system? Nothing, technically. Why do you push to not handle it? Is it due to some legacy code on Dataflow, or something else?

[Eugene] Teardown *is* already documented and implemented this way (best-effort). So I'm not sure what kind of change you're asking for.

[Romain] Remove "best effort" from the javadoc. If it is not called, then it is a bug and we are done :).

Also, what does it mean for the users? The direct runner does it, so if a user uses the reference implementation in tests, will he get a different behavior in prod? Also, don't forget that the user doesn't know what the IOs he composes use, so this is so impactful for the whole product that it must be handled, IMHO.

I understand the portability culture is new in the big data world, but that is not a reason to ignore what people have done for years and do it wrong before doing it right ;).

My proposal is to list what can prevent guaranteeing - under normal IT conditions - the execution of teardown. Then we see if we can handle it, and only if there is a technical reason we can't do we make it experimental/unsupported in the API. I know Spark and Flink can; any known blocker for other runners?

Technical note: even a kill should go through Java shutdown hooks, otherwise your environment (the software enclosing Beam) is fully unhandled and your overall system is uncontrolled. The only case where this is not true is when the software is always owned by a vendor and never installed on a customer environment. In that case it belongs to the vendor to handle the Beam API, and not to Beam to adjust its API for a vendor - otherwise all features unsupported by one runner should be made optional, right?

Not all state is about the network, even in distributed systems, which is why an explicit and defined lifecycle is key.

Kenn