Nothing. As mentioned, it is a bug, so recovery is a bug-recovery procedure.

On Feb 19, 2018 at 19:42, "Eugene Kirpichov" <kirpic...@google.com> wrote:

> So what would you like to happen if there is a crash? The DoFn instance no
> longer exists because the JVM it ran on no longer exists. What should
> Teardown be called on?
>
> On Mon, Feb 19, 2018, 10:20 AM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> This is what I want, and not 999999 teardowns for 1000000 setups, unless
>> there is an unexpected crash (= a bug).
>>
>> On Feb 19, 2018 at 18:57, "Reuven Lax" <re...@google.com> wrote:
>>
>>>
>>>
>>> On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
>>>>
>>>>
>>>> 2018-02-19 15:57 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>
>>>>>
>>>>>
>>>>> On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau <
>>>>> rmannibu...@gmail.com> wrote:
>>>>>
>>>>>> @Reuven: in practice they are created in pools of 256, but that leads to
>>>>>> the same pattern; the teardown is just an "if (iCreatedThem) releaseThem();"
>>>>>>
>>>>>
>>>>> How do you control "256?" Even if you have a pool of 256 workers,
>>>>> nothing in Beam guarantees how many threads and DoFns are created per
>>>>> worker. In theory the runner might decide to create 1000 threads on each
>>>>> worker.
>>>>>
>>>>
>>>> No, it was the other way around: in this case, on AWS you can get 256
>>>> instances at once but not 512 (which would be 2x256). So when you compute
>>>> the distribution, you give some fn the role of owning the instance
>>>> lookup and release.
>>>>
>>>
>>> I still don't understand. Let's be more precise. If you write the
>>> following code:
>>>
>>>    pCollection.apply(ParDo.of(new MyDoFn()));
>>>
>>> There is no way to control how many instances of MyDoFn are created. The
>>> runner might decide to create a million instances of this class across
>>> your worker pool, which means that you will get a million Setup and
>>> Teardown calls.
>>>
>>>
>>>> Anyway, this was just an example of an external resource you must
>>>> release. The real topic is that Beam should define, as soon as possible, a
>>>> guaranteed generic lifecycle so users can embrace its programming model.
>>>>
>>>>
>>>>>
>>>>>
>>>>>
>>>>>> @Eugene:
>>>>>> 1. the Wait logic is about passing the value, which is not always possible
>>>>>> (in about 15% of cases, by my rough estimate)
>>>>>> 2. SDF: I'll try to detail here why I keep mentioning SDF
>>>>>>
>>>>>>
>>>>>> Concretely, Beam exposes a portable API (included in the SDK core).
>>>>>> This API defines a *container* API and therefore implies bean lifecycles.
>>>>>> I won't detail them all; I'll just use the sources and DoFn (not SDF) to
>>>>>> illustrate the idea I'm trying to develop.
>>>>>>
>>>>>> A. Source
>>>>>>
>>>>>> A source computes a partition plan with 2 primitives: estimateSize
>>>>>> and split. As a user you can expect both to be called on the same bean
>>>>>> instance, to avoid paying the same connection cost(s) twice. Concretely:
>>>>>>
>>>>>> connect()
>>>>>> try {
>>>>>>   estimateSize()
>>>>>>   split()
>>>>>> } finally {
>>>>>>   disconnect()
>>>>>> }
>>>>>>
>>>>>> This is not guaranteed by the API, so you must do:
>>>>>>
>>>>>> connect()
>>>>>> try {
>>>>>>   estimateSize()
>>>>>> } finally {
>>>>>>   disconnect()
>>>>>> }
>>>>>> connect()
>>>>>> try {
>>>>>>   split()
>>>>>> } finally {
>>>>>>   disconnect()
>>>>>> }
>>>>>>
>>>>>> plus a workaround with an internal size estimate, since this primitive is
>>>>>> often called in split but you don't want to connect twice in the second
>>>>>> phase.
>>>>>>
>>>>>> Why do you need that? Simply because you want to define an API to
>>>>>> implement sources which initializes the source bean and destroys it.
>>>>>> I insist it is a very basic concern for such an API. However, Beam
>>>>>> doesn't embrace it and doesn't assume it, so building any API on top of
>>>>>> Beam is very painful today, and direct Beam users hit the exact same
>>>>>> issues - check how the IOs are implemented, with static utilities which
>>>>>> create volatile connections, preventing reuse of an existing connection
>>>>>> in a single method
>>>>>> (https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862).
>>>>>>
>>>>>> Same logic applies to the reader which is then created.
>>>>>>
>>>>>> B. DoFn & SDF
>>>>>>
>>>>>> As a fn dev you expect the same from the Beam runtime: init(); try {
>>>>>> while (...) process(); } finally { destroy(); } and that it is executed
>>>>>> on the exact same instance, to be able to be stateful at that level for
>>>>>> expensive connections/operations/flow state handling.
>>>>>>
>>>>>> As you mentioned with the million example, this sequence should
>>>>>> happen for each single instance, so 1M times in your example.
>>>>>>
>>>>>> Now why did I mention SDF several times? Because SDF is a
>>>>>> generalisation of both cases (source and DoFn). Therefore it creates way
>>>>>> more instances and requires a much stricter, more explicit definition of
>>>>>> the exact lifecycle and of which instance does what. Since Beam handles
>>>>>> the full lifecycle of the bean instances, it must provide init/destroy
>>>>>> hooks (setup/teardown) which can be stateful.
>>>>>>
>>>>>> Take the JDBC example which was mentioned earlier. Today, because of
>>>>>> the teardown issue, it uses bundles. Since bundle size is not defined -
>>>>>> and will not be with SDF - it must use a pool to be able to reuse a
>>>>>> connection instance and get correct performance. Now, with SDF and the
>>>>>> increase in splits, how do you handle the pool size? Generally in batch
>>>>>> you use a single connection per thread to avoid consuming all database
>>>>>> connections. With a pool you have 2 choices: 1. use a pool of 1, or 2. use
>>>>>> a slightly bigger pool, but multiplied by the number of beans you will
>>>>>> likely multiply the connection count by 2 or 3 and make the execution fail
>>>>>> with "no more connection available". If you picked 1 (a pool of 1), you
>>>>>> still need a reliable teardown per pool instance (close(), generally) to
>>>>>> ensure you release the pool and don't leak the connection information in
>>>>>> the JVM. In all cases you come back to the init()/destroy() lifecycle,
>>>>>> even if you fake it by getting connections per bundle.
>>>>>>
>>>>>> Just to make it obvious: I mention SDF only because SDF implies all
>>>>>> the current issues with the loose definition of the bean lifecycles, at an
>>>>>> exponential level, nothing else.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Romain Manni-Bucau
>>>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>>>> <http://rmannibucau.wordpress.com> | Github
>>>>>> <https://github.com/rmannibucau> | LinkedIn
>>>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>>
>>>>>> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>
>>>>>>> The kind of whole-transform lifecycle you're mentioning can be
>>>>>>> accomplished using the Wait transform as I suggested in the thread
>>>>>>> above, and I believe it should become the canonical way to do that.
>>>>>>>
>>>>>>> (Would like to reiterate one more time, as the main author of most
>>>>>>> design documents related to SDF and of its implementation in the Java
>>>>>>> direct and dataflow runner that SDF is fully unrelated to the topic of
>>>>>>> cleanup - I'm very confused as to why it keeps coming up)
>>>>>>>
>>>>>>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau <
>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I kind of agree, except that transforms lack a lifecycle too. My
>>>>>>>> understanding is that SDF could be a way to unify it and clean up the API.
>>>>>>>>
>>>>>>>> Otherwise, how do we normalize the lifecycle of transforms under a single
>>>>>>>> API?
>>>>>>>>
>>>>>>>> On Feb 18, 2018 at 21:32, "Ben Chambers" <bchamb...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Are you sure that focusing on the cleanup of specific DoFns is
>>>>>>>>> appropriate? In many cases where cleanup is necessary, it is around an
>>>>>>>>> entire composite PTransform. I think there have been
>>>>>>>>> discussions/proposals around a more methodical "cleanup" option, but
>>>>>>>>> those haven't been implemented, to the best of my knowledge.
>>>>>>>>>
>>>>>>>>> For instance, consider the steps of a FileIO:
>>>>>>>>> 1. Write to a bunch (N shards) of temporary files
>>>>>>>>> 2. When all temporary files are complete, attempt to do a bulk
>>>>>>>>> copy to put them in the final destination.
>>>>>>>>> 3. Cleanup all the temporary files.
>>>>>>>>>
>>>>>>>>> (This is often desirable because it minimizes the chance of seeing
>>>>>>>>> partial/incomplete results in the final destination).
>>>>>>>>>
>>>>>>>>> In the above, you'd want step 1 to execute on many workers, likely
>>>>>>>>> using a ParDo (say N different workers).
>>>>>>>>> The move step should only happen once, so on one worker. This
>>>>>>>>> means it will be a different DoFn, likely with some stuff done to
>>>>>>>>> ensure it runs on one worker.
>>>>>>>>>
>>>>>>>>> In such a case, cleanup / @Teardown of the DoFn is not enough. We
>>>>>>>>> need an API for a PTransform to schedule some cleanup work for when
>>>>>>>>> the transform is "done". In batch this is relatively straightforward,
>>>>>>>>> but doesn't exist. This is the source of some problems, such as the
>>>>>>>>> BigQuery sink leaving files around that have failed to import into
>>>>>>>>> BigQuery.
>>>>>>>>>
>>>>>>>>> In streaming this is less straightforward -- do you want to wait
>>>>>>>>> until the end of the pipeline? Or do you want to wait until the end
>>>>>>>>> of the window? In practice, you just want to wait until you know
>>>>>>>>> nobody will need the resource anymore.
>>>>>>>>>
>>>>>>>>> This led to some discussions around a "cleanup" API, where you
>>>>>>>>> could have a transform that output resource objects. Each resource
>>>>>>>>> object would have logic for cleaning it up. And there would be
>>>>>>>>> something that indicated what parts of the pipeline needed that
>>>>>>>>> resource, and what kind of temporal lifetime those objects had. As
>>>>>>>>> soon as that part of the pipeline had advanced far enough that it
>>>>>>>>> would no longer need the resources, they would get cleaned up. This
>>>>>>>>> can be done at pipeline shutdown, or incrementally during a streaming
>>>>>>>>> pipeline, etc.
>>>>>>>>>
>>>>>>>>> Would something like this be a better fit for your use case? If
>>>>>>>>> not, why is handling teardown within a single DoFn sufficient?
>>>>>>>>>
>>>>>>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <
>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Yes, 1M. Let me try to explain by simplifying the overall
>>>>>>>>>> execution. Each instance - one fn, so likely in a thread of a worker -
>>>>>>>>>> has its lifecycle. As a caricature: "new" and garbage collection.
>>>>>>>>>>
>>>>>>>>>> In practice, new is often an unsafe allocate (deserialization),
>>>>>>>>>> but it doesn't matter here.
>>>>>>>>>>
>>>>>>>>>> What I want is for any "new" to be followed by a setup before any
>>>>>>>>>> process or startBundle, and for Beam to call teardown the last time it
>>>>>>>>>> holds the instance, after the last finishBundle and before it is
>>>>>>>>>> GC-ed.
>>>>>>>>>>
>>>>>>>>>> It is as simple as that.
>>>>>>>>>> This way there is no need to combine fns in a way that makes a fn not
>>>>>>>>>> self-contained in order to implement basic transforms.
>>>>>>>>>>
>>>>>>>>>> On Feb 18, 2018 at 20:07, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <
>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Feb 18, 2018 at 19:28, "Ben Chambers" <bchamb...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> It feels like this thread may be a bit off-track. Rather than
>>>>>>>>>>>> focusing on the semantics of the existing methods -- which have been
>>>>>>>>>>>> noted to meet many existing use cases -- it would be helpful to focus
>>>>>>>>>>>> more on the reason you are looking for something with different
>>>>>>>>>>>> semantics.
>>>>>>>>>>>>
>>>>>>>>>>>> Some possibilities (I'm not sure which one you are trying to
>>>>>>>>>>>> do):
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Clean-up some external, global resource, that was
>>>>>>>>>>>> initialized once during the startup of the pipeline. If this is the
>>>>>>>>>>>> case, how are you ensuring it was really only initialized once (and
>>>>>>>>>>>> not once per worker, per thread, per instance, etc.)? How do you know
>>>>>>>>>>>> when the pipeline should release it? If the answer is "when it
>>>>>>>>>>>> reaches step X", then what about a streaming pipeline?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> When the DoFn is logically no longer needed, i.e. when the batch is
>>>>>>>>>>>> done or the stream is stopped (manually or by a JVM shutdown).
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I'm really not following what this means.
>>>>>>>>>>>
>>>>>>>>>>> Let's say that a pipeline is running 1000 workers, and each
>>>>>>>>>>> worker is running 1000 threads (each running a copy of the same
>>>>>>>>>>> DoFn). How many cleanups do you want (do you want 1000 * 1000 = 1M
>>>>>>>>>>> cleanups) and when do you want it called? When the entire pipeline is
>>>>>>>>>>> shut down? When an individual worker is about to shut down (which may
>>>>>>>>>>> be temporary - may be about to start back up)? Something else?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 2. Finalize some resources that are used within some region of
>>>>>>>>>>>> the pipeline. While the DoFn lifecycle methods are not a good fit for
>>>>>>>>>>>> this (they are focused on managing resources within the DoFn), you
>>>>>>>>>>>> could model this on how FileIO finalizes the files that it produced.
>>>>>>>>>>>> For instance:
>>>>>>>>>>>>    a) ParDo generates "resource IDs" (or some token that stores
>>>>>>>>>>>> information about resources)
>>>>>>>>>>>>    b) "Require Deterministic Input" (to prevent retries from
>>>>>>>>>>>> changing resource IDs)
>>>>>>>>>>>>    c) ParDo that initializes the resources
>>>>>>>>>>>>    d) Pipeline segments that use the resources, and eventually
>>>>>>>>>>>> output the fact they're done
>>>>>>>>>>>>    e) "Require Deterministic Input"
>>>>>>>>>>>>    f) ParDo that frees the resources
>>>>>>>>>>>>
>>>>>>>>>>>> By making the use of the resource part of the data it is
>>>>>>>>>>>> possible to "checkpoint" which resources may be in use or have been
>>>>>>>>>>>> finished by using the require deterministic input. This is important
>>>>>>>>>>>> for ensuring everything is actually cleaned up.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I need that, but generic and not case by case, to industrialize
>>>>>>>>>>>> an API on top of Beam.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 3. Some other use case that I may be missing? If it is this
>>>>>>>>>>>> case, could you elaborate on what you are trying to accomplish? That
>>>>>>>>>>>> would help me understand both the problems with existing options and
>>>>>>>>>>>> possibly what could be done to help.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I understand there are workarounds for almost all cases, but that
>>>>>>>>>>>> means each transform is different in its lifecycle handling. I
>>>>>>>>>>>> dislike that a lot, at scale and as a user, since you can't put any
>>>>>>>>>>>> unified practice on top of Beam; it also makes Beam very hard to
>>>>>>>>>>>> integrate, or to use to build higher-level libraries or software.
>>>>>>>>>>>>
>>>>>>>>>>>> This is why I tried not to start the workaround discussions and
>>>>>>>>>>>> just stay at the API level.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -- Ben
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <
>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>>> kirpic...@google.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> "Machine state" is overly low-level because many of the
>>>>>>>>>>>>>> possible reasons can happen on a perfectly fine machine.
>>>>>>>>>>>>>> If you'd like to rephrase it to "it will be called except in
>>>>>>>>>>>>>> various situations where it's logically impossible or impractical
>>>>>>>>>>>>>> to guarantee that it's called", that's fine. Or you can list some
>>>>>>>>>>>>>> of the examples above.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Sounds ok to me
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The main point for the user is, you *will* see
>>>>>>>>>>>>>> non-preventable situations where it couldn't be called - it's not
>>>>>>>>>>>>>> just intergalactic crashes - so if the logic is very important
>>>>>>>>>>>>>> (e.g. cleaning up a large amount of temporary files, shutting down
>>>>>>>>>>>>>> a large number of VMs you started etc), you have to express it
>>>>>>>>>>>>>> using one of the other methods that have stricter guarantees
>>>>>>>>>>>>>> (which obviously come at a cost, e.g. no pass-by-reference).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> FinishBundle has the exact same guarantee, sadly, so I'm not sure
>>>>>>>>>>>>> which other method you are speaking about. Concretely, if you make
>>>>>>>>>>>>> it really unreliable - this is what best effort sounds like to me -
>>>>>>>>>>>>> then users cannot use it to clean anything, but if you make it "can
>>>>>>>>>>>>> happen, but it is unexpected and means something happened", then it
>>>>>>>>>>>>> is fine to have a manual - or auto, if fancy - recovery procedure.
>>>>>>>>>>>>> This is where it makes all the difference, and it impacts the
>>>>>>>>>>>>> developers and ops (all users, basically).
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <
>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Agreed, Eugene, except that "best effort" means that. It is
>>>>>>>>>>>>>>> also often used to say "at will", and this is what triggered this
>>>>>>>>>>>>>>> thread.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm fine using "except if the machine state prevents it", but
>>>>>>>>>>>>>>> "best effort" is too open and can be very badly and wrongly
>>>>>>>>>>>>>>> perceived by users (like I did).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>>>>> kirpic...@google.com>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It will not be called if it's impossible to call it: in the
>>>>>>>>>>>>>>>> example situation you have (intergalactic crash), and in a number
>>>>>>>>>>>>>>>> of more common cases: e.g. in case the worker container has
>>>>>>>>>>>>>>>> crashed (e.g. user code in a different thread called a C library
>>>>>>>>>>>>>>>> over JNI and it segfaulted), JVM bug, crash due to user code OOM,
>>>>>>>>>>>>>>>> in case the worker has lost network connectivity (then it may be
>>>>>>>>>>>>>>>> called but it won't be able to do anything useful), in case this
>>>>>>>>>>>>>>>> is running on a preemptible VM and it was preempted by the
>>>>>>>>>>>>>>>> underlying cluster manager without notice or if the worker was
>>>>>>>>>>>>>>>> too busy with other stuff (e.g. calling other Teardown functions)
>>>>>>>>>>>>>>>> until the preemption timeout elapsed, in case the underlying
>>>>>>>>>>>>>>>> hardware simply failed (which happens quite often at scale), and
>>>>>>>>>>>>>>>> in many other conditions.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> "Best effort" is the commonly used term to describe such
>>>>>>>>>>>>>>>> behavior. Please feel free to file bugs for cases where you
>>>>>>>>>>>>>>>> observed a runner not call Teardown in a situation where it was
>>>>>>>>>>>>>>>> possible to call it but the runner made insufficient effort.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <
>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>>>>>>> kirpic...@google.com>:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Feb 18, 2018 at 00:23, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> If you give an example of a high-level need (e.g. "I'm
>>>>>>>>>>>>>>>>>>>> trying to write an IO for system $x and it requires the
>>>>>>>>>>>>>>>>>>>> following initialization and the following cleanup logic and
>>>>>>>>>>>>>>>>>>>> the following processing in between") I'll be better able to
>>>>>>>>>>>>>>>>>>>> help you.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Take a simple example of a transform requiring a
>>>>>>>>>>>>>>>>>>>> connection. Using bundles is a perf killer since their size is
>>>>>>>>>>>>>>>>>>>> not controlled. Using teardown doesn't allow you to release
>>>>>>>>>>>>>>>>>>>> the connection since it is a best effort thing. Not releasing
>>>>>>>>>>>>>>>>>>>> the connection makes you pay a lot - AWS ;) - or prevents you
>>>>>>>>>>>>>>>>>>>> from launching other processing - concurrent limit.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> For this example @Teardown is an exact fit. If things
>>>>>>>>>>>>>>>>>>> die so badly that @Teardown is not called then nothing else can
>>>>>>>>>>>>>>>>>>> be called to close the connection either. What AWS service are
>>>>>>>>>>>>>>>>>>> you thinking of that stays open for a long time when everything
>>>>>>>>>>>>>>>>>>> at the other end has died?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> You assume connections are kind of stateless, but some
>>>>>>>>>>>>>>>>>>> (proprietary) protocols require some closing exchanges which
>>>>>>>>>>>>>>>>>>> are not only "I'm leaving".
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> For AWS I was thinking about starting some services -
>>>>>>>>>>>>>>>>>>> machines - on the fly at pipeline startup and closing them at
>>>>>>>>>>>>>>>>>>> the end. If teardown is not called you leak machines and money.
>>>>>>>>>>>>>>>>>>> You can say it can be done another way... as can the full
>>>>>>>>>>>>>>>>>>> pipeline ;).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I don't want to be picky, but if Beam can't handle its
>>>>>>>>>>>>>>>>>>> components' lifecycle, it can't be used at scale for generic
>>>>>>>>>>>>>>>>>>> pipelines and is bound to some particular IOs.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What prevents enforcing teardown - ignoring the
>>>>>>>>>>>>>>>>>>> interstellar crash case, which can't be handled by any human
>>>>>>>>>>>>>>>>>>> system? Nothing technically. Why do you push to not handle it?
>>>>>>>>>>>>>>>>>>> Is it due to some legacy code on Dataflow or something else?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Teardown *is* already documented and implemented this way
>>>>>>>>>>>>>>>>>> (best-effort). So I'm not sure what kind of change you're asking
>>>>>>>>>>>>>>>>>> for.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Remove "best effort" from the javadoc. If it is not called
>>>>>>>>>>>>>>>>> then it is a bug and we are done :).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Also, what does it mean for the users? The direct runner
>>>>>>>>>>>>>>>>>>> does it, so if a user uses the RI in tests, will he get a
>>>>>>>>>>>>>>>>>>> different behavior in prod? Also don't forget the user doesn't
>>>>>>>>>>>>>>>>>>> know what the IOs he composes use, so this has such an impact
>>>>>>>>>>>>>>>>>>> on the whole product that it must be handled, IMHO.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I understand the portability culture is new in the big data
>>>>>>>>>>>>>>>>>>> world, but it is not a reason to ignore what people did for
>>>>>>>>>>>>>>>>>>> years and do it wrong before doing it right ;).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> My proposal is to list what can prevent guaranteeing -
>>>>>>>>>>>>>>>>>>> under normal IT conditions - the execution of teardown. Then we
>>>>>>>>>>>>>>>>>>> see if we can handle it, and only if there is a technical
>>>>>>>>>>>>>>>>>>> reason we can't do we make it experimental/unsupported in the
>>>>>>>>>>>>>>>>>>> API. I know Spark and Flink can; any unknown blocker for other
>>>>>>>>>>>>>>>>>>> runners?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Technical note: even a kill should go through Java
>>>>>>>>>>>>>>>>>>> shutdown hooks, otherwise your environment (the software
>>>>>>>>>>>>>>>>>>> enclosing Beam) is fully unhandled and your overall system is
>>>>>>>>>>>>>>>>>>> uncontrolled. The only case where this is not true is when the
>>>>>>>>>>>>>>>>>>> software is always owned by a vendor and never installed in a
>>>>>>>>>>>>>>>>>>> customer environment. In this case it is up to the vendor to
>>>>>>>>>>>>>>>>>>> handle the Beam API, and not up to Beam to adjust its API for a
>>>>>>>>>>>>>>>>>>> vendor - otherwise all features unsupported by one runner
>>>>>>>>>>>>>>>>>>> should be made optional, right?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Not all state is about the network, even in distributed
>>>>>>>>>>>>>>>>>>> systems, so it is key to have an explicit and defined
>>>>>>>>>>>>>>>>>>> lifecycle.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
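
For readers following the thread, the per-instance lifecycle requested above
(init(); try { while (...) process(); } finally { destroy(); }, once per
instance) can be sketched in plain Java. This is only an illustration under
stated assumptions: LifecycleSketch, Fn, ConnectionFn, Counters and run are
hypothetical names invented here, not Beam APIs, and the loop stands in for
whatever a real runner does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: none of these types are Beam APIs.
class LifecycleSketch {

    /** Tallies lifecycle calls across all fn instances. */
    static final class Counters {
        final AtomicInteger setups = new AtomicInteger();
        final AtomicInteger processed = new AtomicInteger();
        final AtomicInteger teardowns = new AtomicInteger();
    }

    /** Stand-in for a fn with init/process/destroy hooks. */
    interface Fn {
        void setup();
        void process(String element);
        void teardown();
    }

    /** A fn holding a fake per-instance connection. */
    static final class ConnectionFn implements Fn {
        private final Counters counters;
        private boolean connected;

        ConnectionFn(Counters counters) {
            this.counters = counters;
        }

        @Override
        public void setup() {
            connected = true; // pretend to open an expensive connection
            counters.setups.incrementAndGet();
        }

        @Override
        public void process(String element) {
            if (!connected) {
                throw new IllegalStateException("process() before setup()");
            }
            if (element.isEmpty()) {
                throw new IllegalArgumentException("empty element");
            }
            counters.processed.incrementAndGet();
        }

        @Override
        public void teardown() {
            connected = false; // pretend to release the connection
            counters.teardowns.incrementAndGet();
        }
    }

    /** init(); try { process each element } finally { destroy(); } */
    static void runInstance(Fn fn, List<String> elements) {
        fn.setup();
        try {
            for (String element : elements) {
                fn.process(element);
            }
        } finally {
            fn.teardown(); // runs even when process() throws
        }
    }

    /** Simulates a runner creating several instances of the same fn. */
    static Counters run(int instances, List<String> elements) {
        Counters counters = new Counters();
        for (int i = 0; i < instances; i++) {
            try {
                runInstance(new ConnectionFn(counters), elements);
            } catch (RuntimeException e) {
                // The failing instance is abandoned; its teardown already ran.
            }
        }
        return counters;
    }

    public static void main(String[] args) {
        Counters c = run(3, Arrays.asList("a", "b"));
        System.out.println(c.setups.get() + " setups, "
                + c.teardowns.get() + " teardowns");
    }
}
```

The finally block only covers failures raised inside the running JVM. If the
process itself dies (segfault, OOM kill, preemption), nothing in this sketch
runs, which is the case the thread describes as impossible to guarantee.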
