I'm also going to comment on why you would use StartBundle/FinishBundle versus Setup/Teardown. Generally, StartBundle/FinishBundle are about processing behavior and correctness, while Setup/Teardown are about managing persistent resources (like a connection in your case).
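As a rough illustration of that split, here is a plain-Java sketch that does not use the Beam SDK at all. FakeSink and BufferingWriterFn are hypothetical names invented for this example, and the comments only indicate which Beam annotation each method would correspond to in a real DoFn.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an external system such as a producer or DB connection. */
class FakeSink {
    private final List<String> buffered = new ArrayList<>();
    final List<String> durable = new ArrayList<>();
    boolean open = true;

    // Buffers the value locally; nothing is durable yet.
    void send(String value) { buffered.add(value); }

    // Moves everything buffered so far into the durable list.
    void flush() { durable.addAll(buffered); buffered.clear(); }

    // Releases the (pretend) connection.
    void close() { open = false; }
}

/** Plain-Java model of a DoFn; this is an assumption-laden sketch, not Beam API. */
class BufferingWriterFn {
    private FakeSink sink;
    int setupCalls = 0;

    // Would be @Setup: create the expensive, reusable resource once per instance.
    void setup() { sink = new FakeSink(); setupCalls++; }

    // Would be @StartBundle: cheap per-bundle initialization only.
    void startBundle() { }

    // Would be @ProcessElement: hand the element to the resource.
    void processElement(String element) { sink.send(element); }

    // Would be @FinishBundle: flush so the bundle's effects are persisted
    // before the runner commits the bundle.
    void finishBundle() { sink.flush(); }

    // Would be @Teardown: release the resource; not guaranteed to run after a crash.
    void teardown() { sink.close(); }

    FakeSink sink() { return sink; }
}
```

In this sketch a caller would invoke setup() once, then startBundle(), processElement(...) and finishBundle() for each bundle, and teardown() at the end. The point of the design is that correctness depends only on finishBundle(), while setup() and teardown() exist purely to amortize the cost of the resource.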
To be specific, FinishBundle must be called before any work is committed, and in it you must flush any state based on the elements processed within the bundle. This ensures that when the input bundle is checkpointed, any state has been committed and any operations performed based on the input elements have been persisted. If this is not done and an input is checkpointed, no DoFn will see those input elements again, which means that if a worker crashes, or Teardown is not called for any reason, the state generated by those elements will be permanently lost.

KafkaIO is an excellent example of both of these patterns ( https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1332 ):

In @Setup, we construct a producer, which we can reuse for the life of the Fn.
In @ProcessElement, we write a sequence of elements to the producer.
In @FinishBundle, we flush the producer, clearing out any buffered state. If the input elements are never seen again, they will still be reflected in Kafka.
In @Teardown, we close the producer and free any associated resources.

On Fri, Nov 18, 2016 at 11:12 AM, Demin Alexey <diomi...@gmail.com> wrote:

> Oh, this is my mistake
>
> Yes correct way its use @Setup.
>
> Thank you Eugene.
>
>
> 2016-11-18 22:54 GMT+04:00 Eugene Kirpichov <kirpic...@google.com.invalid>:
>
> > Hi Alexey,
> >
> > In general, things like establishing connections and initializing caches
> > are better done in @Setup and @TearDown methods, rather than @StartBundle
> > and @FinishBundle, because DoFn's can be reused between bundles and this
> > way you get more benefit from reuse.
> >
> > Bundles can be pretty small, especially in streaming pipelines. That said,
> > they normally shouldn't be 1-element-small. Hopefully someone working on
> > the Flink runner can comment.
> >
> > On Fri, Nov 18, 2016 at 10:47 AM amir bahmanyari
> > <amirto...@yahoo.com.invalid> wrote:
> >
> > > Hmmm...Thanks...This could very well be my bottleneck since I see tons of
> > > threads get on WAIT state after sometime & stay like that relatively
> > > forever. I have a 100 G worth of elements to process...........Is there a
> > > way to bypass this "startBundle" & get a fairly optimized
> > > behavior? Anyone?
> > > Thanks+regards
> > > Amir-
> > >
> > > From: Demin Alexey <diomi...@gmail.com>
> > > To: dev@beam.incubator.apache.org; amir bahmanyari <amirto...@yahoo.com>
> > > Sent: Friday, November 18, 2016 10:40 AM
> > > Subject: Re: Flink runner. Wrapper for DoFn
> > >
> > > Very simple example:
> > >
> > > My DoFn on startBundle load filters from remote db and build optimized
> > > index, on processElement apply filters on every element for decision about
> > > push element to next operation or drop his.
> > >
> > > In current implementation it's like matching regexp on string, you have 2
> > > way
> > > 1) compile regexp every time for every element
> > > 2) compile regexp one time and apply on all element
> > >
> > > now flink work by 1 way and this way not optimal
> > >
> > >
> > > 2016-11-18 22:26 GMT+04:00 amir bahmanyari <amirto...@yahoo.com.invalid>:
> > >
> > > > Hi Alexey, "startBundle can be expensive"...Could you elaborate on
> > > > "expensive" as per each element pls?
> > > > Thanks
> > > >
> > > > From: Demin Alexey <diomi...@gmail.com>
> > > > To: dev@beam.incubator.apache.org
> > > > Sent: Friday, November 18, 2016 7:40 AM
> > > > Subject: Flink runner. Wrapper for DoFn
> > > >
> > > > Hi
> > > >
> > > > In flink runner we have this code:
> > > >
> > > > https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L262
> > > >
> > > > but in mostly cases method startBundle can be expensive for making for
> > > > every element (for example connection for db/build cache/ etc)
> > > >
> > > > Why so important invoke startBundle/finishBundle on every
> > > > incoming streamRecord ?
> > > >
> > > > Thanks
> > > > Alexey Diomin