Agree

let's try another time:


any issue removing "best effort"?
if yes, any issue explaining it is due to failure and not a runner choice?

if one of both is fine then we close this thread and just decide who fixes
it, if not we must define and discuss why there is a teardown now and how
to implement the need in beam.



Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>

2018-02-19 23:35 GMT+01:00 Ismaël Mejía <ieme...@gmail.com>:

> I also had a different understanding of the lifecycle of a DoFn.
>
> My understanding of the use case for every method in the DoFn was clear and
> perfectly aligned with Thomas explanation, but what I understood was that
> in a
> general terms ‘@Setup was where I got resources/prepare connections and
> @Teardown where I free them’, so calling Teardown seemed essential to have
> a
> complete lifecycle:
> Setup → StartBundle* → ProcessElement* → FinishBundle* → Teardown
>
> The fact that @Teardown could not be called is a new detail for me too,
> and I
> also find weird to have a method that may or not be called as part of an
> API,
> why would users implement teardown if it will not be called? In that case
> probably a cleaner approach would be to get rid of that method altogether,
> no?
>
> But well maybe that’s not so easy too, there was another point: Some user
> reported an issue with leaking resources using KafkaIO in the Spark
> runner, for
> ref.
> https://apachebeam.slack.com/archives/C1AAFJYMP/p1510596938000622
>
> In that moment my understanding was that there was something fishy because
> we
> should be calling Teardown to close correctly the connections and free the
> resources in case of exceptions on start/process/finish, so I filled a
> JIRA and
> fixed this by enforcing the call of teardown for the Spark runner and the
> Flink
> runner:
> https://issues.apache.org/jira/browse/BEAM-3187
> https://issues.apache.org/jira/browse/BEAM-3244
>
> As you can see not calling this method does have consequences at least for
> non-containerized runners. Of course a runner that uses containers could
> not
> care about cleaning the resources this way, but a long living JVM in a
> Hadoop
> environment probably won’t have the same luck. So I am not sure that
> having a
> loose semantic there is the right option, I mean, runners could simply
> guarantee
> that they call teardown and if teardown takes too long they can decide to
> send a
> signal or kill the process/container/etc and go ahead, that way at least
> users
> would have a motivation to implement the teardown method, otherwise it
> doesn’t
> make any sense to have it (API wise).
>
> On Mon, Feb 19, 2018 at 11:30 PM, Eugene Kirpichov <kirpic...@google.com>
> wrote:
> > Romain, would it be fair to say that currently the goal of your
> > participation in this discussion is to identify situations where
> @Teardown
> > in principle could have been called, but some of the current runners
> don't
> > make a good enough effort to call it? If yes - as I said before, please,
> by
> > all means, file bugs of the form "Runner X doesn't call @Teardown in
> > situation Y" if you're aware of any, and feel free to send PRs fixing
> runner
> > X to reliably call @Teardown in situation Y. I think we all agree that
> this
> > would be a good improvement.
> >
> > On Mon, Feb 19, 2018 at 2:03 PM Romain Manni-Bucau <
> rmannibu...@gmail.com>
> > wrote:
> >>
> >>
> >>
> >> Le 19 févr. 2018 22:56, "Reuven Lax" <re...@google.com> a écrit :
> >>
> >>
> >>
> >> On Mon, Feb 19, 2018 at 1:51 PM, Romain Manni-Bucau
> >> <rmannibu...@gmail.com> wrote:
> >>>
> >>>
> >>>
> >>> Le 19 févr. 2018 21:28, "Reuven Lax" <re...@google.com> a écrit :
> >>>
> >>> How do you call teardown? There are cases in which the Java code gets
> no
> >>> indication that the restart is happening (e.g. cases where the machine
> >>> itself is taken down)
> >>>
> >>>
> >>> This is a bug, 0 downtime maintenance is very doable in 2018 ;).
> Crashes
> >>> are bugs, kill -9 to shutdown is a bug too. Other cases let call
> shutdown
> >>> with a hook worse case.
> >>
> >>
> >> What you say here is simply not true.
> >>
> >> There are many scenarios in which workers shutdown with no opportunity
> for
> >> any sort of shutdown hook. Sometimes the entire machine gets shutdown,
> and
> >> not even the OS will have much of a chance to do anything. At scale this
> >> will happen with some regularity, and a distributed system that assumes
> this
> >> will not happen is a poor distributed system.
> >>
> >>
> >> This is part of the infra and there is no reason the machine is shutdown
> >> without shutting down what runs on it before except if it is a bug in
> the
> >> software or setup. I can hear you maybe dont do it everywhere but there
> is
> >> no blocker to do it. Means you can shutdown the machines and guarantee
> >> teardown is called.
> >>
> >> Where i go is simply that it is doable and beam sdk core can assume
> setup
> >> is well done. If there is a best effort downside due to that - with the
> >> meaning you defined - it is an impl bug or a user installation issue.
> >>
> >> Technically all is true.
> >>
> >> What can prevent teardown is a hardware failure or so. This is fine and
> >> doesnt need to be in doc since it is life in IT and obvious or must be
> very
> >> explicit to avoid current ambiguity.
> >>
> >>
> >>>
> >>>
> >>>
> >>>
> >>> On Mon, Feb 19, 2018, 12:24 PM Romain Manni-Bucau <
> rmannibu...@gmail.com>
> >>> wrote:
> >>>>
> >>>> Restarting doesnt mean you dont call teardown. Except a bug there is
> no
> >>>> reason - technically - it happens, no reason.
> >>>>
> >>>> Le 19 févr. 2018 21:14, "Reuven Lax" <re...@google.com> a écrit :
> >>>>>
> >>>>> Workers restarting is not a bug, it's standard often expected.
> >>>>>
> >>>>> On Mon, Feb 19, 2018, 12:03 PM Romain Manni-Bucau
> >>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>
> >>>>>> Nothing, as mentionned it is a bug so recovery is a bug recovery
> >>>>>> (procedure)
> >>>>>>
> >>>>>> Le 19 févr. 2018 19:42, "Eugene Kirpichov" <kirpic...@google.com> a
> >>>>>> écrit :
> >>>>>>>
> >>>>>>> So what would you like to happen if there is a crash? The DoFn
> >>>>>>> instance no longer exists because the JVM it ran on no longer
> exists. What
> >>>>>>> should Teardown be called on?
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Feb 19, 2018, 10:20 AM Romain Manni-Bucau
> >>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> This is what i want and not 999999 teardowns for 1000000 setups
> >>>>>>>> until there is an unexpected crash (= a bug).
> >>>>>>>>
> >>>>>>>> Le 19 févr. 2018 18:57, "Reuven Lax" <re...@google.com> a écrit :
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau
> >>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 2018-02-19 15:57 GMT+01:00 Reuven Lax <re...@google.com>:
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau
> >>>>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> @Reuven: in practise it is created by pool of 256 but leads to
> >>>>>>>>>>>> the same pattern, the teardown is just a "if (iCreatedThem)
> releaseThem();"
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> How do you control "256?" Even if you have a pool of 256
> workers,
> >>>>>>>>>>> nothing in Beam guarantees how many threads and DoFns are
> created per
> >>>>>>>>>>> worker. In theory the runner might decide to create 1000
> threads on each
> >>>>>>>>>>> worker.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Nop was the other way around, in this case on AWS you can get
> 256
> >>>>>>>>>> instances at once but not 512 (which will be 2x256). So when
> you compute the
> >>>>>>>>>> distribution you allocate to some fn the role to own the
> instance lookup and
> >>>>>>>>>> releasing.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I still don't understand. Let's be more precise. If you write the
> >>>>>>>>> following code:
> >>>>>>>>>
> >>>>>>>>>    pCollection.apply(ParDo.of(new MyDoFn()));
> >>>>>>>>>
> >>>>>>>>> There is no way to control how many instances of MyDoFn are
> >>>>>>>>> created. The runner might decided to create a million instances
> of this
> >>>>>>>>> class across your worker pool, which means that you will get a
> million Setup
> >>>>>>>>> and Teardown calls.
> >>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Anyway this was just an example of an external resource you must
> >>>>>>>>>> release. Real topic is that beam should define asap a
> guaranteed generic
> >>>>>>>>>> lifecycle to let user embrace its programming model.
> >>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> @Eugene:
> >>>>>>>>>>>> 1. wait logic is about passing the value which is not always
> >>>>>>>>>>>> possible (like 15% of cases from my raw estimate)
> >>>>>>>>>>>> 2. sdf: i'll try to detail why i mention SDF more here
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Concretely beam exposes a portable API (included in the SDK
> >>>>>>>>>>>> core). This API defines a *container* API and therefore
> implies bean
> >>>>>>>>>>>> lifecycles. I'll not detail them all but just use the sources
> and dofn (not
> >>>>>>>>>>>> sdf) to illustrate the idea I'm trying to develop.
> >>>>>>>>>>>>
> >>>>>>>>>>>> A. Source
> >>>>>>>>>>>>
> >>>>>>>>>>>> A source computes a partition plan with 2 primitives:
> >>>>>>>>>>>> estimateSize and split. As an user you can expect both to be
> called on the
> >>>>>>>>>>>> same bean instance to avoid to pay the same connection
> cost(s) twice.
> >>>>>>>>>>>> Concretely:
> >>>>>>>>>>>>
> >>>>>>>>>>>> connect()
> >>>>>>>>>>>> try {
> >>>>>>>>>>>>   estimateSize()
> >>>>>>>>>>>>   split()
> >>>>>>>>>>>> } finally {
> >>>>>>>>>>>>   disconnect()
> >>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>> this is not guaranteed by the API so you must do:
> >>>>>>>>>>>>
> >>>>>>>>>>>> connect()
> >>>>>>>>>>>> try {
> >>>>>>>>>>>>   estimateSize()
> >>>>>>>>>>>> } finally {
> >>>>>>>>>>>>   disconnect()
> >>>>>>>>>>>> }
> >>>>>>>>>>>> connect()
> >>>>>>>>>>>> try {
> >>>>>>>>>>>>   split()
> >>>>>>>>>>>> } finally {
> >>>>>>>>>>>>   disconnect()
> >>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>> + a workaround with an internal estimate size since this
> >>>>>>>>>>>> primitive is often called in split but you dont want to
> connect twice in the
> >>>>>>>>>>>> second phase.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Why do you need that? Simply cause you want to define an API
> to
> >>>>>>>>>>>> implement sources which initializes the source bean and
> destroys it.
> >>>>>>>>>>>> I insists it is a very very basic concern for such API.
> However
> >>>>>>>>>>>> beam doesn't embraces it and doesn't assume it so building
> any API on top of
> >>>>>>>>>>>> beam is very hurtful today and for direct beam users you hit
> the exact same
> >>>>>>>>>>>> issues - check how IO are implemented, the static utilities
> which create
> >>>>>>>>>>>> volatile connections preventing to reuse existing connection
> in a single
> >>>>>>>>>>>> method
> >>>>>>>>>>>> (https://github.com/apache/beam/blob/master/sdks/java/io/
> elasticsearch/src/main/java/org/apache/beam/sdk/io/
> elasticsearch/ElasticsearchIO.java#L862).
> >>>>>>>>>>>>
> >>>>>>>>>>>> Same logic applies to the reader which is then created.
> >>>>>>>>>>>>
> >>>>>>>>>>>> B. DoFn & SDF
> >>>>>>>>>>>>
> >>>>>>>>>>>> As a fn dev you expect the same from the beam runtime: init();
> >>>>>>>>>>>> try { while (...) process(); } finally { destroy(); } and
> that it is
> >>>>>>>>>>>> executed on the exact same instance to be able to be stateful
> at that level
> >>>>>>>>>>>> for expensive connections/operations/flow state handling.
> >>>>>>>>>>>>
> >>>>>>>>>>>> As you mentionned with the million example, this sequence
> should
> >>>>>>>>>>>> happen for each single instance so 1M times for your example.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Now why did I mention SDF several times? Because SDF is a
> >>>>>>>>>>>> generalisation of both cases (source and dofn). Therefore it
> creates way
> >>>>>>>>>>>> more instances and requires to have a way more
> strict/explicit definition of
> >>>>>>>>>>>> the exact lifecycle and which instance does what. Since beam
> handles the
> >>>>>>>>>>>> full lifecycle of the bean instances it must provide
> init/destroy hooks
> >>>>>>>>>>>> (setup/teardown) which can be stateful.
> >>>>>>>>>>>>
> >>>>>>>>>>>> If you take the JDBC example which was mentionned earlier.
> >>>>>>>>>>>> Today, because of the teardown issue it uses bundles. Since
> bundles size is
> >>>>>>>>>>>> not defined - and will not with SDF, it must use a pool to be
> able to reuse
> >>>>>>>>>>>> a connection instance to not correct performances. Now with
> the SDF and the
> >>>>>>>>>>>> split increase, how do you handle the pool size? Generally in
> batch you use
> >>>>>>>>>>>> a single connection per thread to avoid to consume all
> database connections.
> >>>>>>>>>>>> With a pool you have 2 choices: 1. use a pool of 1, 2. use a
> pool a bit
> >>>>>>>>>>>> higher but multiplied by the number of beans you will likely
> x2 or 3 the
> >>>>>>>>>>>> connection count and make the execution fail with "no more
> connection
> >>>>>>>>>>>> available". I you picked 1 (pool of #1), then you still have
> to have a
> >>>>>>>>>>>> reliable teardown by pool instance (close() generally) to
> ensure you release
> >>>>>>>>>>>> the pool and don't leak the connection information in the
> JVM. In all case
> >>>>>>>>>>>> you come back to the init()/destroy() lifecycle even if you
> fake to get
> >>>>>>>>>>>> connections with bundles.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Just to make it obvious: SDF mentions are just cause SDF imply
> >>>>>>>>>>>> all the current issues with the loose definition of the bean
> lifecycles at
> >>>>>>>>>>>> an exponential level, nothing else.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Romain Manni-Bucau
> >>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn | Book
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov
> >>>>>>>>>>>> <kirpic...@google.com>:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The kind of whole-transform lifecycle you're mentioning can
> be
> >>>>>>>>>>>>> accomplished using the Wait transform as I suggested in the
> thread above,
> >>>>>>>>>>>>> and I believe it should become the canonical way to do that.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> (Would like to reiterate one more time, as the main author of
> >>>>>>>>>>>>> most design documents related to SDF and of its
> implementation in the Java
> >>>>>>>>>>>>> direct and dataflow runner that SDF is fully unrelated to
> the topic of
> >>>>>>>>>>>>> cleanup - I'm very confused as to why it keeps coming up)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau
> >>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I kind of agree except transforms lack a lifecycle too. My
> >>>>>>>>>>>>>> understanding is that sdf could be a way to unify it and
> clean the api.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Otherwise how to normalize - single api -  lifecycle of
> >>>>>>>>>>>>>> transforms?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Le 18 févr. 2018 21:32, "Ben Chambers" <
> bchamb...@apache.org>
> >>>>>>>>>>>>>> a écrit :
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Are you sure that focusing on the cleanup of specific
> DoFn's
> >>>>>>>>>>>>>>> is appropriate? Many cases where cleanup is necessary, it
> is around an
> >>>>>>>>>>>>>>> entire composite PTransform. I think there have been
> discussions/proposals
> >>>>>>>>>>>>>>> around a more methodical "cleanup" option, but those
> haven't been
> >>>>>>>>>>>>>>> implemented, to the best of my knowledge.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> For instance, consider the steps of a FileIO:
> >>>>>>>>>>>>>>> 1. Write to a bunch (N shards) of temporary files
> >>>>>>>>>>>>>>> 2. When all temporary files are complete, attempt to do a
> >>>>>>>>>>>>>>> bulk copy to put them in the final destination.
> >>>>>>>>>>>>>>> 3. Cleanup all the temporary files.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> (This is often desirable because it minimizes the chance of
> >>>>>>>>>>>>>>> seeing partial/incomplete results in the final
> destination).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> In the above, you'd want step 1 to execute on many workers,
> >>>>>>>>>>>>>>> likely using a ParDo (say N different workers).
> >>>>>>>>>>>>>>> The move step should only happen once, so on one worker.
> This
> >>>>>>>>>>>>>>> means it will be a different DoFn, likely with some stuff
> done to ensure it
> >>>>>>>>>>>>>>> runs on one worker.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> In such a case, cleanup / @TearDown of the DoFn is not
> >>>>>>>>>>>>>>> enough. We need an API for a PTransform to schedule some
> cleanup work for
> >>>>>>>>>>>>>>> when the transform is "done". In batch this is relatively
> straightforward,
> >>>>>>>>>>>>>>> but doesn't exist. This is the source of some problems,
> such as BigQuery
> >>>>>>>>>>>>>>> sink leaving files around that have failed to import into
> BigQuery.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> In streaming this is less straightforward -- do you want to
> >>>>>>>>>>>>>>> wait until the end of the pipeline? Or do you want to wait
> until the end of
> >>>>>>>>>>>>>>> the window? In practice, you just want to wait until you
> know nobody will
> >>>>>>>>>>>>>>> need the resource anymore.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> This led to some discussions around a "cleanup" API, where
> >>>>>>>>>>>>>>> you could have a transform that output resource objects.
> Each resource
> >>>>>>>>>>>>>>> object would have logic for cleaning it up. And there
> would be something
> >>>>>>>>>>>>>>> that indicated what parts of the pipeline needed that
> resource, and what
> >>>>>>>>>>>>>>> kind of temporal lifetime those objects had. As soon as
> that part of the
> >>>>>>>>>>>>>>> pipeline had advanced far enough that it would no longer
> need the resources,
> >>>>>>>>>>>>>>> they would get cleaned up. This can be done at pipeline
> shutdown, or
> >>>>>>>>>>>>>>> incrementally during a streaming pipeline, etc.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Would something like this be a better fit for your use
> case?
> >>>>>>>>>>>>>>> If not, why is handling teardown within a single DoFn
> sufficient?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau
> >>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Yes 1M. Lets try to explain you simplifying the overall
> >>>>>>>>>>>>>>>> execution. Each instance - one fn so likely in a thread
> of a worker - has
> >>>>>>>>>>>>>>>> its lifecycle. Caricaturally: "new" and garbage
> collection.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> In practise, new is often an unsafe allocate
> >>>>>>>>>>>>>>>> (deserialization) but it doesnt matter here.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> What i want is any "new" to have a following setup before
> >>>>>>>>>>>>>>>> any process or stattbundle and the last time beam has the
> instance before it
> >>>>>>>>>>>>>>>> is gc-ed and after last finishbundle it calls teardown.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> It is as simple as it.
> >>>>>>>>>>>>>>>> This way no need to comibe fn in a way making a fn not
> self
> >>>>>>>>>>>>>>>> contained to implement basic transforms.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Le 18 févr. 2018 20:07, "Reuven Lax" <re...@google.com> a
> >>>>>>>>>>>>>>>> écrit :
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau
> >>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Le 18 févr. 2018 19:28, "Ben Chambers"
> >>>>>>>>>>>>>>>>>> <bchamb...@apache.org> a écrit :
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> It feels like his thread may be a bit off-track. Rather
> >>>>>>>>>>>>>>>>>> than focusing on the semantics of the existing methods
> -- which have been
> >>>>>>>>>>>>>>>>>> noted to be meet many existing use cases -- it would be
> helpful to focus on
> >>>>>>>>>>>>>>>>>> more on the reason you are looking for something with
> different semantics.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Some possibilities (I'm not sure which one you are
> trying
> >>>>>>>>>>>>>>>>>> to do):
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 1. Clean-up some external, global resource, that was
> >>>>>>>>>>>>>>>>>> initialized once during the startup of the pipeline. If
> this is the case,
> >>>>>>>>>>>>>>>>>> how are you ensuring it was really only initialized
> once (and not once per
> >>>>>>>>>>>>>>>>>> worker, per thread, per instance, etc.)? How do you
> know when the pipeline
> >>>>>>>>>>>>>>>>>> should release it? If the answer is "when it reaches
> step X", then what
> >>>>>>>>>>>>>>>>>> about a streaming pipeline?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> When the dofn is no more needed logically ie when the
> >>>>>>>>>>>>>>>>>> batch is done or stream is stopped (manually or by a
> jvm shutdown)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I'm really not following what this means.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Let's say that a pipeline is running 1000 workers, and
> each
> >>>>>>>>>>>>>>>>> worker is running 1000 threads (each running a copy of
> the same DoFn). How
> >>>>>>>>>>>>>>>>> many cleanups do you want (do you want 1000 * 1000 = 1M
> cleanups) and when
> >>>>>>>>>>>>>>>>> do you want it called? When the entire pipeline is shut
> down? When an
> >>>>>>>>>>>>>>>>> individual worker is about to shut down (which may be
> temporary - may be
> >>>>>>>>>>>>>>>>> about to start back up)? Something else?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 2. Finalize some resources that are used within some
> >>>>>>>>>>>>>>>>>> region of the pipeline. While, the DoFn lifecycle
> methods are not a good fit
> >>>>>>>>>>>>>>>>>> for this (they are focused on managing resources within
> the DoFn), you could
> >>>>>>>>>>>>>>>>>> model this on how FileIO finalizes the files that it
> produced. For instance:
> >>>>>>>>>>>>>>>>>>    a) ParDo generates "resource IDs" (or some token that
> >>>>>>>>>>>>>>>>>> stores information about resources)
> >>>>>>>>>>>>>>>>>>    b) "Require Deterministic Input" (to prevent retries
> >>>>>>>>>>>>>>>>>> from changing resource IDs)
> >>>>>>>>>>>>>>>>>>    c) ParDo that initializes the resources
> >>>>>>>>>>>>>>>>>>    d) Pipeline segments that use the resources, and
> >>>>>>>>>>>>>>>>>> eventually output the fact they're done
> >>>>>>>>>>>>>>>>>>    e) "Require Deterministic Input"
> >>>>>>>>>>>>>>>>>>    f) ParDo that frees the resources
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> By making the use of the resource part of the data it is
> >>>>>>>>>>>>>>>>>> possible to "checkpoint" which resources may be in use
> or have been finished
> >>>>>>>>>>>>>>>>>> by using the require deterministic input. This is
> important to ensuring
> >>>>>>>>>>>>>>>>>> everything is actually cleaned up.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I nees that but generic and not case by case to
> >>>>>>>>>>>>>>>>>> industrialize some api on top of beam.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 3. Some other use case that I may be missing? If it is
> >>>>>>>>>>>>>>>>>> this case, could you elaborate on what you are trying
> to accomplish? That
> >>>>>>>>>>>>>>>>>> would help me understand both the problems with
> existing options and
> >>>>>>>>>>>>>>>>>> possibly what could be done to help.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I understand there are sorkaround for almost all cases
> but
> >>>>>>>>>>>>>>>>>> means each transform is different in its lifecycle
> handling  except i
> >>>>>>>>>>>>>>>>>> dislike it a lot at a scale and as a user since you
> cant put any unified
> >>>>>>>>>>>>>>>>>> practise on top of beam, it also makes beam very hard
> to integrate or to use
> >>>>>>>>>>>>>>>>>> to build higher level libraries or softwares.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> This is why i tried to not start the workaround
> >>>>>>>>>>>>>>>>>> discussions and just stay at API level.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> -- Ben
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau
> >>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov
> >>>>>>>>>>>>>>>>>>> <kirpic...@google.com>:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> "Machine state" is overly low-level because many of
> the
> >>>>>>>>>>>>>>>>>>>> possible reasons can happen on a perfectly fine
> machine.
> >>>>>>>>>>>>>>>>>>>> If you'd like to rephrase it to "it will be called
> >>>>>>>>>>>>>>>>>>>> except in various situations where it's logically
> impossible or impractical
> >>>>>>>>>>>>>>>>>>>> to guarantee that it's called", that's fine. Or you
> can list some of the
> >>>>>>>>>>>>>>>>>>>> examples above.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Sounds ok to me
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The main point for the user is, you *will* see
> >>>>>>>>>>>>>>>>>>>> non-preventable situations where it couldn't be
> called - it's not just
> >>>>>>>>>>>>>>>>>>>> intergalactic crashes - so if the logic is very
> important (e.g. cleaning up
> >>>>>>>>>>>>>>>>>>>> a large amount of temporary files, shutting down a
> large number of VMs you
> >>>>>>>>>>>>>>>>>>>> started etc), you have to express it using one of the
> other methods that
> >>>>>>>>>>>>>>>>>>>> have stricter guarantees (which obviously come at a
> cost, e.g. no
> >>>>>>>>>>>>>>>>>>>> pass-by-reference).
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> FinishBundle has the exact same guarantee sadly so not
> >>>>>>>>>>>>>>>>>>> which which other method you speak about. Concretely
> if you make it really
> >>>>>>>>>>>>>>>>>>> unreliable - this is what best effort sounds to me -
> then users can use it
> >>>>>>>>>>>>>>>>>>> to clean anything but if you make it "can happen but
> it is unexpected and
> >>>>>>>>>>>>>>>>>>> means something happent" then it is fine to have a
> manual - or auto if fancy
> >>>>>>>>>>>>>>>>>>> - recovery procedure. This is where it makes all the
> difference and impacts
> >>>>>>>>>>>>>>>>>>> the developpers, ops (all users basically).
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau
> >>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Agree Eugene except that "best effort" means that. It
> >>>>>>>>>>>>>>>>>>>>> is also often used to say "at will" and this is what
> triggered this thread.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I'm fine using "except if the machine state prevents
> >>>>>>>>>>>>>>>>>>>>> it" but "best effort" is too open and can be very
> badly and wrongly
> >>>>>>>>>>>>>>>>>>>>> perceived by users (like I did).
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Romain Manni-Bucau
> >>>>>>>>>>>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn |
> >>>>>>>>>>>>>>>>>>>>> Book
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov
> >>>>>>>>>>>>>>>>>>>>> <kirpic...@google.com>:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> It will not be called if it's impossible to call it:
> >>>>>>>>>>>>>>>>>>>>>> in the example situation you have (intergalactic
> crash), and in a number of
> >>>>>>>>>>>>>>>>>>>>>> more common cases: eg in case the worker container
> has crashed (eg user code
> >>>>>>>>>>>>>>>>>>>>>> in a different thread called a C library over JNI
> and it segfaulted), JVM
> >>>>>>>>>>>>>>>>>>>>>> bug, crash due to user code OOM, in case the worker
> has lost network
> >>>>>>>>>>>>>>>>>>>>>> connectivity (then it may be called but it won't be
> able to do anything
> >>>>>>>>>>>>>>>>>>>>>> useful), in case this is running on a preemptible
> VM and it was preempted by
> >>>>>>>>>>>>>>>>>>>>>> the underlying cluster manager without notice or if
> the worker was too busy
> >>>>>>>>>>>>>>>>>>>>>> with other stuff (eg calling other Teardown
> functions) until the preemption
> >>>>>>>>>>>>>>>>>>>>>> timeout elapsed, in case the underlying hardware
> simply failed (which
> >>>>>>>>>>>>>>>>>>>>>> happens quite often at scale), and in many other
> conditions.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> "Best effort" is the commonly used term to describe
> >>>>>>>>>>>>>>>>>>>>>> such behavior. Please feel free to file bugs for
> cases where you observed a
> >>>>>>>>>>>>>>>>>>>>>> runner not call Teardown in a situation where it
> was possible to call it but
> >>>>>>>>>>>>>>>>>>>>>> the runner made insufficient effort.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau
> >>>>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov
> >>>>>>>>>>>>>>>>>>>>>>> <kirpic...@google.com>:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau
> >>>>>>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Le 18 févr. 2018 00:23, "Kenneth Knowles"
> >>>>>>>>>>>>>>>>>>>>>>>>> <k...@google.com> a écrit :
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain
> Manni-Bucau
> >>>>>>>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> If you give an example of a high-level need
> (e.g.
> >>>>>>>>>>>>>>>>>>>>>>>>>> "I'm trying to write an IO for system $x and it
> requires the following
> >>>>>>>>>>>>>>>>>>>>>>>>>> initialization and the following cleanup logic
> and the following processing
> >>>>>>>>>>>>>>>>>>>>>>>>>> in between") I'll be better able to help you.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Take a simple example of a transform requiring a
> >>>>>>>>>>>>>>>>>>>>>>>>>> connection. Using bundles is a perf killer
> since size is not controlled.
> >>>>>>>>>>>>>>>>>>>>>>>>>> Using teardown doesnt allow you to release the
> connection since it is a best
> >>>>>>>>>>>>>>>>>>>>>>>>>> effort thing. Not releasing the connection
> makes you pay a lot - aws ;) - or
> >>>>>>>>>>>>>>>>>>>>>>>>>> prevents you to launch other processings -
> concurrent limit.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> For this example @Teardown is an exact fit. If
> >>>>>>>>>>>>>>>>>>>>>>>>> things die so badly that @Teardown is not called
> then nothing else can be
> >>>>>>>>>>>>>>>>>>>>>>>>> called to close the connection either. What AWS
> service are you thinking of
> >>>>>>>>>>>>>>>>>>>>>>>>> that stays open for a long time when everything
> at the other end has died?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> You assume connections are kind of stateless but
> >>>>>>>>>>>>>>>>>>>>>>>>> some (proprietary) protocols requires some
> closing exchanges which are not
> >>>>>>>>>>>>>>>>>>>>>>>>> only "im leaving".
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> For aws i was thinking about starting some
> services
> >>>>>>>>>>>>>>>>>>>>>>>>> - machines - on the fly in a pipeline startup
> and closing them at the end.
> >>>>>>>>>>>>>>>>>>>>>>>>> If teardown is not called you leak machines and
> money. You can say it can be
> >>>>>>>>>>>>>>>>>>>>>>>>> done another way...as the full pipeline ;).
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> I dont want to be picky but if beam cant handle
> its
> >>>>>>>>>>>>>>>>>>>>>>>>> components lifecycle it can be used at scale for
> generic pipelines and if
> >>>>>>>>>>>>>>>>>>>>>>>>> bound to some particular IO.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> What does prevent to enforce teardown - ignoring
> >>>>>>>>>>>>>>>>>>>>>>>>> the interstellar crash case which cant be
> handled by any human system?
> >>>>>>>>>>>>>>>>>>>>>>>>> Nothing technically. Why do you push to not
> handle it? Is it due to some
> >>>>>>>>>>>>>>>>>>>>>>>>> legacy code on dataflow or something else?
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Teardown *is* already documented and implemented
> >>>>>>>>>>>>>>>>>>>>>>>> this way (best-effort). So I'm not sure what kind
> of change you're asking
> >>>>>>>>>>>>>>>>>>>>>>>> for.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Remove "best effort" from the javadoc. If it is not
> >>>>>>>>>>>>>>>>>>>>>>> call then it is a bug and we are done :).
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Also what does it mean for the users? Direct
> runner
> >>>>>>>>>>>>>>>>>>>>>>>>> does it so if a user udes the RI in test, he
> will get a different behavior
> >>>>>>>>>>>>>>>>>>>>>>>>> in prod? Also dont forget the user doesnt know
> what the IOs he composes use
> >>>>>>>>>>>>>>>>>>>>>>>>> so this is so impacting for the whole product
> than he must be handled IMHO.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> I understand the portability culture is new in
> big
> >>>>>>>>>>>>>>>>>>>>>>>>> data world but it is not a reason to ignore what
> people did for years and do
> >>>>>>>>>>>>>>>>>>>>>>>>> it wrong before doing right ;).
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> My proposal is to list what can prevent to
> >>>>>>>>>>>>>>>>>>>>>>>>> guarantee - in the normal IT conditions - the
> execution of teardown. Then we
> >>>>>>>>>>>>>>>>>>>>>>>>> see if we can handle it and only if there is a
> technical reason we cant we
> >>>>>>>>>>>>>>>>>>>>>>>>> make it experimental/unsupported in the api. I
> know spark and flink can, any
> >>>>>>>>>>>>>>>>>>>>>>>>> unknown blocker for other runners?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Technical note: even a kill should go through
> java
> >>>>>>>>>>>>>>>>>>>>>>>>> shutdown hooks otherwise your environment (beam
> enclosing software) is fully
> >>>>>>>>>>>>>>>>>>>>>>>>> unhandled and your overall system is
> uncontrolled. Only case where it is not
> >>>>>>>>>>>>>>>>>>>>>>>>> true is when the software is always owned by a
> vendor and never installed on
> >>>>>>>>>>>>>>>>>>>>>>>>> customer environment. In this case it belongd to
> the vendor to handle beam
> >>>>>>>>>>>>>>>>>>>>>>>>> API and not to beam to adjust its API for a
> vendor - otherwise all
> >>>>>>>>>>>>>>>>>>>>>>>>> unsupported features by one runner should be
> made optional right?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> All state is not about network, even in
> distributed
> >>>>>>>>>>>>>>>>>>>>>>>>> systems so this is key to have an explicit and
> defined lifecycle.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Kenn
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>
> >>
> >>
> >
>

Reply via email to