Bundle boundaries are unspecified: they depend on the runner and on the
circumstances of a particular execution, and are generally unrelated to
windowing or to the contents of the data. They have no semantic meaning -
everything would still work exactly the same way if every element were in
its own bundle, or if the entire contents of a PCollection were in a single
bundle. Bundle boundaries are only the runner's way of telling you the
boundaries within which you may amortize work: for practical purposes, all
you need to know is "if you do batching, flush it in finishBundle".
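
To make that concrete, here is a minimal sketch in plain Go. It does not use
the Beam SDK: Sink and runBundles are hypothetical stand-ins for an external
store and for a runner, and the lifecycle method signatures are simplified
for illustration. It buffers in ProcessElement, flushes in FinishBundle, and
runs the same input with several different bundle sizes.

```go
package main

import "fmt"

// Sink is a hypothetical stand-in for an external store. It records what
// was written so the effect of different bundle boundaries can be compared.
type Sink struct {
	Written []string // every element that reached the store, in order
	Flushes int      // number of batched writes performed
}

// Writer has the shape of a batching DoFn. The method names follow the
// lifecycle discussed in this thread; the signatures are simplified.
type Writer struct {
	sink  *Sink
	batch []string
}

// StartBundle begins each bundle with an empty buffer.
func (w *Writer) StartBundle() { w.batch = w.batch[:0] }

// ProcessElement only buffers the element; nothing is written yet.
func (w *Writer) ProcessElement(elem string) { w.batch = append(w.batch, elem) }

// FinishBundle flushes everything buffered for this bundle before it
// returns, so no work is left pending once the bundle is done.
func (w *Writer) FinishBundle() error {
	if len(w.batch) == 0 {
		return nil
	}
	w.sink.Written = append(w.sink.Written, w.batch...)
	w.sink.Flushes++
	w.batch = w.batch[:0]
	return nil
}

// runBundles is a toy runner (an assumption of this sketch, not a Beam API):
// it cuts elems into bundles of at most size elements and drives the
// lifecycle methods once per bundle.
func runBundles(elems []string, size int) (*Sink, error) {
	if size < 1 {
		return nil, fmt.Errorf("bundle size must be positive, got %d", size)
	}
	sink := &Sink{}
	w := &Writer{sink: sink}
	for start := 0; start < len(elems); start += size {
		end := start + size
		if end > len(elems) {
			end = len(elems)
		}
		w.StartBundle()
		for _, e := range elems[start:end] {
			w.ProcessElement(e)
		}
		if err := w.FinishBundle(); err != nil {
			return nil, err
		}
	}
	return sink, nil
}

func main() {
	elems := []string{"a", "b", "c", "d", "e"}
	// One element per bundle, two per bundle, and everything in one bundle.
	for _, size := range []int{1, 2, len(elems)} {
		sink, err := runBundles(elems, size)
		if err != nil {
			panic(err)
		}
		fmt.Println(size, sink.Flushes, sink.Written)
	}
}
```

In all three runs the same five elements reach the sink in the same order;
only the number of flushes changes (5, 3 and 1). That is the sense in which
bundle boundaries affect how much work gets amortized but not the result.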

On Tue, Jul 3, 2018 at 2:29 PM [email protected] <
[email protected]> wrote:

> Thanks. It is much clearer now...
>
> However, the code comments don't mention how often
> {Start,Finish}Bundle are called. What constitutes a batch?
>
> If I am using a window of 1 minute, can I expect {Start,Finish}Bundle to
> be called every minute? In other words, will the window produce a batch
> of my data?
>
> On 2018/07/03 20:31:56, Eugene Kirpichov <[email protected]> wrote:
> > Hi Eduardo,
> > Henning is right - the specific guarantees around Setup/Teardown vs.
> > StartBundle/FinishBundle are currently described best in the Java SDK
> > documentation:
> >
> > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
> > (see documentation to @Setup, @Teardown, @StartBundle, @FinishBundle).
> > For what you are doing, StartBundle/FinishBundle is 100% the proper
> > abstraction - in the current code you are going to see data loss or
> > corruption because the code is violating the fundamental guarantee that
> > by the time FinishBundle returns, all work associated with the bundle
> > must be completed. Pooling or batching mutations is in fact the primary
> > use case for Start/FinishBundle. Setup/Teardown are for managing
> > volatile resources like connections.
> >
> > On Tue, Jul 3, 2018 at 1:10 PM Henning Rohde <[email protected]> wrote:
> >
> > > Teardown has very loose guarantees on when it's called and you
> > > essentially can't rely on it. Currently, for Go on non-direct runners,
> > > we hang on to the bundle descriptors forever and never destroy them
> > > (and in turn never call Teardown). Even if we didn't,
> > > failures/restarts could cause Teardown to not be called.
> > >
> > > If something _must_ happen, FinishBundle is the right method.
> > >
> > > Thanks,
> > >  Henning
> > >
> > > On Tue, Jul 3, 2018 at 10:37 AM [email protected] <
> > > [email protected]> wrote:
> > >
> > >> Essentially I have the following code:
> > >>
> > >> type Writer struct {
> > >>   Pool WriterPool
> > >> }
> > >>
> > >> func (w *Writer) Setup() {
> > >>  w.Pool = Init()
> > >> }
> > >>
> > > >> func (w *Writer) ProcessElement(ctx context.Context, elem Elem) {
> > >>   w.Pool.Add(elem)
> > >> }
> > >>
> > > >> func (w *Writer) Teardown() {
> > >>   w.Pool.Write()
> > >>   w.Pool.Close()
> > >> }
> > >>
> > >> beam.ParDo0(scope, &Writer{}, elemCollection)
> > >>
> > >> The above code runs fine with the direct runner but not with dataflow.
> > >>
> > > >> I added log lines to the above methods, and the ones in Teardown()
> > > >> never appear in the logs.
> > >> If I change my code as follows:
> > >>
> > > >> func (w *Writer) ProcessElement(ctx context.Context, elem Elem) {
> > >>   w.Pool.Add(elem)
> > >>   w.Pool.Write()
> > >> }
> > >>
> > > >> Then I see the data being written, but I lose the ability to pool,
> > > >> plus I am leaking connections.
> > > >>
> > > >> Is this a known issue, or am I doing something wrong?
> > >>
> > >> Thanks again for the help.
> > >>
> > >
> >
>
