As noted, there is currently no support for Flink savepoints through the Beam API.
However, it is now possible to restore from a savepoint with a Flink
runner specific pipeline option:

https://issues.apache.org/jira/browse/BEAM-5396
https://github.com/apache/beam/pull/7169#issuecomment-443283332

This was just merged - we are going to use it for the Python pipelines.
(A usage sketch appears after the thread below.)

Thomas

On Mon, Dec 3, 2018 at 8:54 AM Lukasz Cwik <[email protected]> wrote:

> There are proposals for pipeline drain [1] and also for snapshot and
> update [2] for Apache Beam. We would love contributions in this space.
>
> 1: https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8
> 2: https://docs.google.com/document/d/1UWhnYPgui0gUYOsuGcCjLuoOUlGA4QaY91n8p3wz9MY
>
> On Mon, Dec 3, 2018 at 7:05 AM Wayne Collins <[email protected]> wrote:
>
>> Hi JC,
>>
>> Thanks for the quick response!
>> I had hoped for an in-pipeline solution for runner portability, but it
>> is nice to know we're not the only ones stepping outside to interact
>> with runner management. :-)
>>
>> Wayne
>>
>> On 2018-12-03 01:23, Juan Carlos Garcia wrote:
>>
>> Hi Wayne,
>>
>> We have the same setup and we do daily updates to our pipeline.
>>
>> The way we do it is using the flink tool via Jenkins.
>>
>> Basically our deployment job does the following:
>>
>> 1. Detect whether the pipeline is running (it matches via job name).
>>
>> 2. If found, do a flink cancel with a savepoint (we use HDFS for
>> checkpoints / savepoints) under a given directory.
>>
>> 3. Use the flink run command for the new job and specify the
>> savepoint from step 2.
>>
>> I don't think there is any support to achieve the same from within the
>> pipeline. You need to do this externally as explained above. (A CLI
>> sketch of these steps also appears after the thread below.)
>>
>> Best regards,
>> JC
>>
>> On Mon, Dec 3, 2018 at 00:46, Wayne Collins <[email protected]>
>> wrote:
>>
>>> Hi all,
>>> We have a number of Beam pipelines processing unbounded streams
>>> sourced from Kafka on the Flink runner and are very happy with both
>>> the platform and performance!
>>>
>>> The problem is with shutting down the pipelines: for version upgrades,
>>> system maintenance, load management, etc., it would be nice to be able
>>> to gracefully shut these down under software control, but we haven't
>>> been able to find a way to do so. We're in good shape on checkpointing
>>> and then cleanly recovering, but shutdowns are all destructive to
>>> Flink or the Flink TaskManager.
>>>
>>> Methods tried:
>>>
>>> 1) Calling cancel on the FlinkRunnerResult returned from pipeline.run()
>>> This would be our preferred method, but p.run() doesn't return until
>>> termination, and even if it did, the runner code simply throws:
>>> "throw new UnsupportedOperationException("FlinkRunnerResult does not
>>> support cancel.");"
>>> so this doesn't appear to be a near-term option.
>>>
>>> 2) Injecting a "termination" message into the pipeline via Kafka
>>> This does get through, but calling exit() from a stage in the pipeline
>>> also terminates the Flink TaskManager.
>>>
>>> 3) Injecting a "sleep" message, then manually restarting the cluster
>>> This is our current method: we pause the data at the source, flood all
>>> branches of the pipeline with a "we're going down" msg so the stages
>>> can do a bit of housekeeping, then hard-stop the entire environment
>>> and re-launch with the new version.
>>>
>>> Is there a "Best Practice" method for gracefully terminating an
>>> unbounded pipeline from within the pipeline or from the mainline that
>>> launches it?
>>>
>>> Thanks!
>>> Wayne
>>>
>>> --
>>> Wayne Collins
>>> dades.ca Inc.
>>> [email protected]
>>> cell: 416-898-5137
>>
>> --
>> Wayne Collins
>> dades.ca Inc.
>> [email protected]
>> cell: 416-898-5137
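
For reference, a minimal sketch of driving the new option from a Java
entry point. It assumes the option landed on FlinkPipelineOptions as
savepointPath, per BEAM-5396 / PR #7169 (verify the name against your
Beam version); the savepoint path and class name are placeholders.

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class RestoreFromSavepoint {
      public static void main(String[] args) {
        // Accepts e.g. --runner=FlinkRunner
        //              --savepointPath=hdfs:///savepoints/savepoint-abc123
        FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation().as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);

        // Or set the savepoint programmatically; the job is then submitted
        // with its state restored from this savepoint (placeholder path).
        options.setSavepointPath("hdfs:///savepoints/savepoint-abc123");

        Pipeline p = Pipeline.create(options);
        // ... build the same pipeline shape that produced the savepoint ...
        p.run();
      }
    }

As with any Flink restore, the submitted pipeline has to be
state-compatible with the job that took the savepoint.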

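And a sketch of JC's three deployment steps against the stock Flink 1.x
CLI, roughly what a Jenkins job could run. The job name, savepoint
directory, and jar are placeholders; the job-id field position depends on
the "flink list" output format of your Flink version, and newer releases
supersede "flink cancel -s" with "flink stop".

    #!/usr/bin/env bash
    set -euo pipefail

    JOB_NAME="my-beam-pipeline"         # placeholder: the pipeline's job name
    SAVEPOINT_DIR="hdfs:///savepoints"  # placeholder: savepoint directory

    # 1. Detect whether the pipeline is running, matching by job name
    #    (the job id is the 4th field of "flink list" output in Flink 1.x).
    JOB_ID=$(flink list -r | grep "$JOB_NAME" | awk '{print $4}' || true)

    SAVEPOINT=""
    if [ -n "$JOB_ID" ]; then
      # 2. Cancel with a savepoint under the given directory. The CLI
      #    prints the full savepoint path; capture it for the restore.
      SAVEPOINT=$(flink cancel -s "$SAVEPOINT_DIR" "$JOB_ID" \
                  | grep -o "$SAVEPOINT_DIR/savepoint-[^ .]*")
    fi

    # 3. Submit the new version, restoring from the savepoint taken in
    #    step 2 (or starting fresh if no previous job was found).
    if [ -n "$SAVEPOINT" ]; then
      flink run -d -s "$SAVEPOINT" new-pipeline.jar
    else
      flink run -d new-pipeline.jar
    fi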