The subscriptions I want to clean up are the ones that are implicitly created by PubsubIO. These subscriptions are created and then leaked; they aren't reused in future pipelines, so the data loss issues are moot here. I agree that we don't want to tear down user-supplied subscriptions.
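A minimal sketch of that distinction, assuming cleanup runs from some close-style hook: remember whether the IO created the subscription itself, and delete it at close time only in that case. Every name here (SubscriptionAdmin, FakeSubscriptionAdmin, OwnedSubscription) is hypothetical and is not part of Beam or the Pub/Sub client library; the fake admin exists only so the sketch runs on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Pub/Sub admin client; NOT a Beam or Google Cloud API.
interface SubscriptionAdmin {
  void createSubscription(String topic, String subscription);

  void deleteSubscription(String subscription);
}

// In-memory fake used only to exercise the sketch.
class FakeSubscriptionAdmin implements SubscriptionAdmin {
  final List<String> live = new ArrayList<>();

  @Override
  public void createSubscription(String topic, String subscription) {
    live.add(subscription);
  }

  @Override
  public void deleteSubscription(String subscription) {
    live.remove(subscription);
  }
}

// Hypothetical handle that records whether the subscription was created implicitly.
class OwnedSubscription implements AutoCloseable {
  private final SubscriptionAdmin admin;
  private final String subscription;
  private final boolean createdImplicitly;
  private boolean closed;

  private OwnedSubscription(
      SubscriptionAdmin admin, String subscription, boolean createdImplicitly) {
    this.admin = admin;
    this.subscription = subscription;
    this.createdImplicitly = createdImplicitly;
  }

  // Only a topic was given: create a subscription and take ownership of it.
  static OwnedSubscription forTopic(
      SubscriptionAdmin admin, String topic, String generatedName) {
    admin.createSubscription(topic, generatedName);
    return new OwnedSubscription(admin, generatedName, true);
  }

  // The user supplied the subscription: use it, but never delete it.
  static OwnedSubscription forUserSubscription(
      SubscriptionAdmin admin, String subscription) {
    return new OwnedSubscription(admin, subscription, false);
  }

  // Idempotent cleanup: deletes only subscriptions this handle created.
  @Override
  public void close() {
    if (closed) {
      return;
    }
    closed = true;
    if (createdImplicitly) {
      admin.deleteSubscription(subscription);
    }
  }
}
```

This only covers the ownership bookkeeping; whether a given runner ever invokes the cleanup hook is a separate question.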
I've been doing some more digging, and it looks like the Source.Reader interface has a close() callback <https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/io/Source.Reader.html#close-->. Is that a place where I might be able to do cleanup? (It appears this is hooked up to the RichFunction.close() callback on Flink and is called from the Direct Runner, but is possibly not called from other runners.)

Andrew

On Thu, Aug 2, 2018 at 1:07 AM Reuven Lax <re...@google.com> wrote:

> Actually I think SDF is the right way to fix this. The SDF can set a timer
> at infinity (which will only fire when the pipeline shuts down). I believe
> that SDF support is being added to the portability layer now, so eventually
> all portable runners will support it, and maybe we can live with the status
> quo until then.
>
> On Wed, Aug 1, 2018 at 9:59 PM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> I agree, Reuven. But leaking in a source doesn't give any guarantee
>> regarding the execution, since it will depend on the runner, and the
>> current API will not provide you that feature. Using a reference-counting
>> state can work better but would require an SDF migration (and will hit
>> runner support issues :().
>>
>> On Thu, Aug 2, 2018 at 05:39, Reuven Lax <re...@google.com> wrote:
>>
>>> Hi Romain,
>>>
>>> Andrew's example actually wouldn't work for that. With Google Cloud
>>> Pub/Sub (the example source he referenced), if there is no subscription to
>>> a topic, all publishes to that topic are dropped on the floor; if you don't
>>> want to lose data, you are expected to keep the subscription around
>>> continuously. In this example, leaking a subscription is probably
>>> preferable to losing data (especially since Pub/Sub itself garbage collects
>>> subscriptions that have been inactive for a long time).
>>>
>>> The answer might be that Beam does not have a good lifecycle story here,
>>> and something needs to be built.
>>>
>>> Reuven
>>>
>>> On Tue, Jul 31, 2018 at 10:04 PM Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
>>>> Hi Andrew,
>>>>
>>>> IIRC sources should clean up their resources per method since they don't
>>>> have a better lifecycle. Readers can create anything longer-lived and
>>>> release it at close time.
>>>>
>>>> On Wed, Aug 1, 2018 at 00:31, Andrew Pilloud <apill...@google.com> wrote:
>>>>
>>>>> Some of our IOs create external resources that need to be cleaned up
>>>>> when a pipeline is terminated. It looks like the
>>>>> org.apache.beam.sdk.io.UnboundedSource interface is called on creation,
>>>>> but there is no call for cleanup. For example, PubsubIO creates a Pubsub
>>>>> subscription in createReader()/split() and it should be deleted at
>>>>> shutdown. Does anyone have ideas on how I might make this happen?
>>>>>
>>>>> (I filed https://issues.apache.org/jira/browse/BEAM-5051 tracking the
>>>>> PubSub specific issue.)
>>>>>
>>>>> Andrew