Hi Eduardo,
Henning is right: the specific guarantees around Setup/Teardown vs.
StartBundle/FinishBundle are currently described best in the Java SDK
documentation:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
(see the documentation for @Setup, @Teardown, @StartBundle, @FinishBundle).
For what you are doing, StartBundle/FinishBundle is the proper
abstraction. With the current code you are going to see data loss or
corruption, because it violates the fundamental guarantee that, by the
time FinishBundle returns, all work associated with the bundle must be
completed. Pooling or batching mutations is in fact the primary use case
for StartBundle/FinishBundle. Setup/Teardown are for managing volatile
resources like connections.
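
As a rough illustration of that split, here is a minimal, self-contained
sketch. It does not import the Beam SDK: Elem, WriterPool and
simulateBundles are hypothetical stand-ins (the real pool and the runner
would take their place), and the method signatures are only one plausible
shape. The point is the division of labour: Setup acquires the pool,
FinishBundle flushes it, and nothing that must happen is left to Teardown.

```go
package main

import "context"

// Elem and WriterPool are hypothetical stand-ins for the types in the
// original snippet; a real pool would hold connections to the sink.
type Elem string

type WriterPool struct {
	pending []Elem // elements buffered for the current bundle
	written []Elem // elements already flushed to the pretend sink
	closed  bool
}

// Add buffers one element without writing it.
func (p *WriterPool) Add(e Elem) { p.pending = append(p.pending, e) }

// Write flushes everything buffered so far and empties the buffer.
func (p *WriterPool) Write() {
	p.written = append(p.written, p.pending...)
	p.pending = nil
}

// Close releases the pool's resources.
func (p *WriterPool) Close() { p.closed = true }

// Writer uses the lifecycle method names discussed in this thread.
type Writer struct {
	pool *WriterPool
}

// Setup acquires the long-lived resource once per DoFn instance.
func (w *Writer) Setup() { w.pool = &WriterPool{} }

// StartBundle resets per-bundle state.
func (w *Writer) StartBundle(ctx context.Context) { w.pool.pending = nil }

// ProcessElement only buffers; no I/O happens per element.
func (w *Writer) ProcessElement(ctx context.Context, elem Elem) { w.pool.Add(elem) }

// FinishBundle flushes, so all work for the bundle is done when it returns.
func (w *Writer) FinishBundle(ctx context.Context) { w.pool.Write() }

// Teardown is best effort: it only releases resources and may never run.
func (w *Writer) Teardown() { w.pool.Close() }

// simulateBundles is a hypothetical driver that mimics the order in which a
// runner would invoke the methods. It deliberately never calls Teardown.
func simulateBundles(w *Writer, bundles [][]Elem) {
	ctx := context.Background()
	w.Setup()
	for _, bundle := range bundles {
		w.StartBundle(ctx)
		for _, e := range bundle {
			w.ProcessElement(ctx, e)
		}
		w.FinishBundle(ctx)
	}
}
```

Calling simulateBundles(&Writer{}, ...) from a main function with a couple
of bundles leaves every element in written even though Teardown never ran,
which is the behaviour you want on a runner that may skip Teardown.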

On Tue, Jul 3, 2018 at 1:10 PM Henning Rohde <[email protected]> wrote:

> Teardown has very loose guarantees on when it's called and you essentially
> can't rely on it. Currently, for Go on non-direct runners, we hang on to
> the bundle descriptors forever and never destroy them (and in turn never
> call Teardown). Even if we didn't, failures/restarts could cause Teardown
> to not be called.
>
> If something _must_ happen, FinishBundle is the right method.
>
> Thanks,
>  Henning
>
> On Tue, Jul 3, 2018 at 10:37 AM [email protected] <
> [email protected]> wrote:
>
>> Essentially I have the following code:
>>
>> type Writer struct {
>>   Pool WriterPool
>> }
>>
>> func (w *Writer) Setup() {
>>  w.Pool = Init()
>> }
>>
>> func (w *Writer) ProcessElement(ctx context.Context, elem Elem) {
>>   w.Pool.Add(elem)
>> }
>>
>> func (w *Writer) Teardown() {
>>   w.Pool.Write()
>>   w.Pool.Close()
>> }
>>
>> beam.ParDo0(scope, &Writer{}, elemCollection)
>>
>> The above code runs fine with the direct runner but not with Dataflow.
>>
>>  I added log lines to the above methods, and the ones in Teardown() never
>> appear in the logs.
>> If I change my code as follows:
>>
>> func (w *Writer) ProcessElement(ctx context.Context, elem Elem) {
>>   w.Pool.Add(elem)
>>   w.Pool.Write()
>> }
>>
>> Then I see the data being written, but I lose the ability to pool, plus I
>> am leaking connections.
>>
>> Is this a known issue, or am I doing something wrong?
>>
>> Thanks again for the help.
>>
>
