On Thu, Aug 2, 2018 at 18:32, Andrew Pilloud <[email protected]> wrote:

> The subscriptions I want to clean up are ones that are implicitly created
> by the PubsubIO. These subscriptions are created and then leaked; they aren't
> reused in future pipelines, so the data loss issues are moot here. I agree
> that we don't want to tear down user-supplied subscriptions.
>
> I've been doing some more digging; it looks like the Source.Reader
> interface has a close() callback
> <https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/io/Source.Reader.html#close-->.
> Is that a place I might be able to do cleanup? (It appears this is hooked
> up to the RichFunction.close() callback on Flink and called from the Direct
> Runner, but possibly not called from other runners.)
>

It is after the parallelization (you can have N>1 readers in parallel), so if you have some global reference counting to clean up once, yes; otherwise it will be hard.

> Andrew
>
> On Thu, Aug 2, 2018 at 1:07 AM Reuven Lax <[email protected]> wrote:
>
>> Actually I think SDF is the right way to fix this. The SDF can set a
>> timer at infinity (which will only fire when the pipeline shuts down). I
>> believe that SDF support is being added to the portability layer now, so
>> eventually all portable runners will support it, and maybe we can live with
>> the status quo until then.
>>
>> On Wed, Aug 1, 2018 at 9:59 PM Romain Manni-Bucau <[email protected]>
>> wrote:
>>
>>> I agree, Reuven. But leaking in a source doesn't give any guarantee
>>> regarding the execution, since it will depend on the runner, and the current
>>> API will not provide that feature. Using reference-counting state can
>>> work better but would require an SDF migration (and will hit runner support
>>> issues :().
>>>
>>>
>>> On Thu, Aug 2, 2018 at 05:39, Reuven Lax <[email protected]> wrote:
>>>
>>>> Hi Romain,
>>>>
>>>> Andrew's example actually wouldn't work for that.
>>>> With Google Cloud Pub/Sub (the example source he referenced), if there
>>>> is no subscription to a topic, all publishes to that topic are dropped on
>>>> the floor; if you don't want to lose data, you are expected to keep the
>>>> subscription around continuously. In this example, leaking a subscription
>>>> is probably preferable to losing data (especially since Pub/Sub itself
>>>> garbage collects subscriptions that have been inactive for a long time).
>>>>
>>>> The answer might be that Beam does not have a good lifecycle story
>>>> here, and something needs to be built.
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Jul 31, 2018 at 10:04 PM Romain Manni-Bucau <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi Andrew,
>>>>>
>>>>> IIRC sources should clean up their resources per method, since they
>>>>> don't have a better lifecycle. Readers can create anything longer-lived
>>>>> and release it at close time.
>>>>>
>>>>>
>>>>> On Wed, Aug 1, 2018 at 00:31, Andrew Pilloud <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Some of our IOs create external resources that need to be cleaned up
>>>>>> when a pipeline is terminated. It looks like the
>>>>>> org.apache.beam.sdk.io.UnboundedSource interface is called on creation,
>>>>>> but there is no call for cleanup. For example, PubsubIO creates a Pubsub
>>>>>> subscription in createReader()/split() and it should be deleted at
>>>>>> shutdown.
>>>>>> Does anyone have ideas on how I might make this happen?
>>>>>>
>>>>>> (I filed https://issues.apache.org/jira/browse/BEAM-5051 tracking
>>>>>> the PubSub-specific issue.)
>>>>>>
>>>>>> Andrew
>>>>>>
>>>>>
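
Romain's suggestion of "global reference counting" around Source.Reader#close() (needed because N>1 readers run in parallel) might look roughly like the plain-Java sketch below. It is an illustration under stated assumptions, not Beam or PubsubIO code: RefCountedResource, SketchReader and the cleanup callback are hypothetical names, and the count only covers readers that share a single JVM.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper (not part of Beam): counts how many readers in this JVM
 * share an implicitly created resource, such as a Pub/Sub subscription, and
 * runs the cleanup action once, when the last reader releases it.
 */
final class RefCountedResource {
  private final Runnable cleanup;
  private int refs = 0;
  private boolean cleanedUp = false;

  RefCountedResource(Runnable cleanup) {
    this.cleanup = cleanup;
  }

  /** Called when a reader starts using the shared resource. */
  synchronized void acquire() {
    if (cleanedUp) {
      throw new IllegalStateException("resource was already cleaned up");
    }
    refs++;
  }

  /** Called from a reader's close(); returns true if this call ran the cleanup. */
  synchronized boolean release() {
    if (refs == 0) {
      throw new IllegalStateException("release() without a matching acquire()");
    }
    refs--;
    if (refs == 0) {
      cleanedUp = true;
      cleanup.run(); // e.g. delete the implicitly created subscription
      return true;
    }
    return false;
  }
}

/** Hypothetical stand-in for a Source.Reader: acquires on creation, releases in close(). */
final class SketchReader implements AutoCloseable {
  private final RefCountedResource shared;

  SketchReader(RefCountedResource shared) {
    this.shared = shared;
    shared.acquire();
  }

  @Override
  public void close() {
    shared.release();
  }
}

final class RefCountDemo {
  public static void main(String[] args) {
    AtomicInteger deletions = new AtomicInteger();
    RefCountedResource subscription = new RefCountedResource(deletions::incrementAndGet);

    SketchReader a = new SketchReader(subscription);
    SketchReader b = new SketchReader(subscription);
    a.close();
    System.out.println(deletions.get()); // 0: reader b still holds the subscription
    b.close();
    System.out.println(deletions.get()); // 1: last reader closed, cleanup ran once
  }
}
```

This also shows why Romain says it "will be hard" otherwise: on a distributed runner the parallel readers may live on different workers, and a reader may be closed and later recreated from a checkpoint, so a per-JVM count can reach zero while the pipeline is still running. That limitation is part of why the thread leans toward an SDF with a timer that only fires at shutdown.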
