On Thu, Aug 2, 2018, 18:32 Andrew Pilloud <[email protected]> wrote:

> The subscriptions I want to clean up are ones that are implicitly created
> by the PubsubIO. These subscriptions are created and then leaked; they aren't
> reused in future pipelines, so the data loss issues are moot here. I agree
> that we don't want to tear down user supplied subscriptions.
>
> I've been doing some more digging, it looks like the Source.Reader
> interface has a close() callback
> <https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/io/Source.Reader.html#close-->.
> Is that a place I might be able to do cleanup? (It appears this is hooked
> up to RichFunction.close() callback on Flink and called from the Direct
> Runner but possibly not called from other runners.)
>


It is called after the parallelization (you can have N>1 readers in parallel),
so if you have some global reference counting so the cleanup runs only once,
yes; otherwise it will be hard (a rough sketch follows below).

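For concreteness, here is a rough sketch of what reference-counted cleanup in a
reader's close() could look like. The static counter and the deleteSubscription()
helper are assumptions for illustration (and only count readers within one JVM,
which is why doing this globally is hard); none of this is existing PubsubIO code.

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.io.UnboundedSource;

// Sketch only: the last reader to close in this JVM tears down the shared
// resource. This is per-process, not global across workers.
abstract class RefCountedPubsubReader extends UnboundedSource.UnboundedReader<byte[]> {

  private static final AtomicInteger OPEN_READERS = new AtomicInteger();

  protected RefCountedPubsubReader() {
    OPEN_READERS.incrementAndGet();
  }

  @Override
  public void close() throws IOException {
    if (OPEN_READERS.decrementAndGet() == 0) {
      // Hypothetical helper: delete the implicitly created subscription.
      deleteSubscription();
    }
  }

  // Assumed to wrap a Pub/Sub admin call; PubsubIO exposes no such hook today.
  protected abstract void deleteSubscription() throws IOException;
}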

> Andrew
>
> On Thu, Aug 2, 2018 at 1:07 AM Reuven Lax <[email protected]> wrote:
>
>> Actually I think SDF is the right way to fix this. The SDF can set a
>> timer at infinity (which will only fire when the pipeline shuts down). I
>> believe that SDF support is being added to the portability layer now, so
>> eventually all portable runners will support it, and maybe we can live with
>> the status quo until then.
>>
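To make the timer idea concrete, here is a rough sketch using a stateful DoFn
with an event-time timer armed at the end of the global window; an SDF would arm
its timer the same way once portable SDF support lands. The cleanup body is an
illustrative placeholder, not an existing PubsubIO hook.

import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.KV;

// Timers require keyed input; the timer fires (once per key) only when the
// watermark reaches the end of the global window, i.e. at pipeline drain.
class CleanupAtShutdownFn extends DoFn<KV<String, String>, String> {

  @TimerId("cleanup")
  private final TimerSpec cleanupSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(ProcessContext ctx, @TimerId("cleanup") Timer cleanup) {
    // (Re)arm the timer at end-of-time on every element.
    cleanup.set(GlobalWindow.INSTANCE.maxTimestamp());
    ctx.output(ctx.element().getValue());
  }

  @OnTimer("cleanup")
  public void onCleanup(OnTimerContext ctx) {
    // Placeholder for teardown work, e.g. deleting a leaked subscription.
  }
}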
>> On Wed, Aug 1, 2018 at 9:59 PM Romain Manni-Bucau <[email protected]>
>> wrote:
>>
>>> I agree, Reuven. But leaking in a source doesn't give any guarantee
>>> regarding the execution, since it will depend on the runner and the current
>>> API will not provide you that feature. Using a reference-counting state can
>>> work better but would require an SDF migration (and will hit runner support
>>> issues :().
>>>
>>>
>>> On Thu, Aug 2, 2018, 05:39 Reuven Lax <[email protected]> wrote:
>>>
>>>> Hi Romain,
>>>>
>>>> Andrew's example actually wouldn't work for that. With Google Cloud
>>>> Pub/Sub (the example source he referenced), if there is no subscription to
>>>> a topic, all publishes to that topic are dropped on the floor; if you don't
>>>> want to lose data, you are expected to keep the subscription around
>>>> continuously. In this example, leaking a subscription is probably
>>>> preferable to losing data (especially since Pub/Sub itself garbage collects
>>>> subscriptions that have been inactive for a long time).
>>>>
>>>> The answer might be that Beam does not have a good lifecycle story
>>>> here, and something needs to be built.
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Jul 31, 2018 at 10:04 PM Romain Manni-Bucau <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi Andrew,
>>>>>
>>>>> IIRC sources should clean up their resources per method since they
>>>>> don't have a better lifecycle. Readers can create anything longer-lived
>>>>> and release it at close time.
>>>>>
>>>>>
>>>>> On Wed, Aug 1, 2018, 00:31 Andrew Pilloud <[email protected]> wrote:
>>>>>
>>>>>> Some of our IOs create external resources that need to be cleaned up
>>>>>> when a pipeline is terminated. It looks like the
>>>>>> org.apache.beam.sdk.io.UnboundedSource interface is called on creation,
>>>>>> but there is no call for cleanup. For example, PubsubIO creates a Pubsub
>>>>>> subscription in createReader()/split(), and it should be deleted at
>>>>>> shutdown. Does anyone have ideas on how I might make this happen?
>>>>>>
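As an aside, one way to sidestep the implicit-subscription leak today is to read
from an explicitly managed subscription, so nothing is created in
createReader()/split() in the first place. The project and subscription names
below are placeholders, and this is a workaround, not a fix for the missing
lifecycle hook.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class ExplicitSubscriptionExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Reading from a pre-created subscription means the connector has nothing
    // to create and therefore nothing to leak; the user owns the
    // subscription's lifecycle (and its retained messages) instead.
    PCollection<String> messages =
        p.apply(PubsubIO.readStrings()
            .fromSubscription("projects/my-project/subscriptions/my-subscription"));

    p.run();
  }
}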
>>>>>> (I filed https://issues.apache.org/jira/browse/BEAM-5051 tracking
>>>>>> the PubSub specific issue.)
>>>>>>
>>>>>> Andrew
>>>>>>
>>>>>
