Nothing, as mentioned it is a bug, so the recovery is a bug-recovery procedure.
On Feb 19, 2018 at 19:42, "Eugene Kirpichov" <kirpic...@google.com> wrote:

> So what would you like to happen if there is a crash? The DoFn instance no
> longer exists because the JVM it ran on no longer exists. What should
> Teardown be called on?
>
> On Mon, Feb 19, 2018, 10:20 AM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> This is what I want, and not 999999 teardowns for 1000000 setups until
>> there is an unexpected crash (= a bug).
>>
>> On Feb 19, 2018 at 18:57, "Reuven Lax" <re...@google.com> wrote:
>>
>>> On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
>>>> 2018-02-19 15:57 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>
>>>>> On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau <
>>>>> rmannibu...@gmail.com> wrote:
>>>>>
>>>>>> @Reuven: in practice it is created by pools of 256, but that leads to
>>>>>> the same pattern; the teardown is just an "if (iCreatedThem)
>>>>>> releaseThem();"
>>>>>
>>>>> How do you control "256"? Even if you have a pool of 256 workers,
>>>>> nothing in Beam guarantees how many threads and DoFns are created per
>>>>> worker. In theory the runner might decide to create 1000 threads on each
>>>>> worker.
>>>>
>>>> Nope, it was the other way around: in this case on AWS you can get 256
>>>> instances at once but not 512 (which would be 2x256). So when you compute
>>>> the distribution you allocate to some fn the role of owning the instance
>>>> lookup and release.
>>>
>>> I still don't understand. Let's be more precise. If you write the
>>> following code:
>>>
>>> pCollection.apply(ParDo.of(new MyDoFn()));
>>>
>>> There is no way to control how many instances of MyDoFn are created. The
>>> runner might decide to create a million instances of this class across
>>> your worker pool, which means that you will get a million Setup and
>>> Teardown calls.
>>>
>>>> Anyway this was just an example of an external resource you must
>>>> release.
>>>> The real topic is that Beam should define, ASAP, a guaranteed generic
>>>> lifecycle to let users embrace its programming model.
>>>>
>>>>>> @Eugene:
>>>>>> 1. the wait logic is about passing the value, which is not always
>>>>>> possible (like 15% of cases from my rough estimate)
>>>>>> 2. sdf: I'll try to detail why I mention SDF more here
>>>>>>
>>>>>> Concretely, Beam exposes a portable API (included in the SDK core).
>>>>>> This API defines a *container* API and therefore implies bean
>>>>>> lifecycles. I'll not detail them all but just use the sources and dofn
>>>>>> (not sdf) to illustrate the idea I'm trying to develop.
>>>>>>
>>>>>> A. Source
>>>>>>
>>>>>> A source computes a partition plan with 2 primitives: estimateSize
>>>>>> and split. As a user you can expect both to be called on the same bean
>>>>>> instance, to avoid paying the same connection cost(s) twice.
>>>>>> Concretely:
>>>>>>
>>>>>> connect()
>>>>>> try {
>>>>>>   estimateSize()
>>>>>>   split()
>>>>>> } finally {
>>>>>>   disconnect()
>>>>>> }
>>>>>>
>>>>>> This is not guaranteed by the API, so you must do:
>>>>>>
>>>>>> connect()
>>>>>> try {
>>>>>>   estimateSize()
>>>>>> } finally {
>>>>>>   disconnect()
>>>>>> }
>>>>>> connect()
>>>>>> try {
>>>>>>   split()
>>>>>> } finally {
>>>>>>   disconnect()
>>>>>> }
>>>>>>
>>>>>> + a workaround with an internal estimated size, since this primitive
>>>>>> is often called in split but you don't want to connect twice in the
>>>>>> second phase.
>>>>>>
>>>>>> Why do you need that? Simply because you want to define an API to
>>>>>> implement sources which initializes the source bean and destroys it.
>>>>>> I insist: it is a very, very basic concern for such an API.
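The double-connect workaround described above can be sketched in plain Java. This is a minimal illustration with hypothetical names (connect/estimateSize/split are stand-ins, not the actual Beam Source API); it only shows why the missing same-instance guarantee doubles the connection cost:

```java
// Sketch only: hypothetical names, not the actual Beam Source API.
public class SourceLifecycleSketch {
    static int connections = 0; // how many times we paid the connection cost

    static void connect()      { connections++; }
    static void disconnect()   { }
    static long estimateSize() { return 42L; } // dummy values
    static int  split()        { return 4; }

    // What the current API forces: one connect/disconnect per primitive,
    // because nothing guarantees both run on the same live instance.
    static void withoutLifecycle() {
        connect();
        try { estimateSize(); } finally { disconnect(); }
        connect();
        try { split(); } finally { disconnect(); }
    }

    // What a container-managed lifecycle would allow: a single connection
    // spanning both primitives.
    static void withLifecycle() {
        connect();
        try {
            estimateSize();
            split();
        } finally {
            disconnect();
        }
    }

    public static void main(String[] args) {
        withoutLifecycle();
        System.out.println("without lifecycle: " + connections + " connections");
        connections = 0;
        withLifecycle();
        System.out.println("with lifecycle: " + connections + " connection");
    }
}
```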
>>>>>> However, Beam doesn't embrace it and doesn't assume it, so building
>>>>>> any API on top of Beam is very painful today, and direct Beam users
>>>>>> hit the exact same issues - check how IOs are implemented: the static
>>>>>> utilities which create volatile connections prevent reusing an
>>>>>> existing connection in a single method
>>>>>> (https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862).
>>>>>>
>>>>>> The same logic applies to the reader which is then created.
>>>>>>
>>>>>> B. DoFn & SDF
>>>>>>
>>>>>> As a fn dev you expect the same from the Beam runtime: init(); try {
>>>>>> while (...) process(); } finally { destroy(); } and that it is
>>>>>> executed on the exact same instance, to be able to be stateful at that
>>>>>> level for expensive connections/operations/flow-state handling.
>>>>>>
>>>>>> As you mentioned with the million example, this sequence should
>>>>>> happen for each single instance, so 1M times in your example.
>>>>>>
>>>>>> Now why did I mention SDF several times? Because SDF is a
>>>>>> generalisation of both cases (source and dofn). Therefore it creates
>>>>>> way more instances and requires a way more strict/explicit definition
>>>>>> of the exact lifecycle and of which instance does what. Since Beam
>>>>>> handles the full lifecycle of the bean instances, it must provide
>>>>>> init/destroy hooks (setup/teardown) which can be stateful.
>>>>>>
>>>>>> Take the JDBC example which was mentioned earlier. Today, because of
>>>>>> the teardown issue, it uses bundles. Since bundle size is not defined
>>>>>> - and will not be with SDF - it must use a pool to be able to reuse a
>>>>>> connection instance and not degrade performance. Now, with SDF and the
>>>>>> split increase, how do you handle the pool size?
>>>>>> Generally in batch you use a single connection per thread to avoid
>>>>>> consuming all database connections. With a pool you have 2 choices:
>>>>>> 1. use a pool of 1; 2. use a slightly bigger pool, but, multiplied by
>>>>>> the number of beans, you will likely 2x or 3x the connection count and
>>>>>> make the execution fail with "no more connections available". If you
>>>>>> picked 1 (pool of #1), then you still have to have a reliable teardown
>>>>>> per pool instance (close() generally) to ensure you release the pool
>>>>>> and don't leak the connection information in the JVM. In all cases you
>>>>>> come back to the init()/destroy() lifecycle, even if you fake getting
>>>>>> connections with bundles.
>>>>>>
>>>>>> Just to make it obvious: the SDF mentions are just because SDF
>>>>>> implies all the current issues with the loose definition of the bean
>>>>>> lifecycles at an exponential level, nothing else.
>>>>>>
>>>>>> Romain Manni-Bucau
>>>>>> @rmannibucau <https://twitter.com/rmannibucau> | Blog
>>>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>>>> <http://rmannibucau.wordpress.com> | Github
>>>>>> <https://github.com/rmannibucau> | LinkedIn
>>>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>>
>>>>>> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>
>>>>>>> The kind of whole-transform lifecycle you're mentioning can be
>>>>>>> accomplished using the Wait transform, as I suggested in the thread
>>>>>>> above, and I believe it should become the canonical way to do that.
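The init(); try { while (...) process(); } finally { destroy(); } contract requested above can be sketched in plain Java. Names here (MyFn, setup/process/teardown) are hypothetical stand-ins for the Beam annotations, and the "runner" is collapsed to a single loop:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: hypothetical names standing in for @Setup/@ProcessElement/@Teardown.
public class FnLifecycleSketch {
    static final List<String> events = new ArrayList<>();

    static class MyFn {
        Object connection; // expensive state kept across calls on this instance

        void setup()              { connection = new Object(); events.add("setup"); }
        void process(int element) { events.add("process:" + element); }
        void teardown()           { connection = null; events.add("teardown"); }
    }

    // A runner honoring the requested contract would drive EACH instance
    // like this: setup once, process many times, teardown exactly once,
    // all on the same instance (so 1M instances => 1M setup/teardown pairs).
    static void run(int[] elements) {
        MyFn fn = new MyFn();
        fn.setup();
        try {
            for (int e : elements) {
                fn.process(e);
            }
        } finally {
            fn.teardown(); // the guarantee under debate: skipped only if the JVM dies
        }
    }

    public static void main(String[] args) {
        run(new int[] {1, 2});
        System.out.println(events);
    }
}
```

The whole thread is about how strong the `finally` above may be: a crashed JVM obviously cannot run it, which is the "best effort" carve-out.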
>>>>>>> (I would like to reiterate one more time, as the main author of
>>>>>>> most design documents related to SDF and of its implementation in
>>>>>>> the Java direct and Dataflow runners, that SDF is fully unrelated to
>>>>>>> the topic of cleanup - I'm very confused as to why it keeps coming
>>>>>>> up.)
>>>>>>>
>>>>>>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau <
>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I kind of agree, except transforms lack a lifecycle too. My
>>>>>>>> understanding is that SDF could be a way to unify it and clean the
>>>>>>>> API.
>>>>>>>>
>>>>>>>> Otherwise, how do you normalize - with a single API - the lifecycle
>>>>>>>> of transforms?
>>>>>>>>
>>>>>>>> On Feb 18, 2018 at 21:32, "Ben Chambers" <bchamb...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Are you sure that focusing on the cleanup of specific DoFns is
>>>>>>>>> appropriate? In many cases where cleanup is necessary, it is around
>>>>>>>>> an entire composite PTransform. I think there have been
>>>>>>>>> discussions/proposals around a more methodical "cleanup" option,
>>>>>>>>> but those haven't been implemented, to the best of my knowledge.
>>>>>>>>>
>>>>>>>>> For instance, consider the steps of a FileIO:
>>>>>>>>> 1. Write to a bunch (N shards) of temporary files.
>>>>>>>>> 2. When all temporary files are complete, attempt to do a bulk
>>>>>>>>> copy to put them in the final destination.
>>>>>>>>> 3. Clean up all the temporary files.
>>>>>>>>>
>>>>>>>>> (This is often desirable because it minimizes the chance of seeing
>>>>>>>>> partial/incomplete results in the final destination.)
>>>>>>>>>
>>>>>>>>> In the above, you'd want step 1 to execute on many workers, likely
>>>>>>>>> using a ParDo (say N different workers). The move step should only
>>>>>>>>> happen once, so on one worker. This means it will be a different
>>>>>>>>> DoFn, likely with some stuff done to ensure it runs on one worker.
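Ben's three steps can be sketched with plain java.nio.file standing in for the distributed filesystem (method and file names here are invented for illustration; the real FileIO runs these phases across workers):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch only: java.nio.file stands in for the distributed file system,
// and the three phases run in-process instead of on separate workers.
public class TempFilePatternSketch {
    public static int commit(Path tmpDir, Path finalDir, String[] shards)
            throws IOException {
        Files.createDirectories(tmpDir);
        Files.createDirectories(finalDir);
        // 1. Write every shard to a temporary file (many workers in reality).
        for (int i = 0; i < shards.length; i++) {
            Files.write(tmpDir.resolve("shard-" + i), shards[i].getBytes());
        }
        // 2. Once ALL shards are complete, move them to the final
        //    destination (a single worker in reality), so readers never
        //    observe a partial result set.
        int moved = 0;
        for (int i = 0; i < shards.length; i++) {
            Files.move(tmpDir.resolve("shard-" + i),
                       finalDir.resolve("shard-" + i),
                       StandardCopyOption.REPLACE_EXISTING);
            moved++;
        }
        // 3. Clean up the (now empty) temporary directory. This is the
        //    step that leaks when cleanup is left to a best-effort hook.
        Files.delete(tmpDir);
        return moved;
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("fileio-sketch");
        int n = commit(base.resolve("tmp"), base.resolve("out"),
                       new String[] {"a", "b", "c"});
        System.out.println(n + " shards committed");
    }
}
```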
>>>>>>>>> In such a case, cleanup / @TearDown of the DoFn is not enough. We
>>>>>>>>> need an API for a PTransform to schedule some cleanup work for
>>>>>>>>> when the transform is "done". In batch this is relatively
>>>>>>>>> straightforward, but it doesn't exist. This is the source of some
>>>>>>>>> problems, such as the BigQuery sink leaving around files that have
>>>>>>>>> failed to import into BigQuery.
>>>>>>>>>
>>>>>>>>> In streaming this is less straightforward -- do you want to wait
>>>>>>>>> until the end of the pipeline? Or do you want to wait until the
>>>>>>>>> end of the window? In practice, you just want to wait until you
>>>>>>>>> know nobody will need the resource anymore.
>>>>>>>>>
>>>>>>>>> This led to some discussions around a "cleanup" API, where you
>>>>>>>>> could have a transform that outputs resource objects. Each
>>>>>>>>> resource object would have logic for cleaning it up. And there
>>>>>>>>> would be something that indicated which parts of the pipeline
>>>>>>>>> needed that resource, and what kind of temporal lifetime those
>>>>>>>>> objects had. As soon as that part of the pipeline had advanced far
>>>>>>>>> enough that it would no longer need the resources, they would get
>>>>>>>>> cleaned up. This can be done at pipeline shutdown, or
>>>>>>>>> incrementally during a streaming pipeline, etc.
>>>>>>>>>
>>>>>>>>> Would something like this be a better fit for your use case? If
>>>>>>>>> not, why is handling teardown within a single DoFn sufficient?
>>>>>>>>>
>>>>>>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <
>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Yes, 1M. Let me explain, simplifying the overall execution: each
>>>>>>>>>> instance - one fn, so likely in a thread of a worker - has its
>>>>>>>>>> lifecycle. Caricaturally: "new" and garbage collection.
>>>>>>>>>> In practice, "new" is often an unsafe allocation
>>>>>>>>>> (deserialization), but it doesn't matter here.
>>>>>>>>>>
>>>>>>>>>> What I want is for any "new" to be followed by a setup before any
>>>>>>>>>> process or startBundle, and, the last time Beam has the instance
>>>>>>>>>> before it is GC-ed and after the last finishBundle, for it to
>>>>>>>>>> call teardown.
>>>>>>>>>>
>>>>>>>>>> It is as simple as that. This way there is no need to combine fns
>>>>>>>>>> in a way that makes a fn not self-contained just to implement
>>>>>>>>>> basic transforms.
>>>>>>>>>>
>>>>>>>>>> On Feb 18, 2018 at 20:07, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <
>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Feb 18, 2018 at 19:28, "Ben Chambers" <bchamb...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> It feels like this thread may be a bit off-track. Rather than
>>>>>>>>>>>> focusing on the semantics of the existing methods -- which have
>>>>>>>>>>>> been noted to meet many existing use cases -- it would be
>>>>>>>>>>>> helpful to focus more on the reason you are looking for
>>>>>>>>>>>> something with different semantics.
>>>>>>>>>>>>
>>>>>>>>>>>> Some possibilities (I'm not sure which one you are trying to
>>>>>>>>>>>> do):
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Clean up some external, global resource that was initialized
>>>>>>>>>>>> once during the startup of the pipeline. If this is the case,
>>>>>>>>>>>> how are you ensuring it was really only initialized once (and
>>>>>>>>>>>> not once per worker, per thread, per instance, etc.)? How do
>>>>>>>>>>>> you know when the pipeline should release it? If the answer is
>>>>>>>>>>>> "when it reaches step X", then what about a streaming pipeline?
>>>>>>>>>>>> When the dofn is no longer needed logically, i.e. when the
>>>>>>>>>>>> batch is done or the stream is stopped (manually or by a JVM
>>>>>>>>>>>> shutdown).
>>>>>>>>>>>
>>>>>>>>>>> I'm really not following what this means.
>>>>>>>>>>>
>>>>>>>>>>> Let's say that a pipeline is running 1000 workers, and each
>>>>>>>>>>> worker is running 1000 threads (each running a copy of the same
>>>>>>>>>>> DoFn). How many cleanups do you want (do you want 1000 * 1000 =
>>>>>>>>>>> 1M cleanups), and when do you want them called? When the entire
>>>>>>>>>>> pipeline is shut down? When an individual worker is about to
>>>>>>>>>>> shut down (which may be temporary - it may be about to start
>>>>>>>>>>> back up)? Something else?
>>>>>>>>>>>
>>>>>>>>>>>> 2. Finalize some resources that are used within some region of
>>>>>>>>>>>> the pipeline. While the DoFn lifecycle methods are not a good
>>>>>>>>>>>> fit for this (they are focused on managing resources within the
>>>>>>>>>>>> DoFn), you could model this on how FileIO finalizes the files
>>>>>>>>>>>> that it produced. For instance:
>>>>>>>>>>>> a) A ParDo generates "resource IDs" (or some token that stores
>>>>>>>>>>>> information about resources)
>>>>>>>>>>>> b) "Require Deterministic Input" (to prevent retries from
>>>>>>>>>>>> changing resource IDs)
>>>>>>>>>>>> c) A ParDo that initializes the resources
>>>>>>>>>>>> d) Pipeline segments that use the resources, and eventually
>>>>>>>>>>>> output the fact they're done
>>>>>>>>>>>> e) "Require Deterministic Input"
>>>>>>>>>>>> f) A ParDo that frees the resources
>>>>>>>>>>>>
>>>>>>>>>>>> By making the use of the resource part of the data, it is
>>>>>>>>>>>> possible to "checkpoint" which resources may be in use or have
>>>>>>>>>>>> been finished by using the deterministic input requirement.
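Steps a)–f) above, collapsed into plain method calls (all names are invented for illustration; in a real pipeline each step would be a transform and the IDs would flow as a PCollection):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch only: pipeline stages become plain methods and the "data" is a
// List, but the idea is the same - resources travel as elements, so the
// freeing stage sees exactly the IDs that were initialized.
public class ResourceTokenSketch {
    static final Set<String> live = new HashSet<>(); // currently allocated

    static List<String> generateIds(int n) {             // (a) emit tokens
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < n; i++) ids.add("resource-" + i);
        return ids;
    }
    static void initialize(List<String> ids) {           // (c) allocate
        live.addAll(ids);
    }
    static List<String> useAndReport(List<String> ids) { // (d) use, emit "done"
        return ids;
    }
    static void free(List<String> doneIds) {             // (f) release
        live.removeAll(doneIds);
    }

    public static void main(String[] args) {
        // (b) and (e) - deterministic input - are what make retries safe:
        // a retry regenerates the same IDs, so nothing is orphaned.
        List<String> ids = generateIds(3);
        initialize(ids);
        free(useAndReport(ids));
        System.out.println("leaked resources: " + live.size());
    }
}
```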
>>>>>>>>>>>> This is important to ensuring everything is actually cleaned
>>>>>>>>>>>> up.
>>>>>>>>>>>>
>>>>>>>>>>>> I need that, but generic and not case by case, to industrialize
>>>>>>>>>>>> some APIs on top of Beam.
>>>>>>>>>>>>
>>>>>>>>>>>> 3. Some other use case that I may be missing? If it is this
>>>>>>>>>>>> case, could you elaborate on what you are trying to accomplish?
>>>>>>>>>>>> That would help me understand both the problems with the
>>>>>>>>>>>> existing options and possibly what could be done to help.
>>>>>>>>>>>>
>>>>>>>>>>>> I understand there are workarounds for almost all cases, but
>>>>>>>>>>>> that means each transform is different in its lifecycle
>>>>>>>>>>>> handling, and I dislike that a lot at scale and as a user,
>>>>>>>>>>>> since you can't put any unified practice on top of Beam; it
>>>>>>>>>>>> also makes Beam very hard to integrate, or to use to build
>>>>>>>>>>>> higher-level libraries or software.
>>>>>>>>>>>>
>>>>>>>>>>>> This is why I tried not to start the workaround discussions and
>>>>>>>>>>>> just stay at the API level.
>>>>>>>>>>>>
>>>>>>>>>>>> -- Ben
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <
>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>>> kirpic...@google.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> "Machine state" is overly low-level because many of the
>>>>>>>>>>>>>> possible reasons can happen on a perfectly fine machine.
>>>>>>>>>>>>>> If you'd like to rephrase it to "it will be called except in
>>>>>>>>>>>>>> various situations where it's logically impossible or
>>>>>>>>>>>>>> impractical to guarantee that it's called", that's fine. Or
>>>>>>>>>>>>>> you can list some of the examples above.
>>>>>>>>>>>>> Sounds OK to me.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The main point for the user is: you *will* see
>>>>>>>>>>>>>> non-preventable situations where it couldn't be called - it's
>>>>>>>>>>>>>> not just intergalactic crashes - so if the logic is very
>>>>>>>>>>>>>> important (e.g. cleaning up a large amount of temporary
>>>>>>>>>>>>>> files, shutting down a large number of VMs you started, etc.),
>>>>>>>>>>>>>> you have to express it using one of the other methods that
>>>>>>>>>>>>>> have stricter guarantees (which obviously come at a cost,
>>>>>>>>>>>>>> e.g. no pass-by-reference).
>>>>>>>>>>>>>
>>>>>>>>>>>>> FinishBundle has the exact same guarantee, sadly, so I'm not
>>>>>>>>>>>>> sure which other method you mean. Concretely, if you make it
>>>>>>>>>>>>> really unreliable - this is what "best effort" sounds like to
>>>>>>>>>>>>> me - then users can't use it to clean anything; but if you
>>>>>>>>>>>>> make it "can fail to happen, but that is unexpected and means
>>>>>>>>>>>>> something went wrong", then it is fine to have a manual - or
>>>>>>>>>>>>> automatic, if fancy - recovery procedure. This is where it
>>>>>>>>>>>>> makes all the difference and impacts the developers and ops
>>>>>>>>>>>>> (all users, basically).
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <
>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Agree, Eugene, except that "best effort" means just that. It
>>>>>>>>>>>>>>> is also often used to say "at will", and this is what
>>>>>>>>>>>>>>> triggered this thread.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm fine using "except if the machine state prevents it",
>>>>>>>>>>>>>>> but "best effort" is too open and can be very badly and
>>>>>>>>>>>>>>> wrongly perceived by users (like I did).
>>>>>>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>>>>> kirpic...@google.com>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It will not be called if it's impossible to call it: in the
>>>>>>>>>>>>>>>> example situation you have (intergalactic crash), and in a
>>>>>>>>>>>>>>>> number of more common cases: e.g. the worker container has
>>>>>>>>>>>>>>>> crashed (e.g. user code in a different thread called a C
>>>>>>>>>>>>>>>> library over JNI and it segfaulted), a JVM bug, a crash due
>>>>>>>>>>>>>>>> to user-code OOM, the worker has lost network connectivity
>>>>>>>>>>>>>>>> (then it may be called but it won't be able to do anything
>>>>>>>>>>>>>>>> useful), this is running on a preemptible VM and it was
>>>>>>>>>>>>>>>> preempted by the underlying cluster manager without notice,
>>>>>>>>>>>>>>>> or the worker was too busy with other stuff (e.g. calling
>>>>>>>>>>>>>>>> other Teardown functions) until the preemption timeout
>>>>>>>>>>>>>>>> elapsed, the underlying hardware simply failed (which
>>>>>>>>>>>>>>>> happens quite often at scale), and many other conditions.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> "Best effort" is the commonly used term to describe such
>>>>>>>>>>>>>>>> behavior.
>>>>>>>>>>>>>>>> Please feel free to file bugs for cases where you observed
>>>>>>>>>>>>>>>> a runner not call Teardown in a situation where it was
>>>>>>>>>>>>>>>> possible to call it but the runner made insufficient
>>>>>>>>>>>>>>>> effort.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <
>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>>>>>>> kirpic...@google.com>:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Feb 18, 2018 at 00:23, "Kenneth Knowles" <
>>>>>>>>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> If you give an example of a high-level need (e.g. "I'm
>>>>>>>>>>>>>>>>>>>> trying to write an IO for system $x and it requires the
>>>>>>>>>>>>>>>>>>>> following initialization and the following cleanup
>>>>>>>>>>>>>>>>>>>> logic and the following processing in between") I'll be
>>>>>>>>>>>>>>>>>>>> better able to help you.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Take a simple example of a transform requiring a
>>>>>>>>>>>>>>>>>>>> connection. Using bundles is a perf killer since their
>>>>>>>>>>>>>>>>>>>> size is not controlled. Using teardown doesn't allow
>>>>>>>>>>>>>>>>>>>> you to release the connection, since it is a
>>>>>>>>>>>>>>>>>>>> best-effort thing. Not releasing the connection makes
>>>>>>>>>>>>>>>>>>>> you pay a lot - AWS ;) - or prevents you from launching
>>>>>>>>>>>>>>>>>>>> other processing - concurrency limits.
>>>>>>>>>>>>>>>>>>> For this example @Teardown is an exact fit. If things
>>>>>>>>>>>>>>>>>>> die so badly that @Teardown is not called, then nothing
>>>>>>>>>>>>>>>>>>> else can be called to close the connection either. What
>>>>>>>>>>>>>>>>>>> AWS service are you thinking of that stays open for a
>>>>>>>>>>>>>>>>>>> long time when everything at the other end has died?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> You assume connections are kind of stateless, but some
>>>>>>>>>>>>>>>>>>> (proprietary) protocols require closing exchanges which
>>>>>>>>>>>>>>>>>>> are not only "I'm leaving".
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> For AWS I was thinking about starting some services -
>>>>>>>>>>>>>>>>>>> machines - on the fly at pipeline startup and closing
>>>>>>>>>>>>>>>>>>> them at the end. If teardown is not called you leak
>>>>>>>>>>>>>>>>>>> machines and money. You can say it can be done another
>>>>>>>>>>>>>>>>>>> way... as can the full pipeline ;).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I don't want to be picky, but if Beam can't handle its
>>>>>>>>>>>>>>>>>>> components' lifecycle, it can't be used at scale for
>>>>>>>>>>>>>>>>>>> generic pipelines and stays bound to some particular
>>>>>>>>>>>>>>>>>>> IOs.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What prevents enforcing teardown - ignoring the
>>>>>>>>>>>>>>>>>>> interstellar-crash case, which can't be handled by any
>>>>>>>>>>>>>>>>>>> human system? Nothing, technically. Why do you push to
>>>>>>>>>>>>>>>>>>> not handle it? Is it due to some legacy code on Dataflow
>>>>>>>>>>>>>>>>>>> or something else?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Teardown *is* already documented and implemented this way
>>>>>>>>>>>>>>>>>> (best-effort). So I'm not sure what kind of change you're
>>>>>>>>>>>>>>>>>> asking for.
>>>>>>>>>>>>>>>>> Remove "best effort" from the javadoc. If it is not
>>>>>>>>>>>>>>>>> called, then it is a bug and we are done :).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Also, what does it mean for the users? The direct runner
>>>>>>>>>>>>>>>>>>> does it, so if a user uses the RI in tests, will he get
>>>>>>>>>>>>>>>>>>> a different behavior in prod? Also don't forget the user
>>>>>>>>>>>>>>>>>>> doesn't know what the IOs he composes use, so this is so
>>>>>>>>>>>>>>>>>>> impactful for the whole product that it must be handled,
>>>>>>>>>>>>>>>>>>> IMHO.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I understand the portability culture is new in the
>>>>>>>>>>>>>>>>>>> big-data world, but it is not a reason to ignore what
>>>>>>>>>>>>>>>>>>> people did for years and do it wrong before doing it
>>>>>>>>>>>>>>>>>>> right ;).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> My proposal is to list what can prevent guaranteeing -
>>>>>>>>>>>>>>>>>>> in normal IT conditions - the execution of teardown.
>>>>>>>>>>>>>>>>>>> Then we see if we can handle it, and only if there is a
>>>>>>>>>>>>>>>>>>> technical reason we can't do we make it
>>>>>>>>>>>>>>>>>>> experimental/unsupported in the API. I know Spark and
>>>>>>>>>>>>>>>>>>> Flink can; any unknown blocker for other runners?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Technical note: even a kill should go through Java
>>>>>>>>>>>>>>>>>>> shutdown hooks, otherwise your environment (the software
>>>>>>>>>>>>>>>>>>> enclosing Beam) is fully unhandled and your overall
>>>>>>>>>>>>>>>>>>> system is uncontrolled. The only case where this is not
>>>>>>>>>>>>>>>>>>> true is when the software is always owned by a vendor
>>>>>>>>>>>>>>>>>>> and never installed on a customer environment.
>>>>>>>>>>>>>>>>>>> In this case it is up to the vendor to handle the Beam
>>>>>>>>>>>>>>>>>>> API, and not up to Beam to adjust its API for a vendor -
>>>>>>>>>>>>>>>>>>> otherwise all features unsupported by one runner should
>>>>>>>>>>>>>>>>>>> be made optional, right?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Not all state is about the network, even in distributed
>>>>>>>>>>>>>>>>>>> systems, so this is key to having an explicit and
>>>>>>>>>>>>>>>>>>> defined lifecycle.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Kenn
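The "even a kill should go through Java shutdown hooks" remark above refers to java.lang.Runtime#addShutdownHook. A minimal illustration: the hook runs on normal exit and on SIGTERM (a plain kill), but not on SIGKILL or a hard JVM crash, which is exactly the boundary the thread draws for @Teardown:

```java
// Minimal illustration of Runtime#addShutdownHook: the hook fires on
// normal JVM exit and on SIGTERM, but NOT on SIGKILL or a hard crash -
// the same boundary the thread draws for @Teardown guarantees.
public class ShutdownHookSketch {
    public static void main(String[] args) {
        Thread cleanup = new Thread(() ->
                System.out.println("releasing external resources"));
        Runtime.getRuntime().addShutdownHook(cleanup);

        System.out.println("pipeline work done");
        // On return from main, the JVM shuts down and the hook runs.
    }
}
```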