As noted, there is currently no support for Flink savepoints through the
Beam API.

However, it is now possible to restore from a savepoint with a
Flink-runner-specific pipeline option:

https://issues.apache.org/jira/browse/BEAM-5396
https://github.com/apache/beam/pull/7169#issuecomment-443283332

This was just merged - we are going to use it for the Python pipelines.
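
For illustration, here is a minimal sketch of how a Python launcher might
pass the restore path along. It assumes the option is spelled
--savepoint_path on the Python side (mirroring savepointPath in the Java
FlinkPipelineOptions); please check the PR comment linked above for the
exact flag in your Beam version. The helper name, job name and HDFS path
are made up for the example.

```python
def build_pipeline_args(job_name, savepoint_path=None):
    """Return command-line style pipeline options for a streaming job.

    Assumption: the Flink runner restore option is passed as
    --savepoint_path; verify this against the Beam version in use.
    """
    args = [
        "--streaming",
        "--job_name=" + job_name,
    ]
    if savepoint_path:
        # Only added when restoring; a fresh start omits the option.
        args.append("--savepoint_path=" + savepoint_path)
    return args


if __name__ == "__main__":
    # Hypothetical savepoint location, e.g. one written by "flink cancel -s".
    print(build_pipeline_args("my-pipeline", "hdfs:///savepoints/savepoint-abc123"))
```

The resulting list would be handed to the pipeline options together with
whatever runner flags the job already uses.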

Thomas


On Mon, Dec 3, 2018 at 8:54 AM Lukasz Cwik <[email protected]> wrote:

> There are proposals for pipeline drain[1] and also for snapshot and
> update[2] for Apache Beam. We would love contributions in this space.
>
> 1:
> https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8
> 2:
> https://docs.google.com/document/d/1UWhnYPgui0gUYOsuGcCjLuoOUlGA4QaY91n8p3wz9MY
>
> On Mon, Dec 3, 2018 at 7:05 AM Wayne Collins <[email protected]> wrote:
>
>> Hi JC,
>>
>> Thanks for the quick response!
>> I had hoped for an in-pipeline solution for runner portability but it is
>> nice to know we're not the only ones stepping outside to interact with
>> runner management. :-)
>>
>> Wayne
>>
>>
>> On 2018-12-03 01:23, Juan Carlos Garcia wrote:
>>
>> Hi Wayne,
>>
>> We have the same setup and we do daily updates to our pipeline.
>>
>> We do it with the flink command-line tool, driven from Jenkins.
>>
>> Basically, our deployment job does the following:
>>
>> 1. Detect if the pipeline is running (it matches via job name)
>>
>> 2. If found, do a flink cancel with a savepoint (we use HDFS for
>> checkpoints / savepoints) under a given directory.
>>
>> 3. Use the flink run command for the new job, specifying the
>> savepoint from step 2.
>>
>> I don't think there is any support to achieve the same from within the
>> pipeline. You need to do this externally as explained above.
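
The three steps above can be sketched roughly as follows. This is not JC's
actual Jenkins job, only an illustration: the function names are
hypothetical, and the shape of the "flink list -r" output line is an
assumption that should be checked against your Flink version. The CLI
flags used are "flink cancel -s <dir> <jobId>" and "flink run -s <path>",
as available in Flink releases of this period.

```python
import re

# Assumed line shape from "flink list -r":
#   "<date> <time> : <32-hex job id> : <job name> (RUNNING)"
_RUNNING_JOB = re.compile(r":\s*([0-9a-f]{32})\s*:\s*(.+?)\s*\(RUNNING\)")


def find_running_job_id(list_output, job_name):
    """Step 1: return the ID of the running job with this name, or None."""
    for line in list_output.splitlines():
        match = _RUNNING_JOB.search(line)
        if match and match.group(2) == job_name:
            return match.group(1)
    return None


def cancel_with_savepoint_cmd(job_id, savepoint_dir):
    """Step 2: command that cancels the job and writes a savepoint."""
    return ["flink", "cancel", "-s", savepoint_dir, job_id]


def run_from_savepoint_cmd(jar_path, savepoint_path):
    """Step 3: command that starts the new job version from the savepoint."""
    return ["flink", "run", "-d", "-s", savepoint_path, jar_path]
```

Each command list can be passed to subprocess.run(). The savepoint path
for step 3 has to be taken from the output of step 2; parsing that output
is left out here because its format is not shown in this thread.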
>>
>> Best regards,
>> JC
>>
>>
>> On Mon, Dec 3, 2018, 00:46 Wayne Collins <[email protected]> wrote:
>>
>>> Hi all,
>>> We have a number of Beam pipelines processing unbounded streams sourced
>>> from Kafka on the Flink runner and are very happy with both the platform
>>> and performance!
>>>
>>> The problem is with shutting down the pipelines. For version upgrades,
>>> system maintenance, load management, etc., it would be nice to be able to
>>> gracefully shut these down under software control, but we haven't been able
>>> to find a way to do so. We're in good shape on checkpointing and then cleanly
>>> recovering but shutdowns are all destructive to Flink or the Flink
>>> TaskManager.
>>>
>>> Methods tried:
>>>
>>> 1) Calling cancel on FlinkRunnerResult returned from pipeline.run()
>>> This would be our preferred method but p.run() doesn't return until
>>> termination and even if it did, the runner code simply throws:
>>> "throw new UnsupportedOperationException("FlinkRunnerResult does not
>>> support cancel.");"
>>> so this doesn't appear to be a near-term option.
>>>
>>> 2) Inject a "termination" message into the pipeline via Kafka
>>> This does get through, but calling exit() from a stage in the pipeline
>>> also terminates the Flink TaskManager.
>>>
>>> 3) Inject a "sleep" message, then manually restart the cluster
>>> This is our current method: we pause the data at the source, flood all
>>> branches of the pipeline with a "we're going down" msg so the stages can do
>>> a bit of housekeeping, then hard-stop the entire environment and re-launch
>>> with the new version.
>>>
>>> Is there a "Best Practice" method for gracefully terminating an
>>> unbounded pipeline from within the pipeline or from the mainline that
>>> launches it?
>>>
>>> Thanks!
>>> Wayne
>>>
>>> --
>>> Wayne Collins
>>> dades.ca Inc.
>>> mailto:[email protected]
>>> cell:416-898-5137
>>>
>>>
>> --
>> Wayne Collins
>> dades.ca Inc.
>> mailto:[email protected]
>> cell:416-898-5137
>>
>>
