[ https://issues.apache.org/jira/browse/FLINK-5897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15885661#comment-15885661 ]
ASF GitHub Bot commented on FLINK-5897:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/3411#discussion_r103192397
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
---
@@ -203,48 +208,67 @@ void setStatsCallback(@Nullable PendingCheckpointStats trackerCallback) {
return onCompletionPromise;
}
- public CompletedCheckpoint finalizeCheckpoint() {
+ public CompletedCheckpoint finalizeCheckpointExternalized() throws IOException {
+
synchronized (lock) {
- Preconditions.checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
-
- // Persist if required
- String externalPath = null;
- if (props.externalizeCheckpoint()) {
- try {
- Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
- externalPath = SavepointStore.storeSavepoint(
- targetDirectory,
- savepoint
- );
- } catch (IOException e) {
- LOG.error("Failed to persist checkpoint {}.",checkpointId, e);
- }
- }
+ checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
- CompletedCheckpoint completed = new CompletedCheckpoint(
- jobId,
- checkpointId,
- checkpointTimestamp,
- System.currentTimeMillis(),
- new HashMap<>(taskStates),
- props,
- externalPath);
+ // externalize the metadata
+ final Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
- onCompletionPromise.complete(completed);
+ // TEMP FIX - The savepoint store is strictly typed to file systems currently
+ // but the checkpoints think more generic. we need to work with file handles
+ // here until the savepoint serializer accepts a generic stream factory
- if (statsCallback != null) {
- // Finalize the statsCallback and give the completed checkpoint a
- // callback for discards.
- CompletedCheckpointStats.DiscardCallback discardCallback = statsCallback.reportCompletedCheckpoint(externalPath);
- completed.setDiscardCallback(discardCallback);
- }
+ final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint);
+ final String externalPointer = metadataHandle.getFilePath().getParent().toString();
- dispose(false);
+ return finalizeInternal(metadataHandle, externalPointer);
+ }
+ }
+
+ public CompletedCheckpoint finalizeCheckpointNonExternalized() {
+ synchronized (lock) {
+ checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
- return completed;
+ // finalize without external metadata
+ return finalizeInternal(null, null);
}
}
+ @GuardedBy("lock")
+ private CompletedCheckpoint finalizeInternal(
+ @Nullable StreamStateHandle externalMetadata,
+ @Nullable String externalPointer) {
+
+ assert(Thread.holdsLock(lock));
+
+ CompletedCheckpoint completed = new CompletedCheckpoint(
+ jobId,
+ checkpointId,
+ checkpointTimestamp,
+ System.currentTimeMillis(),
+ new HashMap<>(taskStates),
+ props,
+ externalMetadata,
+ externalPointer);
+
+ onCompletionPromise.complete(completed);
--- End diff --
If the creation of the `CompletedCheckpoint` fails (for example, because the
external metadata is null although the properties say the checkpoint should
have been externalized), the promise is never completed. I think we should wrap
the creation in a try/catch and fail the promise in that case.
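A minimal sketch of the suggested try/catch, assuming the promise is a `java.util.concurrent.CompletableFuture`. `CompletedCheckpoint` and `PendingCheckpoint` below are simplified hypothetical stand-ins, not Flink's actual classes, and the constructor check that rejects missing metadata is an assumption modeled on the failure case described in the comment.

```java
import java.util.concurrent.CompletableFuture;

public class FinalizeSketch {

    // Hypothetical stand-in for CompletedCheckpoint. ASSUMPTION: construction fails
    // when the checkpoint should be externalized but no metadata handle is given.
    static final class CompletedCheckpoint {
        final long checkpointId;
        final String externalPointer;

        CompletedCheckpoint(long checkpointId, boolean externalize,
                            Object externalMetadata, String externalPointer) {
            if (externalize && externalMetadata == null) {
                throw new IllegalArgumentException(
                        "Checkpoint " + checkpointId + " should be externalized but has no metadata");
            }
            this.checkpointId = checkpointId;
            this.externalPointer = externalPointer;
        }
    }

    // Hypothetical stand-in for PendingCheckpoint, reduced to the finalize step.
    static final class PendingCheckpoint {
        private final Object lock = new Object();
        private final long checkpointId;
        private final boolean externalize;
        final CompletableFuture<CompletedCheckpoint> onCompletionPromise = new CompletableFuture<>();

        PendingCheckpoint(long checkpointId, boolean externalize) {
            this.checkpointId = checkpointId;
            this.externalize = externalize;
        }

        CompletedCheckpoint finalizeInternal(Object externalMetadata, String externalPointer) {
            synchronized (lock) {
                try {
                    CompletedCheckpoint completed = new CompletedCheckpoint(
                            checkpointId, externalize, externalMetadata, externalPointer);
                    onCompletionPromise.complete(completed);
                    return completed;
                } catch (Throwable t) {
                    // Fail the promise so that anyone waiting on it is notified,
                    // then rethrow (precise rethrow: only unchecked exceptions) so the
                    // caller also sees the error.
                    onCompletionPromise.completeExceptionally(t);
                    throw t;
                }
            }
        }
    }

    public static void main(String[] args) {
        PendingCheckpoint plain = new PendingCheckpoint(1L, false);
        plain.finalizeInternal(null, null);
        System.out.println("checkpoint 1 failed: " + plain.onCompletionPromise.isCompletedExceptionally());

        PendingCheckpoint broken = new PendingCheckpoint(2L, true);
        try {
            broken.finalizeInternal(null, null);
        } catch (IllegalArgumentException e) {
            System.out.println("caller saw: " + e.getMessage());
        }
        System.out.println("checkpoint 2 failed: " + broken.onCompletionPromise.isCompletedExceptionally());
    }
}
```

Rethrowing after failing the promise keeps the caller's existing error path unchanged, while code waiting on the promise gets an exceptional completion instead of waiting forever.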
> Untie Checkpoint Externalization from FileSystems
> -------------------------------------------------
>
> Key: FLINK-5897
> URL: https://issues.apache.org/jira/browse/FLINK-5897
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Affects Versions: 1.2.0
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Currently, externalizing checkpoint metadata and storing savepoints depend
> strictly on FileSystems.
> Since state backends are more general, storing and cleaning up checkpoints
> with state backend hooks requires untying savepoints and externalized
> checkpoints from filesystems.
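The decoupling described in the issue can be sketched as a small storage interface: the code that finalizes a checkpoint only sees a handle for reading the metadata back plus an opaque pointer string, and each backend decides where the bytes go. All names here (`MetadataStorage`, `MetadataHandle`, `StoredMetadata`, the `chk-<id>/_metadata` layout, the `memory://` scheme) are hypothetical and are not Flink's API; only the idea of returning a handle plus an external pointer, with the pointer being the metadata file's parent directory, mirrors the diff above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class MetadataStorageSketch {

    // Hypothetical handle: callers read the metadata back without knowing where it lives.
    interface MetadataHandle {
        InputStream openInputStream() throws IOException;
    }

    // Result of storing metadata: a handle plus a pointer string that can be handed to users.
    static final class StoredMetadata {
        final MetadataHandle handle;
        final String externalPointer;

        StoredMetadata(MetadataHandle handle, String externalPointer) {
            this.handle = handle;
            this.externalPointer = externalPointer;
        }
    }

    // Hypothetical storage abstraction that a state backend could implement.
    interface MetadataStorage {
        StoredMetadata store(long checkpointId, byte[] serializedMetadata) throws IOException;
    }

    // File-system implementation; the pointer is the metadata file's parent directory.
    static final class FileSystemMetadataStorage implements MetadataStorage {
        private final Path baseDirectory;

        FileSystemMetadataStorage(Path baseDirectory) {
            this.baseDirectory = baseDirectory;
        }

        @Override
        public StoredMetadata store(long checkpointId, byte[] serializedMetadata) throws IOException {
            Path dir = Files.createDirectories(baseDirectory.resolve("chk-" + checkpointId));
            Path file = Files.write(dir.resolve("_metadata"), serializedMetadata);
            return new StoredMetadata(() -> Files.newInputStream(file), file.getParent().toString());
        }
    }

    // Implementation with no file system at all: the bytes stay on the heap,
    // reachable only through the returned handle.
    static final class InMemoryMetadataStorage implements MetadataStorage {
        @Override
        public StoredMetadata store(long checkpointId, byte[] serializedMetadata) {
            byte[] copy = serializedMetadata.clone();
            return new StoredMetadata(() -> new ByteArrayInputStream(copy), "memory://chk-" + checkpointId);
        }
    }

    // Reads all bytes behind a handle (loop instead of readAllBytes to stay Java 8 compatible).
    static byte[] readAll(MetadataHandle handle) throws IOException {
        try (InputStream in = handle.openInputStream()) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] metadata = "checkpoint 7 metadata".getBytes(StandardCharsets.UTF_8);
        MetadataStorage[] storages = {
                new FileSystemMetadataStorage(Files.createTempDirectory("checkpoints")),
                new InMemoryMetadataStorage()
        };
        // The caller depends only on MetadataStorage, not on any file system type.
        for (MetadataStorage storage : storages) {
            StoredMetadata stored = storage.store(7L, metadata);
            String roundTrip = new String(readAll(stored.handle), StandardCharsets.UTF_8);
            System.out.println(stored.externalPointer + " -> " + roundTrip);
        }
    }
}
```

Because the finalize logic only handles a `MetadataHandle` and a pointer string, a backend that does not write to a file system can plug in without the checkpoint code changing.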
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)