[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16586228#comment-16586228 ]

ASF GitHub Bot commented on FLINK-9325:
---

sihuazhou commented on issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkpoint only when the writing is truly successful
URL: https://github.com/apache/flink/pull/5982#issuecomment-414390311

   Hi @StephanEwen thanks for your comments! I was caught up by some terrible work recently, will have a look at the `RecoverableFsDataOutputStream` and get back here probably this weekend. ;)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> generate the _meta file for checkpoint only when the writing is truly successful
>
>                 Key: FLINK-9325
>                 URL: https://issues.apache.org/jira/browse/FLINK-9325
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>              Labels: pull-request-available
>
> We should generate the _meta file for checkpoint only when the writing is totally successful. We should write the metadata file first to a temp file and then atomically rename it (with an equivalent workaround for S3).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16585251#comment-16585251 ]

ASF GitHub Bot commented on FLINK-9325:
---

StephanEwen commented on issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkpoint only when the writing is truly successful
URL: https://github.com/apache/flink/pull/5982#issuecomment-414149487

   Apologies for the delay.

   The interface of the `AtomicCreatingFsDataOutputStream` is good.

   As part of the new `StreamingFileSink` design, we created a [recoverable stream](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.java), which is to some extent an extension of the `AtomicCreatingFsDataOutputStream` in the following way:
     - The recoverable stream also does not show the file immediately, but it needs to go through a committer.
     - The committer can be persisted and recovered.
     - An intermediate status (some data having been written to the file) can also be persisted and recovered.

   This is used in the StreamingFileSink to write data to a file, chunk by chunk, and make sure the state of an output file (as of a checkpoint) can be recovered.

   Now, can we use the same implementation for both AtomicCreatingFsDataOutputStream and RecoverableFsDataOutputStream? Parts of the logic are very simple (like using a temp file and renaming for HDFS, or using a multipart upload and committing later on S3).
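The "temp file and rename" part mentioned in the comment above can be sketched roughly as follows. This is only an illustration against the local file system using `java.nio.file`, not Flink's `FileSystem` API; the class name `TempFileThenRename`, the method `writeAtomically`, and the `.inprogress` suffix are made up for this example. Whether the move is really atomic depends on the underlying file system.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper for illustration; not part of Flink. */
final class TempFileThenRename {

    /** Writes the bytes to a hidden sibling temp file, then moves it onto the target in one step. */
    static void writeAtomically(Path target, byte[] data) throws IOException {
        // Assumed naming scheme for the in-progress file.
        Path temp = target.resolveSibling("." + target.getFileName() + ".inprogress");
        try {
            Files.write(temp, data);
            // Throws AtomicMoveNotSupportedException if the file system cannot move atomically.
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        } finally {
            // Removes the leftover temp file if writing or moving failed; no effect after a successful move.
            Files.deleteIfExists(temp);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("checkpoint");
        Path meta = dir.resolve("_meta");
        writeAtomically(meta, "checkpoint metadata".getBytes(StandardCharsets.UTF_8));
        System.out.println("target visible: " + Files.exists(meta));
    }
}
```

Readers that look for the target file never observe a half-written file, because the target name only appears once the move succeeds. An object store such as S3 has no rename of this kind, which is why the comment mentions a multipart upload that is committed later instead.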
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16543986#comment-16543986 ]

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982

    @StephanEwen Thanks! Looking forward~
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542828#comment-16542828 ]

ASF GitHub Bot commented on FLINK-9325:
---

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5982

    @sihuazhou I got caught up in some other tasks - will try to get back to this here soon, I would like to have this feature in as a base for "search for completed checkpoint".
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517821#comment-16517821 ]

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982

    Could anybody have a look at this?
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497900#comment-16497900 ]

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5982#discussion_r192374992

    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/ClosingAtomicCreatingFSDataOutputStream.java ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.fs;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +
    +/**
    + * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link AtomicCreatingFsDataOutputStream} that is used to
    + * implement a safety net against unclosed streams.
    + *
    + * See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
    + */
    +@Internal
    +public class ClosingAtomicCreatingFSDataOutputStream
    +    extends AtomicCreatingFsDataOutputStream
    +    implements WrappingProxyCloseable {
    +
    +    private final SafetyNetCloseableRegistry registry;
    +    private final String debugString;
    +    private AtomicCreatingFsDataOutputStream outputStream;
    +
    +    private volatile boolean closed;
    +
    +    private ClosingAtomicCreatingFSDataOutputStream(
    +        AtomicCreatingFsDataOutputStream delegate, SafetyNetCloseableRegistry registry, String debugString) throws IOException {
    +        this.outputStream = delegate;
    +        this.registry = Preconditions.checkNotNull(registry);
    +        this.debugString = Preconditions.checkNotNull(debugString);
    +        this.closed = false;
    +    }
    +
    +    public boolean isClosed() {
    +        return closed;
    +    }
    +
    +    @Override
    +    public long getPos() throws IOException {
    +        return outputStream.getPos();
    +    }
    +
    +    @Override
    +    public void write(int b) throws IOException {
    +        outputStream.write(b);
    +    }
    +
    +    @Override
    +    public void flush() throws IOException {
    +        outputStream.flush();
    +    }
    +
    +    @Override
    +    public void sync() throws IOException {
    +        outputStream.sync();
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +        if (!closed) {
    --- End diff --

    You are right, nice catch! 👍
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497882#comment-16497882 ]

ASF GitHub Bot commented on FLINK-9325:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5982#discussion_r192371034

    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/TwoPhaseFSDataOutputStream.java ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.fs;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.UUID;
    +
    +/**
    + * Operates the output stream in two phrases, any exception during the operation of {@link TwoPhaseFSDataOutputStream} will
    + * lead the {@link #targetFile} to be invisible.
    + *
    + * PHASE 1, write the data into the {@link #preparingFile}.
    + * PHASE 2, close the {@link #preparingFile} and rename it to the {@link #targetFile}.
    + */
    +@Internal
    +public class TwoPhaseFSDataOutputStream extends AtomicCreatingFsDataOutputStream {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseFSDataOutputStream.class);
    +
    +    /**
    +     * the target file system.
    +     */
    +    private final FileSystem fs;
    +
    +    /**
    +     * the target file which the preparing file will be renamed to in the {@link #closeAndPublish()}.
    +     */
    +    private final Path targetFile;
    +
    +    /**
    +     * the preparing file to store the on flying data.
    +     */
    +    private final Path preparingFile;
    +
    +    /**
    +     * the output stream of the preparing file.
    +     */
    +    private final FSDataOutputStream preparedOutputStream;
    +
    +    private volatile boolean closed;
    +
    +    public TwoPhaseFSDataOutputStream(FileSystem fs, Path f, FileSystem.WriteMode writeMode) throws IOException {
    +
    +        Preconditions.checkArgument(FileSystem.WriteMode.OVERWRITE != writeMode, "WriteMode.OVERWRITE is unsupported yet.");
    +
    +        this.fs = fs;
    +        this.targetFile = f;
    +        this.preparingFile = generateTemporaryFilename(f);
    +        this.closed = false;
    +
    +        if (writeMode == FileSystem.WriteMode.NO_OVERWRITE && fs.exists(targetFile)) {
    +            throw new IOException("Target file " + targetFile + " is already exists.");
    +        }
    +
    +        this.preparedOutputStream = fs.create(this.preparingFile, writeMode);
    +    }
    +
    +    @Override
    +    public long getPos() throws IOException {
    +        return this.preparedOutputStream.getPos();
    +    }
    +
    +    @Override
    +    public void write(int b) throws IOException {
    +        this.preparedOutputStream.write(b);
    +    }
    +
    +    @Override
    +    public void flush() throws IOException {
    +        this.preparedOutputStream.flush();
    +    }
    +
    +    @Override
    +    public void sync() throws IOException {
    +        this.preparedOutputStream.sync();
    +    }
    +
    +    /**
    +     * Does the cleanup things, close the stream and delete the {@link #preparingFile}.
    +     */
    +    @Override
    +    public void close() throws IOException {
    +        if (!closed) {
    --- End diff --

    Same here.
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497879#comment-16497879 ]

ASF GitHub Bot commented on FLINK-9325:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5982#discussion_r192370481

    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/ClosingAtomicCreatingFSDataOutputStream.java ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.fs;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +
    +/**
    + * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link AtomicCreatingFsDataOutputStream} that is used to
    + * implement a safety net against unclosed streams.
    + *
    + * See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
    + */
    +@Internal
    +public class ClosingAtomicCreatingFSDataOutputStream
    +    extends AtomicCreatingFsDataOutputStream
    +    implements WrappingProxyCloseable {
    +
    +    private final SafetyNetCloseableRegistry registry;
    +    private final String debugString;
    +    private AtomicCreatingFsDataOutputStream outputStream;
    +
    +    private volatile boolean closed;
    +
    +    private ClosingAtomicCreatingFSDataOutputStream(
    +        AtomicCreatingFsDataOutputStream delegate, SafetyNetCloseableRegistry registry, String debugString) throws IOException {
    +        this.outputStream = delegate;
    +        this.registry = Preconditions.checkNotNull(registry);
    +        this.debugString = Preconditions.checkNotNull(debugString);
    +        this.closed = false;
    +    }
    +
    +    public boolean isClosed() {
    +        return closed;
    +    }
    +
    +    @Override
    +    public long getPos() throws IOException {
    +        return outputStream.getPos();
    +    }
    +
    +    @Override
    +    public void write(int b) throws IOException {
    +        outputStream.write(b);
    +    }
    +
    +    @Override
    +    public void flush() throws IOException {
    +        outputStream.flush();
    +    }
    +
    +    @Override
    +    public void sync() throws IOException {
    +        outputStream.sync();
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +        if (!closed) {
    --- End diff --

    Either `closed` is not required to be `volatile` or this is not enough to prevent a race condition from happening. You could either use an `AtomicBoolean::compareAndSet` or in this particular case rely on the serialization from the registry, i.e. `if (registry.unregisterCloseable(this)) {...}`
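The race pointed out in the review comment above is the usual check-then-act problem: with a plain `volatile boolean`, two threads can both read `closed == false` and both run the cleanup. A minimal sketch of the `AtomicBoolean::compareAndSet` variant is shown below; the class `CloseOnceGuard` and its `cleanupRuns` counter are invented for this illustration and are not part of the pull request. The registry-based alternative mentioned in the comment is not shown.

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of a close() whose body runs at most once, even under concurrent calls. */
final class CloseOnceGuard implements Closeable {

    private final AtomicBoolean closed = new AtomicBoolean(false);

    /** Counts how often the cleanup body actually ran; only here to make the effect observable. */
    final AtomicInteger cleanupRuns = new AtomicInteger();

    @Override
    public void close() {
        // Only the one caller that flips false -> true enters the cleanup body.
        if (closed.compareAndSet(false, true)) {
            cleanupRuns.incrementAndGet();
        }
    }

    boolean isClosed() {
        return closed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        CloseOnceGuard guard = new CloseOnceGuard();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(guard::close);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("cleanup ran " + guard.cleanupRuns.get() + " time(s)"); // cleanup ran 1 time(s)
    }
}
```

The test and the state change happen in a single atomic step here, which a separate `if (!closed) { closed = true; ... }` on a volatile field cannot guarantee.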
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494674#comment-16494674 ]

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982

    @StephanEwen I've addressed your comments, could you please have a look again?
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16493057#comment-16493057 ]

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982

    Yes, @StephanEwen thanks for the continuous suggestions, will follow your suggestion.
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16492828#comment-16492828 ]

ASF GitHub Bot commented on FLINK-9325:
---

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5982

    I think we need to have a special output stream type (`AtomicCreatingFsDataOutputStream` or similar) as the return type of `FileSystem.createAtomic()`. Otherwise, how can a user actually create a file? The `closeAndPublish()` method is not part of any API class.
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476945#comment-16476945 ]

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982

    @StephanEwen I guess this PR is ready for another look now...
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476073#comment-16476073 ]

ASF GitHub Bot commented on FLINK-9325:
---

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5982

    My gut feeling is that we don't need `WriteMode.OVERWRITE` in cases where one wants such an atomic file creation...
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476033#comment-16476033 ]

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982

    @StephanEwen Thanks for your good suggestions! I will update the PR accordingly. And what about the problem related to `WriteMode.OVERWRITE`: would you object if we don't support it in `createAtomically()`?
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16475919#comment-16475919 ]

ASF GitHub Bot commented on FLINK-9325:
---

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5982

    Good point about the renaming on `close()` in case close is called for cleanup, rather than success.

    We could follow the same semantics as in [CheckpointStateOutputStream](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java#L61)

    There the semantics are:
      - `close()` means "close on error / cleanup" and closes the stream and deletes the temp file.
      - `closeAndPublish()` would mean "close on success" and close the stream and rename the file.
      - After `closeAndPublish()` has been called, `close()` becomes a no-op.

    The [FsCheckpointMetadataOutputStream](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java) implements that pattern; I think it worked well and is easy to use.
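The three rules listed in the comment above can be sketched as a small stream class. This is an illustration on top of `java.nio.file` only, under the assumption of single-threaded use; `PublishOnSuccessStream` and the `.tmp` suffix are hypothetical names and this is not the code of `FsCheckpointMetadataOutputStream`.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch of "close() discards, closeAndPublish() commits"; not thread-safe. */
final class PublishOnSuccessStream extends OutputStream {

    private final Path temp;
    private final Path target;
    private final OutputStream out;
    private boolean done;

    PublishOnSuccessStream(Path target) throws IOException {
        this.target = target;
        this.temp = target.resolveSibling(target.getFileName() + ".tmp"); // assumed temp naming
        this.out = Files.newOutputStream(temp);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
    }

    /** Close on success: close the stream and rename the temp file to the target. */
    void closeAndPublish() throws IOException {
        if (done) {
            throw new IOException("stream already closed");
        }
        done = true;
        out.close();
        // Without REPLACE_EXISTING this fails if the target already exists.
        Files.move(temp, target);
    }

    /** Close on error / cleanup: discard the temp file. A no-op after closeAndPublish(). */
    @Override
    public void close() throws IOException {
        if (done) {
            return;
        }
        done = true;
        out.close();
        Files.deleteIfExists(temp);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("publish-demo");

        Path published = dir.resolve("published");
        try (PublishOnSuccessStream s = new PublishOnSuccessStream(published)) {
            s.write('a');
            s.closeAndPublish(); // success path; the implicit close() afterwards does nothing
        }

        Path abandoned = dir.resolve("abandoned");
        try (PublishOnSuccessStream s = new PublishOnSuccessStream(abandoned)) {
            s.write('a'); // closeAndPublish() is never called, so close() deletes the temp file
        }

        System.out.println(Files.exists(published) + " " + Files.exists(abandoned)); // true false
    }
}
```

With these semantics the class works naturally in try-with-resources: the target only appears if the caller explicitly reached `closeAndPublish()`.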
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16475438#comment-16475438 ]

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982

    Hi, I ran into a problem here: for the Hadoop FileSystem with `scheme='hdfs'`, we can't rename a file onto an existing file. This means that it is hard (I'm not sure whether it's even possible) to support `WriteMode.OVERWRITE` for `createAtomically()` with `scheme='hdfs'`. If we work around this by first renaming the existing file to a tmp file and then renaming the pre-commit file to the target file, then this two-step operation is not atomic, which may lead to an inconsistent result if the JVM crashes between the two steps. I'm out of ideas on this for now... Do you have any suggestions for `createAtomically()` when `scheme='hdfs'` and `writeMode='WriteMode.OVERWRITE'`? @StephanEwen
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16475318#comment-16475318 ]

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982

    Hi @StephanEwen, I added more tests for `FileSystem#createAtomically()`. Concerning the `TwoPhaseFsDataoutputStream`, can we introduce a `commit_on_close` option to make it easier to use? If `commit_on_close=true`, we commit the writing atomically in `close()`. If `commit_on_close=false`, the user needs to call `commit()` manually, which could be useful in a try-with-resources situation. What do you think?
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473941#comment-16473941 ]

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5982

    @StephanEwen After thinking about your comments again, I think I misunderstood the `Also, we need this method to be implemented in all FileSystem subclasses.` 😭 I will address that. But the question related to `TwoPhaseFsDataoutputStream` is still a bit confusing to me...
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473870#comment-16473870 ] ASF GitHub Bot commented on FLINK-9325: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5982

Hi @StephanEwen, thank you very much for your reply. I'm not sure whether just overriding `close()` to do `super.close()` + `rename()` is enough. For example:

```
try (TwoPhraseFSDatautputStream outputStream = new TwoPhraseFSDatautputStream(...)) {
    outputStream.write("part1");
    // the write fails part-way through
    throw new RuntimeException("xxx");
    outputStream.write("part2"); // never reached
}
```

This will also rename the `tmp file` to the `target file`, because we just `rename()` in `close()`, and try-with-resources calls `close()` even when an exception is thrown. And the current `TwoPhraseFSDatautputStream` works as a wrapper, so it should already support all file systems. I'm not sure whether I misunderstood your meaning... please let me know if I misunderstood something, and share your opinion. Thanks!
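The concern above can be checked with plain Java, independent of Flink. In the sketch below, `RenameOnCloseStream` and `CloseOnFailureDemo` are hypothetical stand-ins that only record what would happen; no real file is written or renamed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a stream whose close() also renames the temp file. */
final class RenameOnCloseStream implements AutoCloseable {
    final List<String> events = new ArrayList<>();

    void write(String part) {
        events.add("write " + part);
    }

    @Override
    public void close() {
        // a close()-only design cannot tell success from failure here
        events.add("rename tmp -> target");
    }
}

final class CloseOnFailureDemo {
    private static void failMidway() {
        throw new RuntimeException("xxx");
    }

    /** Returns the recorded events in the order they happened. */
    static List<String> run() {
        RenameOnCloseStream stream = new RenameOnCloseStream();
        try (RenameOnCloseStream out = stream) {
            out.write("part1");
            failMidway();
            out.write("part2");
        } catch (RuntimeException e) {
            stream.events.add("caught " + e.getMessage());
        }
        return stream.events;
    }
}
```

`CloseOnFailureDemo.run()` records `write part1`, then `rename tmp -> target`, then `caught xxx`: the resource is closed before the catch block runs, so the rename happens even though `part2` was never written.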
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473859#comment-16473859 ] ASF GitHub Bot commented on FLINK-9325: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5982

Thanks for preparing this. I looked at the `TwoPhraseFSDatautputStream` - maybe we can make this simpler.

Do we need the distinction between phases? Is it not enough to behave as a regular stream, just overriding `close()` to do `super.close()` + `rename()`? That may be enough. When the stream is closed, all the writing methods anyways fail with a "stream closed exception".

Also, we need this method to be implemented in all FileSystem subclasses.

Typos: "Phrase" --> "Phase"
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473487#comment-16473487 ] ASF GitHub Bot commented on FLINK-9325: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5982

Hi @StephanEwen, I have updated the PR according to the above comments; it's ready for another review.
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472330#comment-16472330 ] ASF GitHub Bot commented on FLINK-9325: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5982

Hi @StephanEwen,
- I prefer to go with `create(Path, WriteMode)`, because I feel it is more extensible (it would allow us to introduce other `WriteMode`s in the future).
- I would choose to throw an unsupported-operation exception for the other schemes; I think that makes the program more deterministic.

What do you think?
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472191#comment-16472191 ] ASF GitHub Bot commented on FLINK-9325: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5982

How about adding the method `createAtomically` or so, with otherwise the same signature as the `create(Path, WriteMode)` method?
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16471920#comment-16471920 ] ASF GitHub Bot commented on FLINK-9325: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5982

@StephanEwen Thank you very much for your great suggestion! I will address this that way.
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16471903#comment-16471903 ] ASF GitHub Bot commented on FLINK-9325: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5982

I think this fix here might not work for S3, because a rename() with the S3 file systems will actually trigger a copy (or even a download and upload), so it is not a cheap operation.

We can fix this by adding a `create(...)` method (or mode) to the FileSystem API that does not publish data in the file until `close()` is called. For hdfs:// and file://, this would use a temp file with renaming; for S3 we don't write to a temp file, because S3 only makes the file visible on close() anyway.
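For the file:// case, the temp-file-plus-rename approach described above can be sketched with the JDK's `java.nio.file` API. This is an illustration under stated assumptions, not the Flink `FileSystem` API: `AtomicPublishSketch`, `writeThenPublish`, `demo`, and the `.inprogress` suffix are hypothetical names, and the sketch assumes the temp file and the target live in the same directory so that an atomic move is possible.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper, not part of Flink: publish a file only once it is fully written. */
final class AtomicPublishSketch {

    /** Writes data to a sibling temp file, then renames it onto the target path. */
    static void writeThenPublish(Path target, byte[] data) {
        Path tmp = target.resolveSibling("." + target.getFileName() + ".inprogress");
        try {
            try (OutputStream out = Files.newOutputStream(tmp)) {
                out.write(data);
            }
            // The target name only appears after the temp file is complete and closed.
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            try {
                Files.deleteIfExists(tmp);   // best-effort cleanup of the partial file
            } catch (IOException cleanupFailure) {
                e.addSuppressed(cleanupFailure);
            }
            throw new UncheckedIOException(e);
        }
    }

    /** Small self-check: publishes a file in a fresh temp directory and reads it back. */
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("meta-sketch");
            Path target = dir.resolve("_meta");
            writeThenPublish(target, "checkpoint-1".getBytes(StandardCharsets.UTF_8));
            return new String(Files.readAllBytes(target), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A reader that lists the directory sees either no `_meta` file or a complete one. As the comment notes, S3 would not need the temp file at all, since the object only becomes visible on close().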
[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful
[ https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16470221#comment-16470221 ] ASF GitHub Bot commented on FLINK-9325: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5982

Hi @StephanEwen, could you please have a look at this?