[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2608


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-12 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82978898
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 ---
@@ -256,29 +306,51 @@ public boolean acknowledgeTask(
 * Aborts a checkpoint because it expired (took too long).
 */
public void abortExpired() throws Exception {
--- End diff --

I would like to do this as a follow-up.




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-12 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82977266
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
 ---
@@ -18,23 +18,105 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * Savepoint store used to persist {@link Savepoint} instances.
+ * A file system based savepoint store.
  *
- * The main implementation is the {@link FsSavepointStore}. We also 
have the
- * {@link HeapSavepointStore} for historical reasons (introduced in Flink 
1.0).
+ * Stored savepoints have the following format:
+ * 
+ * MagicNumber SavepointVersion Savepoint
+ *   - MagicNumber => int
+ *   - SavepointVersion => int (returned by Savepoint#getVersion())
+ *   - Savepoint => bytes (serialized via version-specific 
SavepointSerializer)
+ * 
  */
-public interface SavepointStore {
+public class SavepointStore {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(SavepointStore.class);
+
+   /** Magic number for sanity checks against stored savepoints. */
+   private static final int MAGIC_NUMBER = 0x4960672d;
+
+   /** Prefix for savepoint files. */
+   private static final String prefix = "savepoint-";
 
/**
 * Stores the savepoint.
 *
+* @param targetDirectory Target directory to store savepoint in
 * @param savepoint Savepoint to be stored
 * @paramSavepoint type
 * @return Path of stored savepoint
 * @throws Exception Failures during store are forwarded
 */
-String storeSavepoint(T savepoint) throws 
Exception;
+   public static  String storeSavepoint(
+   String targetDirectory,
+   T savepoint) throws IOException {
+
+   checkNotNull(targetDirectory, "Target directory");
+   checkNotNull(savepoint, "Savepoint");
+
+   Exception latestException = null;
+   Path path = null;
+   FSDataOutputStream fdos = null;
+
+   FileSystem fs = null;
+
+   // Try to create a FS output stream
+   for (int attempt = 0; attempt < 10; attempt++) {
+   path = new Path(targetDirectory, 
FileUtils.getRandomFilename(prefix));
+
+   if (fs == null) {
+   fs = FileSystem.get(path.toUri());
+   }
+
+   try {
+   fdos = fs.create(path, false);
+   break;
+   } catch (Exception e) {
+   latestException = e;
+   }
+   }
+
+   if (fdos == null) {
+   throw new IOException("Failed to create file output 
stream at " + path, latestException);
+   }
+
+   boolean success = false;
+   try (DataOutputStream dos = new DataOutputStream(fdos)) {
+   // Write header
+   dos.writeInt(MAGIC_NUMBER);
+   dos.writeInt(savepoint.getVersion());
+
+   // Write savepoint
+   SavepointSerializer serializer = 
SavepointSerializers.getSerializer(savepoint);
+   serializer.serialize(savepoint, dos);
+   success = true;
+   } finally {
+   if (!success && fs.exists(path)) {
+   if (!fs.delete(path, true)) {
+   LOG.warn("Failed to delete file " + 
path + " after failed write.");
--- End diff --

Replaced here and below
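The on-disk layout documented in the diff above (MagicNumber, then SavepointVersion, then the serialized savepoint bytes) can be sketched as a minimal in-memory round trip. This is an illustrative sketch only; the class and method names are hypothetical, not Flink's actual API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class SavepointHeaderSketch {

    /** Magic number for sanity checks against stored savepoints (from the diff). */
    static final int MAGIC_NUMBER = 0x4960672d;

    /** Writes: MagicNumber (int), SavepointVersion (int), serialized payload. */
    static byte[] write(int version, byte[] payload) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            dos.writeInt(MAGIC_NUMBER); // sanity-check marker
            dos.writeInt(version);      // selects the version-specific serializer
            dos.write(payload);         // serialized savepoint bytes
            dos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the header back and returns the version, rejecting bad magic numbers. */
    static int readVersion(byte[] stored) {
        try {
            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(stored));
            if (dis.readInt() != MAGIC_NUMBER) {
                throw new IllegalArgumentException("Not a savepoint file (unexpected magic number)");
            }
            return dis.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The magic-number check is what produces the "Unexpected magic number" error discussed later in this thread when a pre-1.1-format file is loaded.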




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-12 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82977296
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
 ---
@@ -43,24 +125,62 @@
 * @return The loaded savepoint
 * @throws Exception Failures during load are forwarded
 */
-   Savepoint loadSavepoint(String path) throws Exception;
+   public static Savepoint loadSavepoint(String path) throws IOException {
+   Preconditions.checkNotNull(path, "Path");
 
-   /**
-* Disposes the savepoint at the specified path.
-*
-* @param pathPath of savepoint to dispose
-* @throws Exception Failures during diposal are forwarded
-*/
-   void disposeSavepoint(String path) throws Exception;
+   try (DataInputStream dis = new 
DataInputViewStreamWrapper(createFsInputStream(new Path(path {
+   int magicNumber = dis.readInt();
+
+   if (magicNumber == MAGIC_NUMBER) {
+   int version = dis.readInt();
+
+   SavepointSerializer serializer = 
SavepointSerializers.getSerializer(version);
+   return serializer.deserialize(dis);
+   } else {
+   throw new RuntimeException("Unexpected magic 
number. This is most likely " +
+   "caused by trying to load a 
Flink 1.0 savepoint. You cannot load a " +
+   "savepoint triggered by Flink 
1.0 with this version of Flink. If it is " +
+   "_not_ a Flink 1.0 savepoint, 
this error indicates that the specified " +
+   "file is not a proper savepoint 
or the file has been corrupted.");
+   }
+   }
+   }
 
/**
-* Shut downs the savepoint store.
+* Removes the savepoint meta data w/o loading and disposing it.
 *
-* Only necessary for implementations where the savepoint life-cycle 
is
-* bound to the cluster life-cycle.
-*
-* @throws Exception Failures during shut down are forwarded
+* @param path Path of savepoint to remove
+* @throws Exception Failures during disposal are forwarded
 */
-   void shutdown() throws Exception;
+   public static void removeSavepoint(String path) throws Exception {
--- End diff --

Changed




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-12 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82976869
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
 ---
@@ -48,13 +48,12 @@
public static CompletedCheckpoint loadAndValidateSavepoint(
JobID jobId,
Map tasks,
-   SavepointStore savepointStore,
String savepointPath) throws Exception {
 
// (1) load the savepoint
-   Savepoint savepoint = 
savepointStore.loadSavepoint(savepointPath);
+   Savepoint savepoint = 
SavepointStore.loadSavepoint(savepointPath);
final Map taskStates = new 
HashMap<>(savepoint.getTaskStates().size());
-   
+
// (2) validate it (parallelism, etc)
for (TaskState taskState : savepoint.getTaskStates()) {
ExecutionJobVertex executionJobVertex = 
tasks.get(taskState.getJobVertexID());
--- End diff --

Yes, updated.




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-12 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82976634
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ---
@@ -233,68 +229,90 @@ public int getNumberOfRetainedCheckpoints() {
}
 
@Override
-   public void shutdown() throws Exception {
-   LOG.info("Shutting down");
+   public void shutdown(JobStatus jobStatus) throws Exception {
--- End diff --

Our only option would be to wrap it in our own Exception, because Curator throws the general `Exception`.




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-12 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82976383
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ---
@@ -172,7 +168,7 @@ public void recover() throws Exception {
 
for (int i = 0; i < numberOfInitialCheckpoints - 1; 
i++) {
try {
-   
removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
+   
removeSubsumed(initialCheckpoints.get(i));
--- End diff --

Yes. Even more, I think this is generally dangerous: if a checkpoint is recovered but cannot be restored, then we will have lost all the others. Since we currently only keep a single one anyway, it is not a problem yet.




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-12 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82972407
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 ---
@@ -64,9 +62,9 @@ public void recover() throws Exception {
 
@Override
public void addCheckpoint(CompletedCheckpoint checkpoint) throws 
Exception {
-   checkpoints.addLast(checkpoint);
+   checkpoints.add(checkpoint);
if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
-   checkpoints.removeFirst().discardState();
+   checkpoints.remove().subsume();
--- End diff --

Manually triggered savepoints for example are not discarded when they are 
subsumed. The CheckpointProperties constructor is package private (for testing) 
and only the static creator methods (for persistent checkpoints, regular 
checkpoints, and manually triggered savepoints) are publicly accessible. Let me 
add a check to the properties that only allow manual discard if the checkpoint 
is persisted.
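The guard described above (only persisted checkpoints may be discarded manually, while manually triggered savepoints survive subsumption) can be sketched as follows. This is a simplified illustration; the constructor visibility and creator-method pattern mirror the comment, but the names are not Flink's actual API.

```java
public class CheckpointPropertiesSketch {

    private final boolean persisted;          // written to an external location?
    private final boolean discardOnSubsumed;  // cleaned up when a newer checkpoint subsumes it?

    // Package-private constructor, mirroring the pattern described in the comment:
    // only the static creators are publicly accessible.
    CheckpointPropertiesSketch(boolean persisted, boolean discardOnSubsumed) {
        this.persisted = persisted;
        this.discardOnSubsumed = discardOnSubsumed;
    }

    boolean discardOnSubsumed() {
        return discardOnSubsumed;
    }

    /** Manual discard is only sensible for checkpoints persisted externally. */
    void checkManualDiscardAllowed() {
        if (!persisted) {
            throw new IllegalStateException("Only persisted checkpoints can be discarded manually.");
        }
    }

    /** Manually triggered savepoints: persisted, not discarded on subsumption. */
    static CheckpointPropertiesSketch forSavepoint() {
        return new CheckpointPropertiesSketch(true, false);
    }

    /** Regular checkpoints: not externally persisted, cleaned up when subsumed. */
    static CheckpointPropertiesSketch forRegularCheckpoint() {
        return new CheckpointPropertiesSketch(false, true);
    }
}
```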




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-12 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82971700
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
 ---
@@ -18,44 +18,243 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import java.io.Serializable;
+
 /**
  * The configuration of a checkpoint, such as whether
  * 
- * The checkpoint is a savepoint
+ * The checkpoint should be persisted
  * The checkpoint must be full, or may be incremental
  * The checkpoint format must be the common (cross backend) 
format, or may be state-backend specific
--- End diff --

Added a note `(not yet implemented)` after those.




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-12 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82971361
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -282,29 +279,71 @@ public boolean isShutdown() {
//  Handling checkpoints and messages
// 

 
-   public Future triggerSavepoint(long timestamp) throws Exception 
{
-   CheckpointTriggerResult result = triggerCheckpoint(timestamp, 
CheckpointProperties.forStandardSavepoint());
+   /**
+* Triggers a savepoint with the default savepoint directory as a 
target.
+*
+* @param timestamp The timestamp for the savepoint.
+* @return A future to the completed checkpoint
+* @throws IllegalStateException If no default savepoint directory has 
been configured
+* @throws Exception Failures during triggering are forwarded
+*/
+   public Future triggerSavepoint(long timestamp) 
throws Exception {
+   return triggerSavepoint(timestamp, null);
+   }
+
+   /**
+* Triggers a savepoint with the given savepoint directory as a target.
+*
+* @param timestamp The timestamp for the savepoint.
+* @param savepointDirectory Target directory for the savepoint.
+* @return A future to the completed checkpoint
+* @throws IllegalStateException If no savepoint directory has been
+*   specified and no default savepoint 
directory has been
+*   configured
+* @throws Exception Failures during triggering are 
forwarded
+*/
+   public Future triggerSavepoint(long timestamp, 
String savepointDirectory) throws Exception {
+   String targetDirectory;
+   if (savepointDirectory != null) {
+   targetDirectory = savepointDirectory;
+   } else if (this.savepointDirectory != null) {
+   targetDirectory = this.savepointDirectory;
+   } else {
+   throw new IllegalStateException("No savepoint directory 
configured. " +
+   "You can either specify a directory 
when triggering this savepoint or " +
+   "configure a cluster-wide default via 
key '" +
+   ConfigConstants.SAVEPOINT_DIRECTORY_KEY 
+ "'.");
+   }
+
+   CheckpointProperties props = 
CheckpointProperties.forStandardSavepoint();
+   CheckpointTriggerResult result = triggerCheckpoint(timestamp, 
props, targetDirectory);
 
if (result.isSuccess()) {
-   PendingSavepoint savepoint = (PendingSavepoint) 
result.getPendingCheckpoint();
-   return savepoint.getCompletionFuture();
-   }
-   else {
-   return Futures.failed(new Exception("Failed to trigger 
savepoint: " + result.getFailureReason().message()));
+   return 
result.getPendingCheckpoint().getCompletionFuture();
+   } else {
+   CompletableFuture failed = new 
FlinkCompletableFuture<>();
+   failed.completeExceptionally(new Exception("Failed to 
trigger savepoint: " + result.getFailureReason().message()));
--- End diff --

`CheckpointDeclineReason` is not an `Exception`, but an enum of decline 
reasons.
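Because the decline reason is an enum rather than a `Throwable`, the failed future has to wrap the reason's message in a new `Exception`, as the diff above does. A minimal sketch of that pattern, with a simplified, hypothetical version of the enum:

```java
import java.util.concurrent.CompletableFuture;

public class DeclineReasonSketch {

    // Simplified stand-in for the decline-reason enum; the constants and
    // messages here are illustrative, not Flink's actual set.
    enum CheckpointDeclineReason {
        COORDINATOR_SHUTDOWN("Checkpoint coordinator is shut down."),
        TOO_MANY_CONCURRENT("Too many concurrent checkpoints.");

        private final String message;

        CheckpointDeclineReason(String message) {
            this.message = message;
        }

        String message() {
            return message;
        }
    }

    /** Wraps the enum's message in an Exception to complete a future exceptionally. */
    static CompletableFuture<Object> failedFuture(CheckpointDeclineReason reason) {
        CompletableFuture<Object> failed = new CompletableFuture<>();
        failed.completeExceptionally(
                new Exception("Failed to trigger savepoint: " + reason.message()));
        return failed;
    }
}
```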




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-12 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82971228
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -282,29 +279,71 @@ public boolean isShutdown() {
//  Handling checkpoints and messages
// 

 
-   public Future triggerSavepoint(long timestamp) throws Exception 
{
-   CheckpointTriggerResult result = triggerCheckpoint(timestamp, 
CheckpointProperties.forStandardSavepoint());
+   /**
+* Triggers a savepoint with the default savepoint directory as a 
target.
+*
+* @param timestamp The timestamp for the savepoint.
+* @return A future to the completed checkpoint
+* @throws IllegalStateException If no default savepoint directory has 
been configured
+* @throws Exception Failures during triggering are forwarded
+*/
+   public Future triggerSavepoint(long timestamp) 
throws Exception {
+   return triggerSavepoint(timestamp, null);
+   }
+
+   /**
+* Triggers a savepoint with the given savepoint directory as a target.
+*
+* @param timestamp The timestamp for the savepoint.
+* @param savepointDirectory Target directory for the savepoint.
+* @return A future to the completed checkpoint
+* @throws IllegalStateException If no savepoint directory has been
+*   specified and no default savepoint 
directory has been
+*   configured
+* @throws Exception Failures during triggering are 
forwarded
+*/
+   public Future triggerSavepoint(long timestamp, 
String savepointDirectory) throws Exception {
+   String targetDirectory;
+   if (savepointDirectory != null) {
+   targetDirectory = savepointDirectory;
+   } else if (this.savepointDirectory != null) {
+   targetDirectory = this.savepointDirectory;
+   } else {
+   throw new IllegalStateException("No savepoint directory 
configured. " +
+   "You can either specify a directory 
when triggering this savepoint or " +
+   "configure a cluster-wide default via 
key '" +
+   ConfigConstants.SAVEPOINT_DIRECTORY_KEY 
+ "'.");
+   }
+
+   CheckpointProperties props = 
CheckpointProperties.forStandardSavepoint();
+   CheckpointTriggerResult result = triggerCheckpoint(timestamp, 
props, targetDirectory);
 
if (result.isSuccess()) {
-   PendingSavepoint savepoint = (PendingSavepoint) 
result.getPendingCheckpoint();
-   return savepoint.getCompletionFuture();
-   }
-   else {
-   return Futures.failed(new Exception("Failed to trigger 
savepoint: " + result.getFailureReason().message()));
+   return 
result.getPendingCheckpoint().getCompletionFuture();
+   } else {
+   CompletableFuture failed = new 
FlinkCompletableFuture<>();
--- End diff --

I was looking for that one, but didn't find it. Thanks! 




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-12 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82970716
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -219,33 +245,9 @@ public CheckpointCoordinator(
 * Shuts down the checkpoint coordinator.
 *
 * After this method has been called, the coordinator does not accept
-* and further messages and cannot trigger any further checkpoints. All
-* checkpoint state is discarded.
-*/
-   public void shutdown() throws Exception {
-   shutdown(true);
-   }
-
-   /**
-* Suspends the checkpoint coordinator.
-*
-* After this method has been called, the coordinator does not accept
 * any further messages and cannot trigger any further checkpoints.
-*
-* The difference to shutdown is that checkpoint state in the store
-* and counter is kept around if possible to recover later.
-*/
-   public void suspend() throws Exception {
-   shutdown(false);
-   }
-
-   /**
-* Shuts down the checkpoint coordinator.
-*
-* @param shutdownStoreAndCounter Depending on this flag the checkpoint
-* state services are shut down or suspended.
 */
-   private void shutdown(boolean shutdownStoreAndCounter) throws Exception 
{
+   public void shutdown(JobStatus jobStatus) throws Exception {
--- End diff --

True, but I thought we kept it `shutdown` for consistency reasons.




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-12 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82969876
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -637,20 +637,25 @@ protected int savepoint(String[] args) {
"Specify a Job 
ID to trigger a savepoint."));
}
 
-   return triggerSavepoint(options, jobId);
+   String savepointDirectory = null;
+   if (cleanedArgs.length == 2) {
--- End diff --

Changed the check to `>= 2` and printed a message that some arguments are unneeded.
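The resulting argument handling can be sketched as below: the first trailing argument after the job ID is taken as the savepoint directory, and any further arguments produce a message instead of being silently dropped. Method and class names are illustrative.

```java
public class SavepointArgsSketch {

    /**
     * cleanedArgs[0] is the job ID; cleanedArgs[1], if present, is the
     * target savepoint directory. Extra arguments are reported, not ignored silently.
     */
    static String parseSavepointDirectory(String[] cleanedArgs) {
        String savepointDirectory = null;
        if (cleanedArgs.length >= 2) {
            savepointDirectory = cleanedArgs[1];
            if (cleanedArgs.length > 2) {
                System.out.println("Provided more arguments than required. "
                        + "Ignoring trailing arguments.");
            }
        }
        return savepointDirectory;
    }
}
```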




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82786136
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ---
@@ -233,68 +229,90 @@ public int getNumberOfRetainedCheckpoints() {
}
 
@Override
-   public void shutdown() throws Exception {
-   LOG.info("Shutting down");
+   public void shutdown(JobStatus jobStatus) throws Exception {
--- End diff --

Maybe we can narrow the `Exception` a little bit down here or wrap the 
`InterruptedException` and `KeeperException` in our own exception.
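One way to narrow the signature, as suggested, is to catch the broad exceptions at the call site and rethrow them wrapped in a dedicated type. A sketch under stated assumptions: `FlinkStateStoreException` and `ZooKeeperCall` are hypothetical names, and the broad `throws Exception` stands in for the Curator-style signature.

```java
public class ShutdownWrapSketch {

    /** Hypothetical narrower exception type for state-store failures. */
    static class FlinkStateStoreException extends Exception {
        FlinkStateStoreException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Stand-in for a ZooKeeper/Curator call with a broad checked signature. */
    interface ZooKeeperCall {
        void run() throws Exception;
    }

    /** Runs a shutdown step, wrapping anything thrown into the narrower type. */
    static void runShutdownStep(ZooKeeperCall call) throws FlinkStateStoreException {
        try {
            call.run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt status
            throw new FlinkStateStoreException("Interrupted while shutting down checkpoint store", e);
        } catch (Exception e) {
            throw new FlinkStateStoreException("Failed to shut down checkpoint store", e);
        }
    }
}
```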




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82799818
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
 ---
@@ -467,8 +467,11 @@ object JobManagerMessages {
 * of triggering and acknowledging checkpoints.
 *
 * @param jobId The JobID of the job to trigger the savepoint for.
+* @param savepointDirectory Optional target directory
 */
-  case class TriggerSavepoint(jobId: JobID) extends RequiresLeaderSessionID
+  case class TriggerSavepoint(
+  jobId: JobID,
+  savepointDirectory : String = null) extends RequiresLeaderSessionID
--- End diff --

Scalaesque would be to use `Option`.




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82797030
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
 ---
@@ -43,24 +125,62 @@
 * @return The loaded savepoint
 * @throws Exception Failures during load are forwarded
 */
-   Savepoint loadSavepoint(String path) throws Exception;
+   public static Savepoint loadSavepoint(String path) throws IOException {
+   Preconditions.checkNotNull(path, "Path");
 
-   /**
-* Disposes the savepoint at the specified path.
-*
-* @param pathPath of savepoint to dispose
-* @throws Exception Failures during diposal are forwarded
-*/
-   void disposeSavepoint(String path) throws Exception;
+   try (DataInputStream dis = new 
DataInputViewStreamWrapper(createFsInputStream(new Path(path {
+   int magicNumber = dis.readInt();
+
+   if (magicNumber == MAGIC_NUMBER) {
+   int version = dis.readInt();
+
+   SavepointSerializer serializer = 
SavepointSerializers.getSerializer(version);
+   return serializer.deserialize(dis);
+   } else {
+   throw new RuntimeException("Unexpected magic 
number. This is most likely " +
+   "caused by trying to load a 
Flink 1.0 savepoint. You cannot load a " +
+   "savepoint triggered by Flink 
1.0 with this version of Flink. If it is " +
+   "_not_ a Flink 1.0 savepoint, 
this error indicates that the specified " +
+   "file is not a proper savepoint 
or the file has been corrupted.");
+   }
+   }
+   }
 
/**
-* Shut downs the savepoint store.
+* Removes the savepoint meta data w/o loading and disposing it.
 *
-* Only necessary for implementations where the savepoint life-cycle 
is
-* bound to the cluster life-cycle.
-*
-* @throws Exception Failures during shut down are forwarded
+* @param path Path of savepoint to remove
+* @throws Exception Failures during disposal are forwarded
 */
-   void shutdown() throws Exception;
+   public static void removeSavepoint(String path) throws Exception {
--- End diff --

Could be `IOException`




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82776861
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -637,20 +637,25 @@ protected int savepoint(String[] args) {
"Specify a Job 
ID to trigger a savepoint."));
}
 
-   return triggerSavepoint(options, jobId);
+   String savepointDirectory = null;
+   if (cleanedArgs.length == 2) {
--- End diff --

Not sure, but what happens if the user types `bin/flink savepoint  savepointDirectory foobar`? This will ignore the savepointDirectory, right? Maybe we could print a corresponding message if something like that happens.




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82784524
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 ---
@@ -64,9 +62,9 @@ public void recover() throws Exception {
 
@Override
public void addCheckpoint(CompletedCheckpoint checkpoint) throws 
Exception {
-   checkpoints.addLast(checkpoint);
+   checkpoints.add(checkpoint);
if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
-   checkpoints.removeFirst().discardState();
+   checkpoints.remove().subsume();
--- End diff --

What happens if the `CheckpointProperties.discardOnSubsumed() == false`. 
This could lead to files in your nfs which are not cleaned up even though they 
are not useful for anything else (given that it's not a persistent checkpoint), 
right?
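The behavior in question can be sketched as follows: `subsume()` consults the checkpoint's properties and only deletes state when discard-on-subsume is set, which is exactly why a non-persistent checkpoint whose properties disable it would leave files behind. Names and structure here are illustrative, not Flink's actual implementation.

```java
public class SubsumeSketch {

    static class Checkpoint {
        final boolean discardOnSubsumed; // from the checkpoint's properties
        boolean stateDiscarded = false;

        Checkpoint(boolean discardOnSubsumed) {
            this.discardOnSubsumed = discardOnSubsumed;
        }

        /** Called when a newer checkpoint pushes this one out of the store. */
        void subsume() {
            if (discardOnSubsumed) {
                stateDiscarded = true; // e.g. delete the backing files on DFS/NFS
            }
            // otherwise the files stay on disk until disposed of explicitly
        }
    }
}
```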




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82783495
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 ---
@@ -256,29 +306,51 @@ public boolean acknowledgeTask(
 * Aborts a checkpoint because it expired (took too long).
 */
public void abortExpired() throws Exception {
--- End diff --

Would it make sense to narrow the `Exception` a little bit down? Maybe 
introducing a `CheckpointException` or `PendingCheckpointException`?




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82778433
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -282,29 +279,71 @@ public boolean isShutdown() {
//  Handling checkpoints and messages
// 

 
-   public Future triggerSavepoint(long timestamp) throws Exception 
{
-   CheckpointTriggerResult result = triggerCheckpoint(timestamp, 
CheckpointProperties.forStandardSavepoint());
+   /**
+* Triggers a savepoint with the default savepoint directory as a 
target.
+*
+* @param timestamp The timestamp for the savepoint.
+* @return A future to the completed checkpoint
+* @throws IllegalStateException If no default savepoint directory has 
been configured
+* @throws Exception Failures during triggering are forwarded
+*/
+   public Future<CompletedCheckpoint> triggerSavepoint(long timestamp) throws Exception {
+   return triggerSavepoint(timestamp, null);
+   }
+
+   /**
+* Triggers a savepoint with the given savepoint directory as a target.
+*
+* @param timestamp The timestamp for the savepoint.
+* @param savepointDirectory Target directory for the savepoint.
+* @return A future to the completed checkpoint
+* @throws IllegalStateException If no savepoint directory has been
+*   specified and no default savepoint 
directory has been
+*   configured
+* @throws Exception Failures during triggering are 
forwarded
+*/
+   public Future triggerSavepoint(long timestamp, 
String savepointDirectory) throws Exception {
+   String targetDirectory;
+   if (savepointDirectory != null) {
+   targetDirectory = savepointDirectory;
+   } else if (this.savepointDirectory != null) {
+   targetDirectory = this.savepointDirectory;
+   } else {
+   throw new IllegalStateException("No savepoint directory 
configured. " +
+   "You can either specify a directory 
when triggering this savepoint or " +
+   "configure a cluster-wide default via 
key '" +
+   ConfigConstants.SAVEPOINT_DIRECTORY_KEY 
+ "'.");
+   }
+
+   CheckpointProperties props = 
CheckpointProperties.forStandardSavepoint();
+   CheckpointTriggerResult result = triggerCheckpoint(timestamp, 
props, targetDirectory);
 
if (result.isSuccess()) {
-   PendingSavepoint savepoint = (PendingSavepoint) 
result.getPendingCheckpoint();
-   return savepoint.getCompletionFuture();
-   }
-   else {
-   return Futures.failed(new Exception("Failed to trigger 
savepoint: " + result.getFailureReason().message()));
+   return 
result.getPendingCheckpoint().getCompletionFuture();
+   } else {
+   CompletableFuture<CompletedCheckpoint> failed = new FlinkCompletableFuture<>();
--- End diff --

In order to create a completed future you can write 
`FlinkCompletableFuture.completedExceptionally(Throwable t)`.
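The two-step pattern in the diff and the one-call shorthand mentioned here are equivalent. A minimal sketch using the JDK's `CompletableFuture` as a stand-in (Flink's `FlinkCompletableFuture` itself is not shown in this thread):

```java
import java.util.concurrent.CompletableFuture;

public class FailedFutureSketch {
    public static void main(String[] args) {
        // Two-step pattern, as in the diff above:
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new Exception("Failed to trigger savepoint"));

        // The future is done, but completed exceptionally.
        System.out.println(failed.isDone());                   // true
        System.out.println(failed.isCompletedExceptionally()); // true

        // A static factory like FlinkCompletableFuture.completedExceptionally(t)
        // collapses the two steps into a single call. (JDK 9 later added the
        // equivalent CompletableFuture.failedFuture(t).)
    }
}
```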




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82781804
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 ---
@@ -51,9 +57,11 @@
/** States of the different task groups belonging to this checkpoint */
private final Map<JobVertexID, TaskState> taskStates;
 
-   /** Flag to indicate whether the completed checkpoint data should be 
deleted when this
-* handle to the checkpoint is disposed */
-   private final boolean deleteStateWhenDisposed;
+   /** Properties for this checkpoint. */
+   private final CheckpointProperties props;
+
+   /** External path if persisted checkpoint; null otherwise. 
*/
+   private final String externalPath;
--- End diff --

Could the external path be part of the `CheckpointProperties`? The external 
path could be an `Option`. Then we would get around the null field.
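The suggestion sketched with Java's `Optional` standing in for the `Option` type mentioned above (illustrative only; as the later comment notes, the PR keeps the path on `CompletedCheckpoint` because it is not known when the properties are created):

```java
import java.util.Optional;

public class ExternalPathSketch {

    // Reduced, hypothetical version of CheckpointProperties carrying the
    // external path as an Optional instead of a nullable String field.
    static class CheckpointProperties {
        private final Optional<String> externalPath;

        CheckpointProperties(Optional<String> externalPath) {
            this.externalPath = externalPath;
        }

        Optional<String> getExternalPath() {
            return externalPath;
        }
    }

    public static void main(String[] args) {
        CheckpointProperties persisted =
                new CheckpointProperties(Optional.of("hdfs:///savepoints/sp-1"));
        CheckpointProperties regular =
                new CheckpointProperties(Optional.empty());

        // No null checks at the call sites:
        System.out.println(persisted.getExternalPath().orElse("<none>"));
        System.out.println(regular.getExternalPath().orElse("<none>"));
    }
}
```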




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82778557
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -282,29 +279,71 @@ public boolean isShutdown() {
//  Handling checkpoints and messages
// 

 
-   public Future<CompletedCheckpoint> triggerSavepoint(long timestamp) throws Exception {
-   CheckpointTriggerResult result = triggerCheckpoint(timestamp, 
CheckpointProperties.forStandardSavepoint());
+   /**
+* Triggers a savepoint with the default savepoint directory as a 
target.
+*
+* @param timestamp The timestamp for the savepoint.
+* @return A future to the completed checkpoint
+* @throws IllegalStateException If no default savepoint directory has 
been configured
+* @throws Exception Failures during triggering are forwarded
+*/
+   public Future<CompletedCheckpoint> triggerSavepoint(long timestamp) throws Exception {
+   return triggerSavepoint(timestamp, null);
+   }
+
+   /**
+* Triggers a savepoint with the given savepoint directory as a target.
+*
+* @param timestamp The timestamp for the savepoint.
+* @param savepointDirectory Target directory for the savepoint.
+* @return A future to the completed checkpoint
+* @throws IllegalStateException If no savepoint directory has been
+*   specified and no default savepoint 
directory has been
+*   configured
+* @throws Exception Failures during triggering are 
forwarded
+*/
+   public Future triggerSavepoint(long timestamp, 
String savepointDirectory) throws Exception {
+   String targetDirectory;
+   if (savepointDirectory != null) {
+   targetDirectory = savepointDirectory;
+   } else if (this.savepointDirectory != null) {
+   targetDirectory = this.savepointDirectory;
+   } else {
+   throw new IllegalStateException("No savepoint directory 
configured. " +
+   "You can either specify a directory 
when triggering this savepoint or " +
+   "configure a cluster-wide default via 
key '" +
+   ConfigConstants.SAVEPOINT_DIRECTORY_KEY 
+ "'.");
+   }
+
+   CheckpointProperties props = 
CheckpointProperties.forStandardSavepoint();
+   CheckpointTriggerResult result = triggerCheckpoint(timestamp, 
props, targetDirectory);
 
if (result.isSuccess()) {
-   PendingSavepoint savepoint = (PendingSavepoint) 
result.getPendingCheckpoint();
-   return savepoint.getCompletionFuture();
-   }
-   else {
-   return Futures.failed(new Exception("Failed to trigger 
savepoint: " + result.getFailureReason().message()));
+   return 
result.getPendingCheckpoint().getCompletionFuture();
+   } else {
+   CompletableFuture<CompletedCheckpoint> failed = new FlinkCompletableFuture<>();
+   failed.completeExceptionally(new Exception("Failed to 
trigger savepoint: " + result.getFailureReason().message()));
--- End diff --

Not adding the complete stack trace is on purpose, right? I'm wondering 
whether it could help to debug problems later.
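Passing the original failure along as the cause would keep its stack trace available. A minimal plain-JDK sketch of the difference (illustrative; the PR builds the message from `result.getFailureReason().message()` only):

```java
public class CauseSketch {
    public static void main(String[] args) {
        Exception original = new IllegalStateException("checkpoint declined");

        // Wrapping with the cause preserves the original stack trace,
        // unlike embedding only the failure reason's message string.
        Exception wrapped = new Exception(
                "Failed to trigger savepoint: " + original.getMessage(), original);

        System.out.println(wrapped.getCause() == original); // true
    }
}
```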




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82796870
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
 ---
@@ -43,24 +125,62 @@
 * @return The loaded savepoint
 * @throws Exception Failures during load are forwarded
 */
-   Savepoint loadSavepoint(String path) throws Exception;
+   public static Savepoint loadSavepoint(String path) throws IOException {
+   Preconditions.checkNotNull(path, "Path");
 
-   /**
-* Disposes the savepoint at the specified path.
-*
-* @param pathPath of savepoint to dispose
-* @throws Exception Failures during diposal are forwarded
-*/
-   void disposeSavepoint(String path) throws Exception;
+   try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
+   int magicNumber = dis.readInt();
+
+   if (magicNumber == MAGIC_NUMBER) {
+   int version = dis.readInt();
+
+   SavepointSerializer serializer = 
SavepointSerializers.getSerializer(version);
+   return serializer.deserialize(dis);
+   } else {
+   throw new RuntimeException("Unexpected magic 
number. This is most likely " +
+   "caused by trying to load a 
Flink 1.0 savepoint. You cannot load a " +
+   "savepoint triggered by Flink 
1.0 with this version of Flink. If it is " +
+   "_not_ a Flink 1.0 savepoint, 
this error indicates that the specified " +
+   "file is not a proper savepoint 
or the file has been corrupted.");
+   }
+   }
+   }
 
/**
-* Shut downs the savepoint store.
+* Removes the savepoint meta data w/o loading and disposing it.
 *
-* Only necessary for implementations where the savepoint life-cycle 
is
-* bound to the cluster life-cycle.
-*
-* @throws Exception Failures during shut down are forwarded
+* @param path Path of savepoint to remove
+* @throws Exception Failures during disposal are forwarded
 */
-   void shutdown() throws Exception;
+   public static void removeSavepoint(String path) throws Exception {
+   Preconditions.checkNotNull(path, "Path");
+
+   try {
+   LOG.info("Removing savepoint: " + path);
--- End diff --

{}
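The `{}` refers to SLF4J's parameterized logging, i.e. `LOG.info("Removing savepoint: {}", path)` instead of string concatenation, which defers building the message until it is actually logged. The substitution can be mimicked without an SLF4J dependency (stand-in formatter, illustrative only):

```java
public class PlaceholderSketch {

    // Minimal mimic of SLF4J's "{}" substitution: each "{}" is replaced
    // by the next argument, left to right.
    static String format(String pattern, Object... args) {
        StringBuilder sb = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int idx = pattern.indexOf("{}", from);
            if (idx < 0) {
                break;
            }
            sb.append(pattern, from, idx).append(arg);
            from = idx + 2;
        }
        sb.append(pattern.substring(from));
        return sb.toString();
    }

    public static void main(String[] args) {
        // What LOG.info("Removing savepoint: {}", path) would produce:
        System.out.println(format("Removing savepoint: {}", "hdfs:///savepoints/sp-1"));
        // prints "Removing savepoint: hdfs:///savepoints/sp-1"
    }
}
```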




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82781688
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
 ---
@@ -18,44 +18,243 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import java.io.Serializable;
+
 /**
  * The configuration of a checkpoint, such as whether
  * 
- * The checkpoint is a savepoint
+ * The checkpoint should be persisted
  * The checkpoint must be full, or may be incremental
  * The checkpoint format must be the common (cross backend) 
format, or may be state-backend specific
--- End diff --

The second and third are not yet implemented, aren't they?




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82777602
  
--- Diff: flink-runtime-web/web-dashboard/app/scripts/index.coffee ---
@@ -29,8 +29,8 @@ angular.module('flinkApp', ['ui.router', 'angularMoment'])
 # --
 
 .value 'flinkConfig', {
-  jobServer: ''
-# jobServer: 'http://localhost:8081/'
+#  jobServer: ''
+  jobServer: 'http://localhost:8081/'
--- End diff --

What's the reason for this change?




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82796670
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
 ---
@@ -18,23 +18,105 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * Savepoint store used to persist {@link Savepoint} instances.
+ * A file system based savepoint store.
  *
- * The main implementation is the {@link FsSavepointStore}. We also 
have the
- * {@link HeapSavepointStore} for historical reasons (introduced in Flink 
1.0).
+ * Stored savepoints have the following format:
+ * 
+ * MagicNumber SavepointVersion Savepoint
+ *   - MagicNumber => int
+ *   - SavepointVersion => int (returned by Savepoint#getVersion())
+ *   - Savepoint => bytes (serialized via version-specific 
SavepointSerializer)
+ * 
  */
-public interface SavepointStore {
+public class SavepointStore {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(SavepointStore.class);
+
+   /** Magic number for sanity checks against stored savepoints. */
+   private static final int MAGIC_NUMBER = 0x4960672d;
+
+   /** Prefix for savepoint files. */
+   private static final String prefix = "savepoint-";
 
/**
 * Stores the savepoint.
 *
+* @param targetDirectory Target directory to store savepoint in
 * @param savepoint Savepoint to be stored
 * @param <T> Savepoint type
 * @return Path of stored savepoint
 * @throws Exception Failures during store are forwarded
 */
-    <T extends Savepoint> String storeSavepoint(T savepoint) throws Exception;
+   public static <T extends Savepoint> String storeSavepoint(
+   String targetDirectory,
+   T savepoint) throws IOException {
+
+   checkNotNull(targetDirectory, "Target directory");
+   checkNotNull(savepoint, "Savepoint");
+
+   Exception latestException = null;
+   Path path = null;
+   FSDataOutputStream fdos = null;
+
+   FileSystem fs = null;
+
+   // Try to create a FS output stream
+   for (int attempt = 0; attempt < 10; attempt++) {
+   path = new Path(targetDirectory, 
FileUtils.getRandomFilename(prefix));
+
+   if (fs == null) {
+   fs = FileSystem.get(path.toUri());
+   }
+
+   try {
+   fdos = fs.create(path, false);
+   break;
+   } catch (Exception e) {
+   latestException = e;
+   }
+   }
+
+   if (fdos == null) {
+   throw new IOException("Failed to create file output 
stream at " + path, latestException);
+   }
+
+   boolean success = false;
+   try (DataOutputStream dos = new DataOutputStream(fdos)) {
+   // Write header
+   dos.writeInt(MAGIC_NUMBER);
+   dos.writeInt(savepoint.getVersion());
+
+   // Write savepoint
+   SavepointSerializer serializer = 
SavepointSerializers.getSerializer(savepoint);
+   serializer.serialize(savepoint, dos);
+   success = true;
+   } finally {
+   if (!success && fs.exists(path)) {
+   if (!fs.delete(path, true)) {
+   LOG.warn("Failed to delete file " + 
path + " after failed write.");
--- End diff --

Placeholder {}
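The on-disk layout described in the `SavepointStore` javadoc above (magic number, then version, then payload) can be sketched with plain `DataOutputStream`/`DataInputStream`; the payload serialization itself is elided:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SavepointHeaderSketch {

    // Magic number from the diff above.
    static final int MAGIC_NUMBER = 0x4960672d;

    static byte[] writeHeader(int version) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bos)) {
            dos.writeInt(MAGIC_NUMBER); // sanity-check marker
            dos.writeInt(version);      // selects the SavepointSerializer
            // ... serializer.serialize(savepoint, dos) would follow here
        }
        return bos.toByteArray();
    }

    static int readVersion(byte[] data) throws IOException {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
            if (dis.readInt() != MAGIC_NUMBER) {
                throw new IOException("Unexpected magic number: not a savepoint file?");
            }
            return dis.readInt();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] header = writeHeader(1);
        System.out.println(readVersion(header)); // 1
    }
}
```

The magic-number check is what produces the "Unexpected magic number" error quoted earlier in this thread when a non-savepoint (or pre-1.1) file is loaded.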




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82778103
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -219,33 +245,9 @@ public CheckpointCoordinator(
 * Shuts down the checkpoint coordinator.
 *
 * After this method has been called, the coordinator does not accept
-* and further messages and cannot trigger any further checkpoints. All
-* checkpoint state is discarded.
-*/
-   public void shutdown() throws Exception {
-   shutdown(true);
-   }
-
-   /**
-* Suspends the checkpoint coordinator.
-*
-* After this method has been called, the coordinator does not accept
 * and further messages and cannot trigger any further checkpoints.
-*
-* The difference to shutdown is that checkpoint state in the store
-* and counter is kept around if possible to recover later.
-*/
-   public void suspend() throws Exception {
-   shutdown(false);
-   }
-
-   /**
-* Shuts down the checkpoint coordinator.
-*
-* @param shutdownStoreAndCounter Depending on this flag the checkpoint
-* state services are shut down or suspended.
 */
-   private void shutdown(boolean shutdownStoreAndCounter) throws Exception 
{
+   public void shutdown(JobStatus jobStatus) throws Exception {
--- End diff --

Didn't you tell me that the verb is `shutDown`?




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82787864
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
 ---
@@ -48,13 +48,12 @@
public static CompletedCheckpoint loadAndValidateSavepoint(
JobID jobId,
Map<JobVertexID, ExecutionJobVertex> tasks,
-   SavepointStore savepointStore,
String savepointPath) throws Exception {
 
// (1) load the savepoint
-   Savepoint savepoint = 
savepointStore.loadSavepoint(savepointPath);
+   Savepoint savepoint = 
SavepointStore.loadSavepoint(savepointPath);
final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size());
-   
+
// (2) validate it (parallelism, etc)
for (TaskState taskState : savepoint.getTaskStates()) {
ExecutionJobVertex executionJobVertex = 
tasks.get(taskState.getJobVertexID());
--- End diff --

Just a question for line 74. Shouldn't this be 
`taskState.getMaxParallelism()` and `executionJobVertex.getParallelism()`?




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82782971
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 ---
@@ -51,9 +57,11 @@
/** States of the different task groups belonging to this checkpoint */
private final Map<JobVertexID, TaskState> taskStates;
 
-   /** Flag to indicate whether the completed checkpoint data should be 
deleted when this
-* handle to the checkpoint is disposed */
-   private final boolean deleteStateWhenDisposed;
+   /** Properties for this checkpoint. */
+   private final CheckpointProperties props;
+
+   /** External path if persisted checkpoint; null otherwise. 
*/
+   private final String externalPath;
--- End diff --

Ok I see, the path is not fully known when creating the 
`CheckpointProperties`...




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82784661
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 ---
@@ -64,9 +62,9 @@ public void recover() throws Exception {
 
@Override
public void addCheckpoint(CompletedCheckpoint checkpoint) throws 
Exception {
-   checkpoints.addLast(checkpoint);
+   checkpoints.add(checkpoint);
if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
-   checkpoints.removeFirst().discardState();
+   checkpoints.remove().subsume();
--- End diff --

Maybe we shouldn't allow all different combinations of 
`CheckpointProperties`. Only those which make sense.
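One common way to allow only the sensible combinations is a private constructor plus named factory methods, which is the style the PR's `CheckpointProperties.forStandardSavepoint()` already follows. A reduced sketch (fields and factory names here are illustrative, not the PR's actual set):

```java
public class PropertiesSketch {

    // Reduced, hypothetical CheckpointProperties: the private constructor
    // means only the named factories below can build instances, so
    // nonsensical flag combinations cannot be constructed by callers.
    static final class Props {
        final boolean external;
        final boolean discardOnSubsume;

        private Props(boolean external, boolean discardOnSubsume) {
            this.external = external;
            this.discardOnSubsume = discardOnSubsume;
        }

        static Props forStandardSavepoint() {
            // Savepoints are persisted externally and never auto-discarded.
            return new Props(true, false);
        }

        static Props forStandardCheckpoint() {
            // Regular checkpoints are internal and discarded when subsumed.
            return new Props(false, true);
        }
    }

    public static void main(String[] args) {
        Props sp = Props.forStandardSavepoint();
        System.out.println(sp.external + " " + sp.discardOnSubsume); // true false
    }
}
```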




[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-07 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/2608

[FLINK-4512] [FLIP-10] Add option to persist periodic checkpoints

## Introduction

This is the first part of 
[FLIP-10](https://cwiki.apache.org/confluence/display/FLINK/FLIP-10%3A+Unify+Checkpoints+and+Savepoints),
 allowing users to persist periodic checkpoints.

Persistent checkpoints behave very much like regular periodic checkpoints, except for the following differences:

1. They persist their meta data (like savepoints).
2. They are not discarded when the owning job fails permanently. 
Furthermore, they can be configured to not be discarded when the job is 
cancelled.

This means that if a job fails permanently, the user will have a checkpoint 
available to restore from. As an example, think of the following scenario: a job 
runs smoothly until it hits a bad record that it cannot handle. The current 
behaviour is that the job tries to recover, hits the bad record again, and keeps 
on failing. With persistent checkpoints, the user can update the program to 
handle bad records and restore from the most recent persistent checkpoint.

## CheckpointConfig

This adds the following `@PublicEvolving` methods to `CheckpointConfig`:

```
enablePersistentCheckpoints(String targetDirectory);
enablePersistentCheckpoints(String targetDirectory, 
PersistentCheckpointCleanup cleanup)
```

The `PersistentCheckpointCleanup` defines how persistent checkpoints are 
cleaned up when the owning job is cancelled. Since currently most streaming 
jobs are stopped via cancellation, the default is to clean up persistent 
checkpoints. The user can override this behaviour via the enum.

## REST API

The external path of the most recent persistent checkpoint is exposed via the 
REST API. It is also displayed in the web UI.

![screen shot 2016-10-07 at 17 50 44](https://cloud.githubusercontent.com/assets/1756620/19196699/d0d5065a-8cb6-11e6-8b13-c6bacc4ebe19.png)

## Deprecate savepoint state backends (FLINK-4507)

Furthermore, the savepoint state backends have been removed and all 
savepoints now go to files. The corresponding configuration keys have been 
removed or deprecated:

`savepoints.state.backend.fs.dir` has been deprecated in favour of 
`state.savepoints.dir`. `savepoints.state.backend` has been removed.

## Allow to specify custom savepoint directory (FLINK-4509)

The target directory for savepoints was configured via the Flink configuration. 
With this change, it can be overridden per savepoint:

```
bin/flink savepoint <jobId> [targetDirectory]
```


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 4512-persistent_checkpoints

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2608.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2608


commit 004ba0b38ac2b75148910660242808c13746c444
Author: Ufuk Celebi 
Date:   2016-10-06T14:43:42Z

[FLINK-4512] [FLIP-10] Add option to persist periodic checkpoints

[FLINK-4509] [FLIP-10] Specify savepoint directory per savepoint
[FLINK-4507] [FLIP-10] Deprecate savepoint backend config



