Agreed, let's try another time:
Any issue with removing "best effort"? If yes, any issue with explaining that it is due to failure and not a runner choice? If either one is fine, we close this thread and just decide who fixes it; if not, we must define and discuss why there is a teardown now and how to implement the need in Beam.
Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>
2018-02-19 23:35 GMT+01:00 Ismaël Mejía <ieme...@gmail.com>:
> I also had a different understanding of the lifecycle of a DoFn.
> My understanding of the use case for every method in the DoFn was clear and perfectly aligned with Thomas's explanation. What I understood, in general terms, was "@Setup is where I get resources/prepare connections and @Teardown is where I free them", so calling Teardown seemed essential to have a complete lifecycle:
> Setup → StartBundle* → ProcessElement* → FinishBundle* → Teardown
> The fact that @Teardown might not be called is a new detail for me too. I also find it weird for an API to have a method that may or may not be called. Why would users implement teardown if it will not be called? In that case a cleaner approach would probably be to get rid of that method altogether, no?
> But maybe that's not so easy either. There was another point: a user reported an issue with leaking resources using KafkaIO in the Spark runner; for reference:
> https://apachebeam.slack.com/archives/C1AAFJYMP/p1510596938000622
> At that moment my understanding was that something was fishy. We should be calling Teardown to close the connections correctly and free the resources in case of exceptions in start/process/finish. So I filed a JIRA and fixed this by enforcing the call of teardown in the Spark runner and the Flink runner:
> https://issues.apache.org/jira/browse/BEAM-3187
> https://issues.apache.org/jira/browse/BEAM-3244
> As you can see, not calling this method does have consequences, at least for non-containerized runners. Of course, a runner that uses containers might not care about cleaning up the resources this way, but a long-living JVM in a Hadoop environment probably won't have the same luck. So I am not sure that loose semantics there are the right option. Runners could simply guarantee that they call teardown, and if teardown takes too long they can decide to send a signal or kill the process/container/etc. and go ahead. That way users would at least have a motivation to implement the teardown method; otherwise it doesn't make any sense to have it (API-wise).
> On Mon, Feb 19, 2018 at 11:30 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
> > Romain, would it be fair to say that currently the goal of your participation in this discussion is to identify situations where @Teardown in principle could have been called, but some of the current runners don't make a good enough effort to call it? If yes, then as I said before, please, by all means, file bugs of the form "Runner X doesn't call @Teardown in situation Y" if you're aware of any, and feel free to send PRs fixing runner X to reliably call @Teardown in situation Y. I think we all agree that this would be a good improvement.
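Ismaël's proposal is that runners always call teardown and handle a teardown that takes too long separately. It can be modelled in a few lines of plain Java. This is a sketch under stated assumptions, not Beam's runner code: `LifecycleFn`, `LifecycleDriver.runLifecycle`, `RecordingFn` and `LifecycleDemo` are hypothetical names. The driver takes one instance through Setup → StartBundle* → ProcessElement* → FinishBundle* → Teardown. Teardown sits in a `finally` block, so it also runs when processing throws. That is the in-process failure case the BEAM-3187 and BEAM-3244 fixes target.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback interface mirroring the DoFn lifecycle methods.
// This is NOT Beam's DoFn class; it only models the call order.
interface LifecycleFn<T> {
    void setup();
    void startBundle();
    void processElement(T element);
    void finishBundle();
    void teardown();
}

// Hypothetical runner-side driver for one instance.
final class LifecycleDriver {
    private LifecycleDriver() {}

    static <T> void runLifecycle(LifecycleFn<T> fn, List<List<T>> bundles) {
        fn.setup(); // in this sketch a failed setup leaves nothing to tear down
        try {
            for (List<T> bundle : bundles) {
                fn.startBundle();
                for (T element : bundle) {
                    fn.processElement(element);
                }
                fn.finishBundle();
            }
        } finally {
            // Runs after normal completion and after any exception thrown above,
            // but only while this JVM is alive: a crash or kill -9 skips it.
            fn.teardown();
        }
    }
}

// Records each call so the order can be checked; fails on the element "boom".
final class RecordingFn implements LifecycleFn<String> {
    final List<String> calls = new ArrayList<>();

    @Override public void setup() { calls.add("setup"); }
    @Override public void startBundle() { calls.add("startBundle"); }
    @Override public void processElement(String element) {
        if (element.equals("boom")) {
            throw new IllegalStateException("failed on " + element);
        }
        calls.add("process:" + element);
    }
    @Override public void finishBundle() { calls.add("finishBundle"); }
    @Override public void teardown() { calls.add("teardown"); }
}

final class LifecycleDemo {
    public static void main(String[] args) {
        RecordingFn fn = new RecordingFn();
        try {
            LifecycleDriver.runLifecycle(fn, List.of(List.of("a", "boom")));
        } catch (IllegalStateException expected) {
            // The failure still reaches the caller, after teardown has run.
        }
        System.out.println(fn.calls); // [setup, startBundle, process:a, teardown]
    }
}
```

The `finally` block covers exceptions thrown by user code. It cannot cover the cases discussed further down the thread, where the JVM or the machine disappears.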
> > On Mon, Feb 19, 2018 at 2:03 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >> On 19 Feb 2018 22:56, "Reuven Lax" <re...@google.com> wrote:
> >> On Mon, Feb 19, 2018 at 1:51 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >>> On 19 Feb 2018 21:28, "Reuven Lax" <re...@google.com> wrote:
> >>> How do you call teardown? There are cases in which the Java code gets no indication that the restart is happening (e.g. cases where the machine itself is taken down).
> >>> This is a bug; zero-downtime maintenance is very doable in 2018 ;). Crashes are bugs, and kill -9 to shut down is a bug too. In the other cases, call shutdown from a hook, worst case.
> >> What you say here is simply not true.
> >> There are many scenarios in which workers shut down with no opportunity for any sort of shutdown hook. Sometimes the entire machine gets shut down, and not even the OS will have much of a chance to do anything. At scale this will happen with some regularity, and a distributed system that assumes this will not happen is a poor distributed system.
> >> This is part of the infra. There is no reason for the machine to be shut down without first shutting down what runs on it, except a bug in the software or setup. I understand that maybe you don't do it everywhere, but there is no blocker to doing it. That means you can shut down the machines and guarantee teardown is called.
> >> My point is simply that it is doable, and the Beam SDK core can assume the setup is done well. If there is a best-effort downside due to that (with the meaning you defined), it is an implementation bug or a user installation issue.
> >> Technically, all of this is true.
> >> What can prevent teardown is a hardware failure or the like.
> >> This is fine and doesn't need to be in the doc, since it is a fact of life in IT and obvious. Otherwise it must be made very explicit to avoid the current ambiguity.
> >>> On Mon, Feb 19, 2018, 12:24 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >>>> Restarting doesn't mean you don't call teardown. Apart from a bug, there is technically no reason for it to happen. No reason.
> >>>> On 19 Feb 2018 21:14, "Reuven Lax" <re...@google.com> wrote:
> >>>>> Workers restarting is not a bug; it's standard and often expected.
> >>>>> On Mon, Feb 19, 2018, 12:03 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >>>>>> Nothing. As mentioned, it is a bug, so recovery is a bug recovery (procedure).
> >>>>>> On 19 Feb 2018 19:42, "Eugene Kirpichov" <kirpic...@google.com> wrote:
> >>>>>>> So what would you like to happen if there is a crash? The DoFn instance no longer exists because the JVM it ran on no longer exists. What should Teardown be called on?
> >>>>>>> On Mon, Feb 19, 2018, 10:20 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >>>>>>>> This is what I want, and not 999999 teardowns for 1000000 setups until there is an unexpected crash (= a bug).
> >>>>>>>> On 19 Feb 2018 18:57, "Reuven Lax" <re...@google.com> wrote:
> >>>>>>>>> On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >>>>>>>>>> 2018-02-19 15:57 GMT+01:00 Reuven Lax <re...@google.com>:
> >>>>>>>>>>> On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>> @Reuven: in practice it is created by a pool of 256, but that leads to the same pattern. The teardown is just an "if (iCreatedThem) releaseThem();".
> >>>>>>>>>>> How do you control "256"? Even if you have a pool of 256 workers, nothing in Beam guarantees how many threads and DoFns are created per worker. In theory the runner might decide to create 1000 threads on each worker.
> >>>>>>>>>> Nope, it was the other way around. In this case, on AWS you can get 256 instances at once but not 512 (which would be 2x256). So when you compute the distribution, you give some fn the role of owning the instance lookup and release.
> >>>>>>>>> I still don't understand. Let's be more precise. If you write the following code:
> >>>>>>>>> pCollection.apply(ParDo.of(new MyDoFn()));
> >>>>>>>>> There is no way to control how many instances of MyDoFn are created. The runner might decide to create a million instances of this class across your worker pool, which means that you will get a million Setup and Teardown calls.
> >>>>>>>>>> Anyway, this was just an example of an external resource you must release.
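Romain describes an `if (iCreatedThem) releaseThem();` pattern, and Reuven points out that the runner alone decides how many instances exist. Together they can be sketched as an owner election among instances. The sketch assumes the shared resource is coordinated through static state, so it elects only one owner per JVM (per worker), not per pipeline. `OwnerElectingFn` and its counters are hypothetical stand-ins for acquiring and releasing the external resource.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical fn for the "if (iCreatedThem) releaseThem();" pattern: the first
// instance whose setup runs in this JVM becomes the owner of a shared external
// resource; every other instance reuses it and releases nothing.
final class OwnerElectingFn {
    // Static fields are shared per class loader, roughly one worker JVM, so a
    // pipeline spread over many workers gets one owner per worker.
    static final AtomicBoolean CLAIMED = new AtomicBoolean(false);
    static final AtomicInteger ACQUIRED = new AtomicInteger();
    static final AtomicInteger RELEASED = new AtomicInteger();

    private boolean iCreatedThem;

    void setup() {
        iCreatedThem = CLAIMED.compareAndSet(false, true);
        if (iCreatedThem) {
            ACQUIRED.incrementAndGet(); // stand-in for acquiring the resource
        }
    }

    void teardown() {
        if (iCreatedThem) {
            RELEASED.incrementAndGet(); // stand-in for releaseThem()
            iCreatedThem = false;
            CLAIMED.set(false);
        }
    }
}
```

However many instances the runner creates, only the owner's teardown releases the resource. If that one instance is never torn down, for example because its worker disappears, `RELEASED` stays at 0 and the resource leaks even though every other instance shut down cleanly. The owner can also release the resource while other instances in the same JVM are still using it, so this pattern needs extra coordination in practice.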
> >>>>>>>>>> The real topic is that Beam should define, as soon as possible, a guaranteed generic lifecycle so that users can embrace its programming model.
> >>>>>>>>>>>> @Eugene:
> >>>>>>>>>>>> 1. The wait logic is about passing the value, which is not always possible (about 15% of cases, by my rough estimate).
> >>>>>>>>>>>> 2. SDF: I'll try to explain here why I mention SDF.
> >>>>>>>>>>>> Concretely, Beam exposes a portable API (included in the SDK core). This API defines a *container* API and therefore implies bean lifecycles. I won't detail them all; I'll just use sources and DoFn (not SDF) to illustrate the idea I'm trying to develop.
> >>>>>>>>>>>> A. Source
> >>>>>>>>>>>> A source computes a partition plan with 2 primitives: estimateSize and split. As a user, you can expect both to be called on the same bean instance, so that you don't pay the same connection cost(s) twice. Concretely:
> >>>>>>>>>>>> connect();
> >>>>>>>>>>>> try {
> >>>>>>>>>>>>     estimateSize();
> >>>>>>>>>>>>     split();
> >>>>>>>>>>>> } finally {
> >>>>>>>>>>>>     disconnect();
> >>>>>>>>>>>> }
> >>>>>>>>>>>> This is not guaranteed by the API, so you must do:
> >>>>>>>>>>>> connect();
> >>>>>>>>>>>> try {
> >>>>>>>>>>>>     estimateSize();
> >>>>>>>>>>>> } finally {
> >>>>>>>>>>>>     disconnect();
> >>>>>>>>>>>> }
> >>>>>>>>>>>> connect();
> >>>>>>>>>>>> try {
> >>>>>>>>>>>>     split();
> >>>>>>>>>>>> } finally {
> >>>>>>>>>>>>     disconnect();
> >>>>>>>>>>>> }
> >>>>>>>>>>>> You also need a workaround with an internally estimated size. This primitive is often called in split, and you don't want to connect twice in the second phase.
> >>>>>>>>>>>> Why do you need that?
> >>>>>>>>>>>> Simply because you want to define an API for implementing sources that initializes the source bean and destroys it. I insist that this is a very, very basic concern for such an API. However, Beam doesn't embrace it and doesn't assume it. So building any API on top of Beam is very painful today, and direct Beam users hit the exact same issues. Check how the IOs are implemented: static utilities create volatile connections, which prevents reusing an existing connection within a single method (https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862).
> >>>>>>>>>>>> The same logic applies to the reader that is then created.
> >>>>>>>>>>>> B. DoFn & SDF
> >>>>>>>>>>>> As a fn developer, you expect the same from the Beam runtime: init(); try { while (...) process(); } finally { destroy(); }. You also expect it to be executed on the exact same instance, so that you can be stateful at that level for expensive connections/operations/flow state handling.
> >>>>>>>>>>>> As you mentioned with the million example, this sequence should happen for every single instance, so 1M times in your example.
> >>>>>>>>>>>> Now, why did I mention SDF several times? Because SDF is a generalisation of both cases (source and DoFn). It therefore creates far more instances. It also requires a much stricter, more explicit definition of the exact lifecycle and of which instance does what. Since Beam handles the full lifecycle of the bean instances, it must provide init/destroy hooks (setup/teardown) which can be stateful.
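For sources, Romain mentions a workaround: cache the size estimate internally so that `split()` does not reconnect just to recompute it. It could look like the sketch below. `FakeConnection` and `CachingSource` are hypothetical, and the split logic is reduced to cutting a row range. A real source would usually need the connection in `split()` as well.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical connection that counts how many times it has been opened.
final class FakeConnection implements AutoCloseable {
    static int opened = 0;

    FakeConnection() {
        opened++;
    }

    long countRows() {
        return 1_000L; // stand-in for a remote "count" query
    }

    @Override
    public void close() {}
}

// Hypothetical bounded source that caches its size estimate, so split() can
// reuse it instead of connecting again. This only helps when the runner calls
// estimateSize() and split() on the same instance.
final class CachingSource {
    private Long cachedSize; // null until computed

    long estimateSize() {
        if (cachedSize == null) {
            try (FakeConnection connection = new FakeConnection()) {
                cachedSize = connection.countRows();
            }
        }
        return cachedSize;
    }

    // Cuts [0, size) into ranges of at most desiredBundleSize rows.
    List<long[]> split(long desiredBundleSize) {
        long size = estimateSize(); // cached after the first call
        List<long[]> ranges = new ArrayList<>();
        for (long start = 0; start < size; start += desiredBundleSize) {
            ranges.add(new long[] {start, Math.min(size, start + desiredBundleSize)});
        }
        return ranges;
    }

    public static void main(String[] args) {
        CachingSource source = new CachingSource();
        long size = source.estimateSize();
        int splits = source.split(300).size();
        System.out.println(size + " rows, " + splits + " splits, "
                + FakeConnection.opened + " connection(s)"); // 1000 rows, 4 splits, 1 connection(s)
    }
}
```

The cache only saves a connection when the runner calls `estimateSize()` and `split()` on the same instance. As Romain notes, the API does not guarantee that.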
> >>>>>>>>>>>> Take the JDBC example that was mentioned earlier. Today, because of the teardown issue, it uses bundles. The bundle size is not defined (and will not be with SDF), so it must use a pool to reuse a connection instance without hurting performance. Now, with SDF and the increased number of splits, how do you handle the pool size? Generally in batch you use a single connection per thread, to avoid consuming all the database connections. With a pool you have 2 choices:
> >>>>>>>>>>>> 1. Use a pool of 1.
> >>>>>>>>>>>> 2. Use a slightly bigger pool. Multiplied by the number of beans, you will likely double or triple the connection count and make the execution fail with "no more connection available".
> >>>>>>>>>>>> If you picked 1 (a pool of one), you still need a reliable teardown per pool instance (generally close()). That is how you ensure you release the pool and don't leak the connection information in the JVM. In all cases you come back to the init()/destroy() lifecycle, even if you pretend to get connections with bundles.
> >>>>>>>>>>>> Just to make it obvious: I mention SDF only because SDF amplifies, exponentially, all the current issues with the loose definition of bean lifecycles. Nothing else.
> >>>>>>>>>>>> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
> >>>>>>>>>>>>> The kind of whole-transform lifecycle you're mentioning can be accomplished using the Wait transform, as I suggested in the thread above, and I believe it should become the canonical way to do that.
> >>>>>>>>>>>>> (I would like to reiterate one more time, as the main author of most design documents related to SDF and of its implementation in the Java direct and Dataflow runners, that SDF is fully unrelated to the topic of cleanup. I'm very confused as to why it keeps coming up.)
> >>>>>>>>>>>>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>> I kind of agree, except that transforms lack a lifecycle too. My understanding is that SDF could be a way to unify it and clean up the API.
> >>>>>>>>>>>>>> Otherwise, how do we normalize the lifecycle of transforms with a single API?
> >>>>>>>>>>>>>> On 18 Feb 2018 21:32, "Ben Chambers" <bchamb...@apache.org> wrote:
> >>>>>>>>>>>>>>> Are you sure that focusing on the cleanup of specific DoFns is appropriate? In many cases where cleanup is necessary, it is around an entire composite PTransform. I think there have been discussions/proposals around a more methodical "cleanup" option, but to the best of my knowledge those haven't been implemented.
> >>>>>>>>>>>>>>> For instance, consider the steps of a FileIO:
> >>>>>>>>>>>>>>> 1. Write to a bunch (N shards) of temporary files.
> >>>>>>>>>>>>>>> 2. When all temporary files are complete, attempt to do a bulk copy to put them in the final destination.
> >>>>>>>>>>>>>>> 3. Clean up all the temporary files.
> >>>>>>>>>>>>>>> (This is often desirable because it minimizes the chance of seeing partial/incomplete results in the final destination.)
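Ben's three FileIO steps can be illustrated on a local filesystem with `java.nio.file`. This is not Beam's FileIO implementation: `TempThenFinalize` and its methods are hypothetical. In a pipeline, step 1 would be the parallel ParDo, and steps 2 and 3 would run once on a single worker.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Local-filesystem model of the three FileIO-style steps; not Beam's FileIO.
final class TempThenFinalize {
    private TempThenFinalize() {}

    static Path createTempDir(String prefix) {
        try {
            return Files.createTempDirectory(prefix);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Step 1: write one temporary file per shard (the parallel part).
    static List<Path> writeShards(Path tempDir, List<String> shardContents) {
        List<Path> written = new ArrayList<>();
        try {
            Files.createDirectories(tempDir);
            for (int i = 0; i < shardContents.size(); i++) {
                Path shard = tempDir.resolve("shard-" + i + ".tmp");
                Files.write(shard, shardContents.get(i).getBytes(StandardCharsets.UTF_8));
                written.add(shard);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return written;
    }

    // Steps 2 and 3: once all shards exist, a single caller copies them to the
    // final directory, then deletes the temporary directory even if copying failed.
    static List<Path> finalizeShards(Path tempDir, List<Path> shards, Path finalDir) {
        List<Path> outputs = new ArrayList<>();
        try {
            try {
                Files.createDirectories(finalDir);
                for (int i = 0; i < shards.size(); i++) {
                    Path target = finalDir.resolve("part-" + i);
                    Files.copy(shards.get(i), target, StandardCopyOption.REPLACE_EXISTING);
                    outputs.add(target);
                }
            } finally {
                deleteRecursively(tempDir);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return outputs;
    }

    private static void deleteRecursively(Path dir) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order puts children before their parent directory.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }
}
```

The `finally` around the copy only helps while this process is alive. If the process dies between steps, the temporary directory stays behind. That is the kind of leftover-files problem Ben attributes to the lack of a transform-level cleanup API.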
> >>>>>>>>>>>>>>> In the above, you'd want step 1 to execute on many workers, likely using a ParDo (say N different workers). The move step should only happen once, so on one worker. This means it will be a different DoFn, likely with some extra work done to ensure it runs on one worker.
> >>>>>>>>>>>>>>> In such a case, cleanup / @Teardown of the DoFn is not enough. We need an API for a PTransform to schedule some cleanup work for when the transform is "done". In batch this is relatively straightforward, but it doesn't exist. This is the source of some problems, such as the BigQuery sink leaving around files that have failed to import into BigQuery.
> >>>>>>>>>>>>>>> In streaming this is less straightforward. Do you want to wait until the end of the pipeline? Or until the end of the window? In practice, you just want to wait until you know nobody will need the resource anymore.
> >>>>>>>>>>>>>>> This led to some discussions around a "cleanup" API, where you could have a transform that outputs resource objects. Each resource object would have logic for cleaning it up. Something would indicate which parts of the pipeline needed that resource, and what kind of temporal lifetime those objects had. As soon as that part of the pipeline had advanced far enough that it would no longer need the resources, they would get cleaned up. This can be done at pipeline shutdown, or incrementally during a streaming pipeline, etc.
> >>>>>>>>>>>>>>> Would something like this be a better fit for your use case?
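Ben describes a "cleanup" API: resource objects carry their own cleanup logic and are released once every part of the pipeline that needs them has moved past them. It can be approximated in memory as follows. `ManagedResource` and `stageDone` are hypothetical names. Stage names stand in for whatever signal a runner would actually use (end of batch, progress in streaming).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory model: a resource carries its own cleanup logic and
// the set of pipeline stages that still need it.
final class ManagedResource {
    private final Set<String> pendingStages;
    private final Runnable cleanup;
    private boolean cleanedUp;

    ManagedResource(Set<String> stagesThatNeedIt, Runnable cleanup) {
        this.pendingStages = new HashSet<>(stagesThatNeedIt);
        this.cleanup = cleanup;
    }

    // Called when a stage has advanced far enough (end of the batch, or an
    // assumed progress signal in streaming) that it no longer needs the
    // resource. The cleanup runs once, after the last such call.
    synchronized void stageDone(String stage) {
        pendingStages.remove(stage);
        if (pendingStages.isEmpty() && !cleanedUp) {
            cleanedUp = true;
            cleanup.run();
        }
    }

    synchronized boolean isCleanedUp() {
        return cleanedUp;
    }

    public static void main(String[] args) {
        ManagedResource tempFiles = new ManagedResource(
                Set.of("write-shards", "copy-to-final"),
                () -> System.out.println("deleting temporary files"));
        tempFiles.stageDone("write-shards");  // still needed by the copy stage
        tempFiles.stageDone("copy-to-final"); // prints "deleting temporary files"
    }
}
```

In this model the pending-stage set lives in one process's memory. For the guarantees Ben has in mind, a runner would have to persist that state so it survives worker failures.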
> >>>>>>>>>>>>>>> If not, why is handling teardown within a single DoFn sufficient?
> >>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>> Yes, 1M. Let me try to explain by simplifying the overall execution. Each instance (one fn, so likely in a thread of a worker) has its own lifecycle; caricaturally, "new" and garbage collection.
> >>>>>>>>>>>>>>>> In practice, "new" is often an unsafe allocation (deserialization), but that doesn't matter here.
> >>>>>>>>>>>>>>>> What I want is for every "new" to be followed by a setup before any process or startBundle call. I also want Beam to call teardown the last time it has the instance, after the last finishBundle and before the instance is GC-ed.
> >>>>>>>>>>>>>>>> It is as simple as that. This way there is no need to combine fns in a way that makes a fn not self-contained just to implement basic transforms.
> >>>>>>>>>>>>>>>> On 18 Feb 2018 20:07, "Reuven Lax" <re...@google.com> wrote:
> >>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>> On 18 Feb 2018 19:28, "Ben Chambers" <bchamb...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>> It feels like this thread may be a bit off-track. Rather than focusing on the semantics of the existing methods (which have been noted to meet many existing use cases), it would be helpful to focus more on the reason you are looking for something with different semantics.
> >>>>>>>>>>>>>>>>>> Some possibilities (I'm not sure which one you are trying to do):
> >>>>>>>>>>>>>>>>>> 1. Clean up some external, global resource that was initialized once during the startup of the pipeline. If this is the case, how are you ensuring it was really only initialized once (and not once per worker, per thread, per instance, etc.)? How do you know when the pipeline should release it? If the answer is "when it reaches step X", then what about a streaming pipeline?
> >>>>>>>>>>>>>>>>>> When the DoFn is no longer needed logically, i.e. when the batch is done or the stream is stopped (manually or by a JVM shutdown).
> >>>>>>>>>>>>>>>>> I'm really not following what this means.
> >>>>>>>>>>>>>>>>> Let's say that a pipeline is running 1000 workers, and each worker is running 1000 threads (each running a copy of the same DoFn). How many cleanups do you want (do you want 1000 * 1000 = 1M cleanups), and when do you want them called? When the entire pipeline is shut down? When an individual worker is about to shut down (which may be temporary, since it may be about to start back up)? Something else?
> >>>>>>>>>>>>>>>>>> 2. Finalize some resources that are used within some region of the pipeline. The DoFn lifecycle methods are not a good fit for this (they are focused on managing resources within the DoFn). Instead, you could model this on how FileIO finalizes the files that it produced.
> >>>>>>>>>>>>>>>>>> For instance:
> >>>>>>>>>>>>>>>>>> a) ParDo generates "resource IDs" (or some token that stores information about resources)
> >>>>>>>>>>>>>>>>>> b) "Require Deterministic Input" (to prevent retries from changing resource IDs)
> >>>>>>>>>>>>>>>>>> c) ParDo that initializes the resources
> >>>>>>>>>>>>>>>>>> d) Pipeline segments that use the resources, and eventually output the fact they're done
> >>>>>>>>>>>>>>>>>> e) "Require Deterministic Input"
> >>>>>>>>>>>>>>>>>> f) ParDo that frees the resources
> >>>>>>>>>>>>>>>>>> By making the use of the resource part of the data, it is possible to "checkpoint" which resources may be in use or have been finished, using the require-deterministic-input steps. This is important to ensuring everything is actually cleaned up.
> >>>>>>>>>>>>>>>>>> I need that, but generic rather than case by case, to industrialize some API on top of Beam.
> >>>>>>>>>>>>>>>>>> 3. Some other use case that I may be missing? If so, could you elaborate on what you are trying to accomplish? That would help me understand both the problems with existing options and possibly what could be done to help.
> >>>>>>>>>>>>>>>>>> I understand there are workarounds for almost all cases, but that means each transform handles its lifecycle differently. I dislike that a lot, at scale and as a user, since you can't put any unified practice on top of Beam. It also makes Beam very hard to integrate, or to use to build higher-level libraries or software.
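Ben's step (b) exists because a retried ParDo could otherwise emit different resource IDs than the failed attempt. That would orphan whatever the first attempt created. The plain-Java illustration below swaps in a different technique: it derives the ID deterministically from the input with `UUID.nameUUIDFromBytes`. Ben's approach instead relies on the runner to checkpoint the first attempt's output. `ResourceIds` and `processWithRetry` are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

final class ResourceIds {
    private ResourceIds() {}

    // Same input, same ID: a retry refers to the resource the first attempt made.
    static String deterministicId(String element) {
        return UUID.nameUUIDFromBytes(element.getBytes(StandardCharsets.UTF_8)).toString();
    }

    // Fresh ID on every attempt, so a retry names a brand-new resource.
    static String randomId(String element) {
        return UUID.randomUUID().toString();
    }

    // Processes one element twice (a failed attempt, then a retry). Each attempt
    // "creates" the resource named by its ID; the returned map holds every
    // resource that now exists. Entries no later step knows about are leaks.
    static Map<String, String> processWithRetry(String element, Function<String, String> idFn) {
        Map<String, String> created = new HashMap<>();
        for (int attempt = 0; attempt < 2; attempt++) {
            created.putIfAbsent(idFn.apply(element), element);
        }
        return created;
    }

    public static void main(String[] args) {
        System.out.println(processWithRetry("order-42", ResourceIds::deterministicId).size()); // 1
        System.out.println(processWithRetry("order-42", ResourceIds::randomId).size()); // 2
    }
}
```

Deterministic derivation only works when the same input should always map to the same resource. When IDs must be fresh per run, the checkpointing approach in Ben's steps (b) and (e) is the one that applies.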
> >>>>>>>>>>>>>>>>>> This is why I tried not to start the workaround discussions and to stay at the API level.
> >>>>>>>>>>>>>>>>>> -- Ben
> >>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
> >>>>>>>>>>>>>>>>>>>> "Machine state" is overly low-level because many of the possible reasons can happen on a perfectly fine machine. If you'd like to rephrase it to "it will be called except in various situations where it's logically impossible or impractical to guarantee that it's called", that's fine. Or you can list some of the examples above.
> >>>>>>>>>>>>>>>>>>> Sounds OK to me.
> >>>>>>>>>>>>>>>>>>>> The main point for the user is that you *will* see non-preventable situations where it couldn't be called (it's not just intergalactic crashes). So if the logic is very important (e.g. cleaning up a large amount of temporary files, shutting down a large number of VMs you started, etc.), you have to express it using one of the other methods that have stricter guarantees (which obviously come at a cost, e.g. no pass-by-reference).
> >>>>>>>>>>>>>>>>>>> FinishBundle sadly has the exact same guarantee, so I'm not sure which other method you are talking about.
> >>>>>>>>>>>>>>>>>>> Concretely, if you make it really unreliable (which is what "best effort" sounds like to me), then users can't use it to clean anything up. But if you define it as "it can happen, but it is unexpected and means something went wrong", then it is fine to have a manual (or automatic, if fancy) recovery procedure. This is where it makes all the difference, and it impacts developers and ops (basically all users).
> >>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>> Agreed, Eugene, except that "best effort" means that. It is also often used to mean "at will", and this is what triggered this thread.
> >>>>>>>>>>>>>>>>>>>>> I'm fine with "except if the machine state prevents it", but "best effort" is too open and can be badly misread by users (as I did).
> >>>>>>>>>>>>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
> >>>>>>>>>>>>>>>>>>>>>> It will not be called if it's impossible to call it. That covers the example situation you have (intergalactic crash) and a number of more common cases:
> >>>>>>>>>>>>>>>>>>>>>> - the worker container has crashed (e.g. user code in a different thread called a C library over JNI and it segfaulted);
> >>>>>>>>>>>>>>>>>>>>>> - a JVM bug;
> >>>>>>>>>>>>>>>>>>>>>> - a crash due to user code OOM;
> >>>>>>>>>>>>>>>>>>>>>> - the worker has lost network connectivity (then it may be called but it won't be able to do anything useful);
> >>>>>>>>>>>>>>>>>>>>>> - this is running on a preemptible VM and it was preempted by the underlying cluster manager without notice;
> >>>>>>>>>>>>>>>>>>>>>> - the worker was too busy with other stuff (e.g. calling other Teardown functions) until the preemption timeout elapsed;
> >>>>>>>>>>>>>>>>>>>>>> - the underlying hardware simply failed (which happens quite often at scale);
> >>>>>>>>>>>>>>>>>>>>>> - many other conditions.
> >>>>>>>>>>>>>>>>>>>>>> "Best effort" is the commonly used term to describe such behavior. Please feel free to file bugs for cases where you observed a runner not call Teardown in a situation where it was possible to call it but the runner made insufficient effort.
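One of Eugene's cases can be shown with a simulated clock: a worker is too busy running other Teardown functions when the preemption timeout elapses. `PreemptionModel` is hypothetical. The costs and grace periods are arbitrary numbers, not any real cluster manager's timeout.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a worker that has a fixed grace period between a
// preemption notice and the VM disappearing. Teardowns run one after another,
// each with a simulated cost; a teardown that cannot finish inside the grace
// period never completes, which from the user's side looks like "not called".
final class PreemptionModel {
    private PreemptionModel() {}

    static final class PendingTeardown {
        final String name;
        final long costMillis;

        PendingTeardown(String name, long costMillis) {
            this.name = name;
            this.costMillis = costMillis;
        }
    }

    // Returns the names of the teardowns that completed before the deadline.
    static List<String> runUntilPreempted(List<PendingTeardown> pending, long graceMillis) {
        List<String> completed = new ArrayList<>();
        long elapsed = 0;
        for (PendingTeardown t : pending) {
            if (elapsed + t.costMillis > graceMillis) {
                break; // the VM is gone before this teardown can finish
            }
            elapsed += t.costMillis;
            completed.add(t.name);
        }
        return completed;
    }

    public static void main(String[] args) {
        List<PendingTeardown> pending = List.of(
                new PendingTeardown("fnA", 10_000),
                new PendingTeardown("fnB", 15_000),
                new PendingTeardown("fnC", 1_000));
        System.out.println(runUntilPreempted(pending, 30_000)); // [fnA, fnB, fnC]
        System.out.println(runUntilPreempted(pending, 20_000)); // [fnA]
    }
}
```

With a 20-second budget, `fnC` never runs even though it needs only one second, because it is queued behind a slow teardown. Nothing in `fnC` itself was at fault.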
> >>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
> >>>>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>> On 18 Feb 2018 00:23, "Kenneth Knowles" <k...@google.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>> If you give an example of a high-level need (e.g. "I'm trying to write an IO for system $x and it requires the following initialization and the following cleanup logic and the following processing in between") I'll be better able to help you.
> >>>>>>>>>>>>>>>>>>>>>>>>>> Take a simple example of a transform requiring a connection. Using bundles is a perf killer since their size is not controlled. Using teardown doesn't let you release the connection, since it is a best-effort thing. Not releasing the connection makes you pay a lot (AWS ;)) or prevents you from launching other processing (concurrency limits).
> >>>>>>>>>>>>>>>>>>>>>>>>> For this example @Teardown is an exact fit.
> >>>>>>>>>>>>>>>>>>>>>>>>> If things die so badly that @Teardown is not called, then nothing else can be called to close the connection either. What AWS service are you thinking of that stays open for a long time when everything at the other end has died?
> >>>>>>>>>>>>>>>>>>>>>>>>> You assume connections are more or less stateless, but some (proprietary) protocols require closing exchanges that are more than just "I'm leaving".
> >>>>>>>>>>>>>>>>>>>>>>>>> For AWS I was thinking about starting some services (machines) on the fly at pipeline startup and closing them at the end. If teardown is not called, you leak machines and money. You can say it can be done another way... as can the full pipeline ;).
> >>>>>>>>>>>>>>>>>>>>>>>>> I don't want to be picky, but if Beam can't handle its components' lifecycle, it can't be used at scale for generic pipelines, only when bound to some particular IO.
> >>>>>>>>>>>>>>>>>>>>>>>>> What prevents enforcing teardown, ignoring the interstellar crash case, which no human system can handle? Nothing, technically. Why do you push to not handle it? Is it due to some legacy code in Dataflow or something else?
> >>>>>>>>>>>>>>>>>>>>>>>> Teardown *is* already documented and implemented this way (best-effort). So I'm not sure what kind of change you're asking for.
> >>>>>>>>>>>>>>>>>>>>>>> Remove "best effort" from the javadoc. If it is not called, then it is a bug and we are done :).
> >>>>>>>>>>>>>>>>>>>>>>>>> Also, what does it mean for the users? The direct runner does it, so if a user uses the RI in tests, will they get a different behavior in prod? Also, don't forget that users don't know what the IOs they compose use. This affects the whole product so much that it must be handled, IMHO.
> >>>>>>>>>>>>>>>>>>>>>>>>> I understand the portability culture is new in the big data world, but that is not a reason to ignore what people have done for years and to do it wrong before doing it right ;).
> >>>>>>>>>>>>>>>>>>>>>>>>> My proposal is to list what can prevent guaranteeing the execution of teardown under normal IT conditions. Then we see if we can handle it. Only if there is a technical reason we can't do we make it experimental/unsupported in the API. I know Spark and Flink can; are there any unknown blockers for other runners?
> >>>>>>>>>>>>>>>>>>>>>>>>> Technical note: even a kill should go through Java shutdown hooks. Otherwise your environment (the software enclosing Beam) is fully unhandled and your overall system is uncontrolled. The only case where this is not true is when the software is always owned by a vendor and never installed in a customer environment.
> >>>>>>>>>>>>>>>>>>>>>>>>> In that case it is up to the vendor to handle the Beam API, not up to Beam to adjust its API for a vendor. Otherwise, every feature unsupported by one runner should be made optional, right?
> >>>>>>>>>>>>>>>>>>>>>>>>> Not all state is about the network, even in distributed systems, so it is key to have an explicit and defined lifecycle.
> >>>>>>>>>>>>>>>>>>>>>>>>> Kenn
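Romain's technical note says that a kill should go through Java shutdown hooks. `Runtime.addShutdownHook` makes that concrete. The registry below is a hypothetical sketch, not part of Beam: a worker would register each live instance's teardown and run them all from one hook. According to the `Runtime.addShutdownHook` documentation, hooks run on normal exit and on user interrupts or system shutdown. They do not run when the JVM is killed with SIGKILL (`kill -9`), halted, or crashes. So this narrows the gap discussed in the thread without closing it. `TeardownRegistry` and `ShutdownHookDemo` are illustrative names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical registry of teardown callbacks for the instances alive in this JVM.
final class TeardownRegistry {
    private final List<Runnable> teardowns = new ArrayList<>();

    synchronized void register(Runnable teardown) {
        teardowns.add(teardown);
    }

    // Runs tearDownAll() when the JVM shuts down in an orderly way. Not reached
    // on SIGKILL, Runtime.halt, or a JVM crash.
    void installShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::tearDownAll, "teardown-hook"));
    }

    // Calls every registered teardown once, newest first, and keeps going when
    // one of them throws so that a single failure does not skip the others.
    void tearDownAll() {
        List<Runnable> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(teardowns);
            teardowns.clear();
        }
        Collections.reverse(snapshot);
        for (Runnable teardown : snapshot) {
            try {
                teardown.run();
            } catch (RuntimeException e) {
                System.err.println("teardown failed: " + e);
            }
        }
    }
}

final class ShutdownHookDemo {
    public static void main(String[] args) {
        TeardownRegistry registry = new TeardownRegistry();
        registry.installShutdownHook();
        registry.register(() -> System.out.println("closing connection"));
        // When main returns, the JVM exits normally and the hook prints "closing connection".
    }
}
```

Draining the list under the lock means a manual `tearDownAll()` followed by the hook does not run any teardown twice.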