Repository: beam
Updated Branches:
  refs/heads/master 1866a0113 -> 462335caf


Flink runner: specify checkpointTimeout  through PipelineOptions.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/802f10af
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/802f10af
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/802f10af

Branch: refs/heads/master
Commit: 802f10afd5d73ba32ad90ba222f2d80216a18a4d
Parents: b8035ae
Author: Pei He <p...@apache.org>
Authored: Sat May 27 14:59:22 2017 +0800
Committer: Pei He <hepei...@alibaba-inc.com>
Committed: Sun Jun 4 16:18:36 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/flink/FlinkPipelineExecutionEnvironment.java   | 2 ++
 .../org/apache/beam/runners/flink/FlinkPipelineOptions.java     | 5 +++++
 2 files changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/802f10af/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 98f7c5a..fe5dd87 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -228,6 +228,8 @@ class FlinkPipelineExecutionEnvironment {
         throw new IllegalArgumentException("The checkpoint interval must be 
positive");
       }
       flinkStreamEnv.enableCheckpointing(checkpointInterval, 
options.getCheckpointingMode());
+      flinkStreamEnv.getCheckpointConfig().setCheckpointTimeout(
+          options.getCheckpointTimeoutMillis());
       boolean externalizedCheckpoint = 
options.isExternalizedCheckpointsEnabled();
       boolean retainOnCancellation = 
options.getRetainExternalizedCheckpointsOnCancellation();
       if (externalizedCheckpoint) {

http://git-wip-us.apache.org/repos/asf/beam/blob/802f10af/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index ee07abb..c255672 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -76,6 +76,11 @@ public interface FlinkPipelineOptions
   CheckpointingMode getCheckpointingMode();
   void setCheckpointingMode(CheckpointingMode mode);
 
+  @Description("The maximum time that a checkpoint may take before being 
discarded.")
+  @Default.Long(20 * 60 * 1000)
+  Long getCheckpointTimeoutMillis();
+  void setCheckpointTimeoutMillis(Long checkpointTimeoutMillis);
+
   @Description("Sets the number of times that failed tasks are re-executed. "
       + "A value of zero effectively disables fault tolerance. A value of -1 
indicates "
       + "that the system default value (as defined in the configuration) 
should be used.")

Reply via email to