[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-08-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16586228#comment-16586228
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

sihuazhou commented on issue #5982: [FLINK-9325][checkpoint]generate the meta 
file for checkpoint only when the writing is truly successful
URL: https://github.com/apache/flink/pull/5982#issuecomment-414390311
 
 
   Hi @StephanEwen, thanks for your comments! I was caught up in a lot of work 
recently. I will have a look at the `RecoverableFsDataOutputStream` and get 
back here, probably this weekend. ;)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> generate the _meta file for checkpoint only when the writing is truly 
> successful
> 
>
> Key: FLINK-9325
> URL: https://issues.apache.org/jira/browse/FLINK-9325
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: pull-request-available
>
> We should generate the _meta file for checkpoint only when the writing is 
> totally successful. We should write the metadata file first to a temp file 
> and then atomically rename it (with an equivalent workaround for S3). 
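For illustration only, the temp-file-then-rename idea can be sketched with plain `java.nio.file` on a local file system. This assumes a POSIX-like local disk where `StandardCopyOption.ATOMIC_MOVE` is supported. `AtomicMetadataWriter` is a hypothetical name, and the sketch covers neither HDFS, S3, nor Flink's own `FileSystem` abstraction.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: readers see either no metadata file or a complete one.
final class AtomicMetadataWriter {

    private AtomicMetadataWriter() {}

    static void writeAtomically(Path target, byte[] metadata) throws IOException {
        // Stage the bytes in a temp file in the same directory, so the rename
        // stays within one file system. (No fsync is done in this sketch.)
        Path temp = Files.createTempFile(target.getParent(), ".inprogress-", ".tmp");
        try {
            try (OutputStream out = Files.newOutputStream(temp)) {
                out.write(metadata);
            }
            // Publish only after the write succeeded. ATOMIC_MOVE throws
            // AtomicMoveNotSupportedException if the platform cannot do it.
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        } finally {
            // No-op after a successful move; removes the partial file otherwise.
            Files.deleteIfExists(temp);
        }
    }
}
```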



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-08-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16585251#comment-16585251
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

StephanEwen commented on issue #5982: [FLINK-9325][checkpoint]generate the meta 
file for checkpoint only when the writing is truly successful
URL: https://github.com/apache/flink/pull/5982#issuecomment-414149487
 
 
   Apologies for the delay.
   
   The interface of the `AtomicCreatingFsDataOutputStream` is good.
   As part of the new `StreamingFileSink` design, we created a [recoverable stream](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.java), 
which is to some extent an extension of the `AtomicCreatingFsDataOutputStream` 
in the following way:
   
 - The recoverable stream also does not show the file immediately, but it 
needs to go through a committer.
 - The committer can be persisted and recovered
 - An intermediate status (some data having been written to the file) can 
also be persisted and recovered
   
   This is used in the StreamingFileSink to write data to a file, chunk by 
chunk, and make sure the state of an output file (as of a checkpoint) can be 
recovered.
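A hypothetical, local-file-only sketch of the three properties listed above: bytes go to a hidden in-progress file, `persist()` returns a small handle (temp path, target path, valid length) that could be stored in a checkpoint, `recover()` resumes from such a handle, and `commit()` publishes the file with a rename. All class and method names here are invented and deliberately simpler than Flink's actual `RecoverableFsDataOutputStream`.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical handle for an in-progress file; small enough to keep in checkpoint state.
final class Recoverable {
    final Path tempFile;
    final Path targetFile;
    final long validLength;

    Recoverable(Path tempFile, Path targetFile, long validLength) {
        this.tempFile = tempFile;
        this.targetFile = targetFile;
        this.validLength = validLength;
    }
}

// Hypothetical stream: bytes stay invisible under targetFile until commit().
final class SketchRecoverableStream extends OutputStream {
    private final Path tempFile;
    private final Path targetFile;
    private final FileChannel channel;

    private SketchRecoverableStream(Path tempFile, Path targetFile, FileChannel channel) {
        this.tempFile = tempFile;
        this.targetFile = targetFile;
        this.channel = channel;
    }

    static SketchRecoverableStream open(Path target) throws IOException {
        Path temp = target.resolveSibling("." + target.getFileName() + ".inprogress");
        FileChannel ch = FileChannel.open(temp, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
        return new SketchRecoverableStream(temp, target, ch);
    }

    // Resume from a persisted handle: discard bytes written after that point.
    static SketchRecoverableStream recover(Recoverable r) throws IOException {
        FileChannel ch = FileChannel.open(r.tempFile, StandardOpenOption.WRITE);
        ch.truncate(r.validLength);
        ch.position(r.validLength);
        return new SketchRecoverableStream(r.tempFile, r.targetFile, ch);
    }

    @Override
    public void write(int b) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(new byte[] {(byte) b});
        while (buf.hasRemaining()) {
            channel.write(buf);
        }
    }

    // Intermediate state: force data to disk and record how much of the file is valid.
    Recoverable persist() throws IOException {
        channel.force(false);
        return new Recoverable(tempFile, targetFile, channel.position());
    }

    // Stop writing and hand back what a committer needs to publish the file.
    Recoverable closeForCommit() throws IOException {
        Recoverable r = persist();
        channel.close();
        return r;
    }

    // Plain close keeps the temp file, so a later recover() can still resume it.
    @Override
    public void close() throws IOException {
        channel.close();
    }

    // Committer: may run in a new process from a restored handle.
    static void commit(Recoverable r) throws IOException {
        if (Files.notExists(r.tempFile) && Files.exists(r.targetFile)) {
            return; // already committed before a failure
        }
        Files.move(r.tempFile, r.targetFile, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

Re-running `commit()` after a successful commit returns early, which is the kind of idempotence a committer restored after a failure would need.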
   
   Now, can we use the same implementation for both 
AtomicCreatingFsDataOutputStream and RecoverableFsDataOutputStream? Parts of 
the logic are very simple (like using a temp file and renaming for HDFS, or 
using a multipart upload and committing later on S3).
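One way to share the simple parts, sketched under assumptions: both stream types delegate the visibility step to a per-file-system strategy. `PublishStrategy`, `RenameStrategy`, and `AtomicCreation` are hypothetical names. Only the rename variant is implemented, and the S3 mapping in the comment is an assumption about how a multipart-upload variant might slot in.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical per-file-system strategy for making written bytes visible.
// Assumed S3 mapping (not implemented here): writePart -> UploadPart,
// commit -> CompleteMultipartUpload, abort -> AbortMultipartUpload.
interface PublishStrategy {
    void writePart(byte[] part) throws IOException;

    void commit() throws IOException; // make the target visible

    void abort() throws IOException; // discard everything written so far
}

// HDFS/local style: append to a hidden temp file, publish with one rename.
final class RenameStrategy implements PublishStrategy {
    private final Path temp;
    private final Path target;

    RenameStrategy(Path target) {
        this.target = target;
        this.temp = target.resolveSibling("." + target.getFileName() + ".inprogress");
    }

    @Override
    public void writePart(byte[] part) throws IOException {
        Files.write(temp, part, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    @Override
    public void commit() throws IOException {
        Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    @Override
    public void abort() throws IOException {
        Files.deleteIfExists(temp);
    }
}

// Shared logic both stream flavours could build on: write, then commit or abort.
final class AtomicCreation {
    private AtomicCreation() {}

    static void createAtomically(PublishStrategy strategy, byte[]... parts) throws IOException {
        try {
            for (byte[] part : parts) {
                strategy.writePart(part);
            }
            strategy.commit();
        } catch (IOException | RuntimeException e) {
            try {
                strategy.abort();
            } catch (IOException suppressed) {
                e.addSuppressed(suppressed);
            }
            throw e;
        }
    }
}
```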
   



[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16543986#comment-16543986
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
@StephanEwen Thanks! Looking forward~



[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542828#comment-16542828
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5982
  
@sihuazhou I got caught up in some other tasks; I will try to get back to 
this soon. I would like to have this feature in as a base for "search for 
completed checkpoint".



[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517821#comment-16517821
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
Could anybody have a look at this?



[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-06-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497900#comment-16497900
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5982#discussion_r192374992
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/ClosingAtomicCreatingFSDataOutputStream.java
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link AtomicCreatingFsDataOutputStream} that is used to
+ * implement a safety net against unclosed streams.
+ *
+ * See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
+ */
+@Internal
+public class ClosingAtomicCreatingFSDataOutputStream
+   extends AtomicCreatingFsDataOutputStream
+   implements WrappingProxyCloseable {
+
+   private final SafetyNetCloseableRegistry registry;
+   private final String debugString;
+   private AtomicCreatingFsDataOutputStream outputStream;
+
+   private volatile boolean closed;
+
+   private ClosingAtomicCreatingFSDataOutputStream(
+   AtomicCreatingFsDataOutputStream delegate, SafetyNetCloseableRegistry registry, String debugString) throws IOException {
+   this.outputStream = delegate;
+   this.registry = Preconditions.checkNotNull(registry);
+   this.debugString = Preconditions.checkNotNull(debugString);
+   this.closed = false;
+   }
+
+   public boolean isClosed() {
+   return closed;
+   }
+
+   @Override
+   public long getPos() throws IOException {
+   return outputStream.getPos();
+   }
+
+   @Override
+   public void write(int b) throws IOException {
+   outputStream.write(b);
+   }
+
+   @Override
+   public void flush() throws IOException {
+   outputStream.flush();
+   }
+
+   @Override
+   public void sync() throws IOException {
+   outputStream.sync();
+   }
+
+   @Override
+   public void close() throws IOException {
+   if (!closed) {
--- End diff --

You are right, nice catch! 👍



[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-06-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497882#comment-16497882
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5982#discussion_r192371034
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/TwoPhaseFSDataOutputStream.java
 ---
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * Operates the output stream in two phases; any exception during the operation of {@link TwoPhaseFSDataOutputStream} will
+ * leave the {@link #targetFile} invisible.
+ *
+ * PHASE 1, write the data into the {@link #preparingFile}.
+ * PHASE 2, close the {@link #preparingFile} and rename it to the {@link #targetFile}.
+ */
+@Internal
+public class TwoPhaseFSDataOutputStream extends AtomicCreatingFsDataOutputStream {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseFSDataOutputStream.class);
+
+   /**
+* the target file system.
+*/
+   private final FileSystem fs;
+
+   /**
+* the target file which the preparing file will be renamed to in the {@link #closeAndPublish()}.
+*/
+   private final Path targetFile;
+
+   /**
+* the preparing file to store the in-flight data.
+*/
+   private final Path preparingFile;
+
+   /**
+* the output stream of the preparing file.
+*/
+   private final FSDataOutputStream preparedOutputStream;
+
+   private volatile boolean closed;
+
+   public TwoPhaseFSDataOutputStream(FileSystem fs, Path f, FileSystem.WriteMode writeMode) throws IOException {
+
+   Preconditions.checkArgument(FileSystem.WriteMode.OVERWRITE != writeMode, "WriteMode.OVERWRITE is not supported yet.");
+
+   this.fs = fs;
+   this.targetFile = f;
+   this.preparingFile = generateTemporaryFilename(f);
+   this.closed = false;
+
+   if (writeMode == FileSystem.WriteMode.NO_OVERWRITE && fs.exists(targetFile)) {
+   throw new IOException("Target file " + targetFile + " already exists.");
+   }
+
+   this.preparedOutputStream = fs.create(this.preparingFile, writeMode);
+   }
+
+   @Override
+   public long getPos() throws IOException {
+   return this.preparedOutputStream.getPos();
+   }
+
+   @Override
+   public void write(int b) throws IOException {
+   this.preparedOutputStream.write(b);
+   }
+
+   @Override
+   public void flush() throws IOException {
+   this.preparedOutputStream.flush();
+   }
+
+   @Override
+   public void sync() throws IOException {
+   this.preparedOutputStream.sync();
+   }
+
+   /**
+* Does the cleanup: closes the stream and deletes the {@link #preparingFile}.
+*/
+   @Override
+   public void close() throws IOException {
+   if (!closed) {
--- End diff --

Same here.



[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-06-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497879#comment-16497879
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5982#discussion_r192370481
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/ClosingAtomicCreatingFSDataOutputStream.java
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * This class is a {@link org.apache.flink.util.WrappingProxy} for {@link AtomicCreatingFsDataOutputStream} that is used to
+ * implement a safety net against unclosed streams.
+ *
+ * See {@link SafetyNetCloseableRegistry} for more details on how this is utilized.
+ */
+@Internal
+public class ClosingAtomicCreatingFSDataOutputStream
+   extends AtomicCreatingFsDataOutputStream
+   implements WrappingProxyCloseable {
+
+   private final SafetyNetCloseableRegistry registry;
+   private final String debugString;
+   private AtomicCreatingFsDataOutputStream outputStream;
+
+   private volatile boolean closed;
+
+   private ClosingAtomicCreatingFSDataOutputStream(
+   AtomicCreatingFsDataOutputStream delegate, SafetyNetCloseableRegistry registry, String debugString) throws IOException {
+   this.outputStream = delegate;
+   this.registry = Preconditions.checkNotNull(registry);
+   this.debugString = Preconditions.checkNotNull(debugString);
+   this.closed = false;
+   }
+
+   public boolean isClosed() {
+   return closed;
+   }
+
+   @Override
+   public long getPos() throws IOException {
+   return outputStream.getPos();
+   }
+
+   @Override
+   public void write(int b) throws IOException {
+   outputStream.write(b);
+   }
+
+   @Override
+   public void flush() throws IOException {
+   outputStream.flush();
+   }
+
+   @Override
+   public void sync() throws IOException {
+   outputStream.sync();
+   }
+
+   @Override
+   public void close() throws IOException {
+   if (!closed) {
--- End diff --

Either `closed` is not required to be `volatile` or this is not enough to 
prevent a race condition from happening. You could either use an 
`AtomicBoolean::compareAndSet` or in this particular case rely on the 
serialization from the registry, i.e. `if (registry.unregisterCloseable(this)) 
{...}` 
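A minimal sketch of the `compareAndSet` option, using a hypothetical wrapper rather than the PR's class: only the thread that flips the flag from `false` to `true` runs the cleanup, whereas a `volatile` read followed by a separate write still lets two threads both observe `false`. The `onFirstClose` callback is an invented stand-in for unregistering from the safety-net registry; the registry-based alternative is not shown.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper; only the idempotent, race-free close logic matters here.
final class SafeClosingStream extends OutputStream {
    private final OutputStream delegate;
    private final Runnable onFirstClose; // e.g. unregistering from a registry
    private final AtomicBoolean closed = new AtomicBoolean(false);

    SafeClosingStream(OutputStream delegate, Runnable onFirstClose) {
        this.delegate = delegate;
        this.onFirstClose = onFirstClose;
    }

    boolean isClosed() {
        return closed.get();
    }

    @Override
    public void write(int b) throws IOException {
        delegate.write(b);
    }

    @Override
    public void close() throws IOException {
        // Atomic check-and-set: exactly one caller wins, all others skip the cleanup.
        if (closed.compareAndSet(false, true)) {
            onFirstClose.run();
            delegate.close();
        }
    }
}
```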



[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494674#comment-16494674
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
@StephanEwen I've addressed your comments, could you please have a look 
again?



[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16493057#comment-16493057
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
Yes, @StephanEwen, thanks for the continued suggestions; I will follow your 
suggestion.



[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16492828#comment-16492828
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5982
  
I think we need to have a special output stream type 
(`AtomicCreatingFsDataOutputStream` or similar) as the return type of 
`FileSystem.createAtomic()`. Otherwise, how can a user actually create a file? 
The `closeAndPublish()` method is not part of any API class.
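A hypothetical shape for that API, in which `createAtomic()` returns a dedicated stream type so callers can reach `closeAndPublish()` without a cast. The type and method names follow the discussion, but the signatures and the local-file implementation are assumptions for illustration, not a confirmed Flink API.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Dedicated return type: exposes the "success" path a plain OutputStream lacks.
abstract class AtomicCreatingOutputStream extends OutputStream {
    /** Closes the stream and makes the target file visible. */
    abstract void closeAndPublish() throws IOException;
}

// Hypothetical file-system facade with the factory method under discussion.
interface AtomicFileSystem {
    AtomicCreatingOutputStream createAtomic(Path target) throws IOException;
}

final class LocalAtomicFileSystem implements AtomicFileSystem {
    @Override
    public AtomicCreatingOutputStream createAtomic(Path target) throws IOException {
        Path temp = Files.createTempFile(target.getParent(), ".", ".inprogress");
        OutputStream out = Files.newOutputStream(temp);
        return new AtomicCreatingOutputStream() {
            @Override
            public void write(int b) throws IOException {
                out.write(b);
            }

            @Override
            void closeAndPublish() throws IOException {
                out.close();
                Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
            }

            // Cleanup path; after closeAndPublish() the temp file is gone, so this does nothing.
            @Override
            public void close() throws IOException {
                out.close();
                Files.deleteIfExists(temp);
            }
        };
    }
}
```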



[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476945#comment-16476945
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
@StephanEwen I guess this PR is ready for another look now...



[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476073#comment-16476073
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5982
  
My gut feeling is that we don't need `WriteMode.OVERWRITE` in cases where 
one wants such an atomic file creation...



[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476033#comment-16476033
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
@StephanEwen Thanks for your good suggestions! I will update the PR 
accordingly. What about the problem related to `WriteMode.OVERWRITE`: would 
you object if we don't support it in `createAtomically()`?



[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16475919#comment-16475919
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5982
  
Good point about the renaming on `close()` in case close is called for 
cleanup, rather than success.

We could follow the same semantics as in 
[CheckpointStateOutputStream](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java#L61)

There the semantics are:
  - `close()` means "close on error / cleanup" and closes the stream and deletes the temp file.
  - `closeAndPublish()` would mean "close on success" and close the stream and rename the file.
  - After `closeAndPublish()` has been called, `close()` becomes a no-op.

The 
[FsCheckpointMetadataOutputStream](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java)
 implements that pattern; I think it worked well and is easy to use.
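The three rules above can be sketched as a small state machine. This is a hypothetical local-file illustration (the class name is invented and single-threaded use is assumed), not the code of `FsCheckpointMetadataOutputStream`.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class PublishingOutputStream extends OutputStream {
    private final Path temp;
    private final Path target;
    private final OutputStream out;
    private boolean closed; // set by either close path; single-threaded use assumed

    PublishingOutputStream(Path target) throws IOException {
        this.target = target;
        this.temp = target.resolveSibling(target.getFileName() + ".inprogress");
        this.out = Files.newOutputStream(temp);
    }

    @Override
    public void write(int b) throws IOException {
        if (closed) {
            throw new IOException("stream already closed");
        }
        out.write(b);
    }

    /** "Close on success": close the stream and rename the temp file to the target. */
    void closeAndPublish() throws IOException {
        if (closed) {
            throw new IOException("stream already closed");
        }
        closed = true;
        try {
            out.close();
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            // Publishing failed: clean up like close() would.
            try {
                Files.deleteIfExists(temp);
            } catch (IOException suppressed) {
                e.addSuppressed(suppressed);
            }
            throw e;
        }
    }

    /** "Close on error / cleanup": close and delete the temp file; a no-op after publishing. */
    @Override
    public void close() throws IOException {
        if (closed) {
            return;
        }
        closed = true;
        try {
            out.close();
        } finally {
            Files.deleteIfExists(temp);
        }
    }
}
```

With try-with-resources, the success path calls `closeAndPublish()` inside the block and the implicit `close()` at the end does nothing; if the block throws, the implicit `close()` deletes the temp file and the target never appears.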





[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16475438#comment-16475438
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
Hi, I ran into a problem here: with the Hadoop FileSystem and `scheme='hdfs'`, 
we can't rename a file onto an existing file. This means that it is hard (I'm 
not sure whether it's even possible) to support `WriteMode.OVERWRITE` for 
`createAtomically()` with `scheme='hdfs'`. If we work around this by first 
renaming the existing file to a temp file and then renaming the pre-commit file 
to the target file, the two-step operation is not atomic, which may lead to an 
inconsistent result if the JVM crashes between the two renames. I'm out of 
ideas on this now... Do you have any suggestions for `createAtomically()` when 
`scheme='hdfs'` and `writeMode='WriteMode.OVERWRITE'`? @StephanEwen 
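To make the non-atomicity concrete, the two-rename workaround can be sketched with `java.nio.file`. On a local disk, `Files.move` without `REPLACE_EXISTING` throws `FileAlreadyExistsException` when the target exists; that is used here only as a stand-in for the HDFS rename behaviour described above. `TwoStepOverwrite` and the `.old` suffix are hypothetical.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

final class TwoStepOverwrite {
    private TwoStepOverwrite() {}

    // Hypothetical OVERWRITE workaround for a rename that refuses existing targets.
    static void overwrite(Path pending, Path target) throws IOException {
        try {
            Files.move(pending, target); // succeeds only if target does not exist
            return;
        } catch (FileAlreadyExistsException e) {
            // fall through to the two-step workaround
        }
        Path backup = target.resolveSibling(target.getFileName() + ".old");
        Files.move(target, backup);  // step 1: move the old file aside
        // A crash at this point leaves no file under 'target': the sequence is not atomic.
        Files.move(pending, target); // step 2: publish the new file
        Files.delete(backup);        // drop the old version
    }
}
```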



[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16475318#comment-16475318
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
Hi @StephanEwen, I added more tests for `FileSystem#createAtomically()`.
Concerning the `TwoPhaseFsDataoutputStream`, can we introduce a
`commit_on_close` option to make it easier to use? If `commit_on_close=true`,
we commit the write atomically in `close()`. If `commit_on_close=false`, the
user needs to call `commit()` manually; this could be useful in
try-with-resources situations. What do you think?
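A local-file sketch of the proposed `commit_on_close` option, assuming a temp-file-plus-rename commit as on file://. `CommitOnCloseStream` and `demo()` are hypothetical names, not the PR's `TwoPhaseFsDataoutputStream`. With `commitOnClose=false`, closing without calling `commit()` deletes the temp file, so an exception inside a try-with-resources block publishes nothing.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of a two-phase stream with a commit-on-close switch.
final class CommitOnCloseStream extends OutputStream {
    private final Path target;
    private final Path tmp;
    private final boolean commitOnClose;
    private final OutputStream out;
    private boolean committed;
    private boolean closed;

    CommitOnCloseStream(Path target, boolean commitOnClose) throws IOException {
        this.target = target;
        this.commitOnClose = commitOnClose;
        // Write into a hidden temp file next to the target; readers never see it.
        this.tmp = target.resolveSibling("." + target.getFileName() + ".inprogress");
        this.out = Files.newOutputStream(tmp);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
    }

    /** Closes the temp file and atomically moves it onto the target path. */
    public void commit() throws IOException {
        if (committed) {
            return;
        }
        if (closed) {
            throw new IOException("stream was closed without commit");
        }
        out.close();
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        committed = true;
    }

    @Override
    public void close() throws IOException {
        if (closed) {
            return;
        }
        try {
            if (commitOnClose) {
                commit();
            } else if (!committed) {
                out.close();
                Files.deleteIfExists(tmp); // never committed: drop the partial data
            }
        } finally {
            closed = true;
        }
    }

    /** Writes "data" and reports whether the target file is visible afterwards. */
    static boolean demo(boolean commitOnClose, boolean callCommit) {
        try {
            Path target = Files.createTempDirectory("commit-on-close").resolve("meta");
            try (CommitOnCloseStream s = new CommitOnCloseStream(target, commitOnClose)) {
                s.write("data".getBytes(StandardCharsets.UTF_8));
                if (callCommit) {
                    s.commit();
                }
            }
            return Files.exists(target);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With `commitOnClose=true` the sketch behaves like a plain rename-on-close stream, so an exception inside try-with-resources would still publish whatever was written so far.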




[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473941#comment-16473941
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
@StephanEwen After thinking about your comments again, I think I
misunderstood your point `Also, we need this method to be implemented in all
FileSystem subclasses.` 😭 I will address that. But the question related to
`TwoPhaseFsDataoutputStream` is still a bit confusing to me...




[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473870#comment-16473870
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
Hi @StephanEwen, thank you very much for your reply. I'm not sure whether
just overriding `close()` to do `super.close()` + `rename()` is enough. For
example:
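```
try (TwoPhraseFSDatautputStream outputStream = new TwoPhraseFSDatautputStream(...)) {
    outputStream.write("part1".getBytes(StandardCharsets.UTF_8));
    if (somethingWentWrong) { // hypothetical failure condition
        throw new RuntimeException("xxx");
    }
    outputStream.write("part2".getBytes(StandardCharsets.UTF_8));
}
```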
This would still rename the `tmp file` to the `target file`, because we simply
call `rename()` in `close()`, and try-with-resources calls `close()` even when
an exception is thrown. Also, the current `TwoPhraseFSDatautputStream` works as
a wrapper, so it should already support all file systems. I'm not sure whether
I've misunderstood what you meant... please let me know if I have, and what you
think. Thanks!
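The concern can be reproduced with the JDK alone, since try-with-resources calls `close()` even when the block exits with an exception. `RenameOnCloseStream` below is a hypothetical local-file stand-in for a stream whose `close()` does `super.close()` + `rename()`; it is not the PR's class, and `ATOMIC_MOVE` is assumed to be supported for a rename within one directory.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical stand-in: close() = super.close() + rename(tmp -> target).
final class RenameOnCloseStream extends FilterOutputStream {
    private final Path tmp;
    private final Path target;

    RenameOnCloseStream(Path tmp, Path target) throws IOException {
        super(Files.newOutputStream(tmp));
        this.tmp = tmp;
        this.target = target;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len); // delegate in bulk instead of byte by byte
    }

    @Override
    public void close() throws IOException {
        super.close();
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    /** Writes "part1", optionally fails before "part2", returns the target's content. */
    static String demo(boolean failMidWrite) {
        try {
            Path dir = Files.createTempDirectory("rename-on-close");
            Path target = dir.resolve("meta");
            try (RenameOnCloseStream stream = new RenameOnCloseStream(dir.resolve("meta.tmp"), target)) {
                stream.write("part1".getBytes(StandardCharsets.UTF_8));
                if (failMidWrite) {
                    throw new IllegalStateException("xxx");
                }
                stream.write("part2".getBytes(StandardCharsets.UTF_8));
            } catch (IllegalStateException expected) {
                // The write failed, but close() has already run and renamed the file.
            }
            return new String(Files.readAllBytes(target), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

`demo(true)` leaves a target containing only `part1`, i.e. the incomplete data is published as if the write had succeeded.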





[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473859#comment-16473859
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5982
  
Thanks for preparing this.

I looked at the `TwoPhraseFSDatautputStream`; maybe we can make this
simpler. Do we need the distinction between phases? Would it not be enough to
behave as a regular stream, just overriding `close()` to do `super.close()` +
`rename()`? That may be enough. When the stream is closed, all the write
methods fail with a "stream closed" exception anyway.

Also, we need this method to be implemented in all FileSystem subclasses.

Typos: "Phrase" --> "Phase"




[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473487#comment-16473487
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
Hi @StephanEwen, I have updated the PR according to the comments above; it's
ready for another review.




[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472330#comment-16472330
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
Hi @StephanEwen,
- I prefer to introduce it via `create(Path, WriteMode)`, because I feel this
is more extensible (it would allow us to introduce other `WriteMode`s in the
future).
- I would choose to throw an `UnsupportedOperationException` for the other
schemes; I think that makes the program's behavior more deterministic.

What do you think?




[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472191#comment-16472191
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5982
  
How about adding the method `createAtomically` or so, with otherwise the 
same signature as the `create(Path, WriteMode)` method?
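A sketch of the suggested method next to the existing one, assuming a hypothetical `SketchFileSystem` base class that uses `java.nio.file.Path` rather than Flink's real `FileSystem` and `Path` types. One possible default, assumed here, is for the base implementation to throw `UnsupportedOperationException`, so a file system that has not implemented atomic creation fails loudly instead of silently doing a non-atomic write.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of the API shape; not Flink's actual FileSystem class.
abstract class SketchFileSystem {

    enum WriteMode { NO_OVERWRITE, OVERWRITE }

    /** Regular create: data may become visible while it is still being written. */
    abstract OutputStream create(Path path, WriteMode mode) throws IOException;

    /**
     * Same signature as create(), but data must only become visible once the
     * returned stream is closed. The assumed default rejects the call.
     */
    OutputStream createAtomically(Path path, WriteMode mode) throws IOException {
        throw new UnsupportedOperationException(
                getClass().getSimpleName() + " does not support atomic creation");
    }
}

/** A local implementation that only overrides the regular create(). */
final class PlainLocalFileSystem extends SketchFileSystem {
    @Override
    OutputStream create(Path path, WriteMode mode) throws IOException {
        return mode == WriteMode.OVERWRITE
                ? Files.newOutputStream(path) // CREATE, TRUNCATE_EXISTING, WRITE
                : Files.newOutputStream(path, StandardOpenOption.CREATE_NEW);
    }
}
```

Keeping the same parameter list as `create(Path, WriteMode)` means callers can switch between the two methods without changing anything else.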




[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16471920#comment-16471920
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
@StephanEwen Thank you very much for your great suggestion! I will address
it that way.




[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16471903#comment-16471903
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5982
  
I think this fix might not work for S3, because a rename() on the S3 file
systems actually triggers a copy (or even a download and upload), so it is not
a cheap operation.

We can fix this by adding a `create(...)` method (or mode) to the FileSystem
API that does not publish data in the file until `close()` is called. For
hdfs:// and file://, this would use a temp file plus a rename; for S3 we don't
write to a temp file, because S3 only makes the file visible on close()
anyway.
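The per-scheme rule described above can be summarized as a small lookup. This is a hypothetical sketch: the enum, the method, the assumed S3 scheme names, the treatment of scheme-less URIs as local paths, and the choice to reject unknown schemes are illustrative assumptions, not Flink API. It only encodes the rule stated in the comment: temp file plus rename for hdfs:// and file://, direct write for S3.

```java
import java.net.URI;
import java.util.Locale;

// Hypothetical sketch: which publish strategy an atomic create() might use per scheme.
final class AtomicCreateStrategies {

    private AtomicCreateStrategies() {}

    enum Strategy {
        /** Write to a temp file, then rename it onto the target path on close(). */
        TEMP_FILE_THEN_RENAME,
        /** Write directly: the object store only makes the object visible on close(). */
        DIRECT_VISIBLE_ON_CLOSE
    }

    static Strategy forUri(URI uri) {
        // Assumption: a URI without a scheme is treated as a local path.
        String scheme = uri.getScheme() == null
                ? "file"
                : uri.getScheme().toLowerCase(Locale.ROOT);
        switch (scheme) {
            case "hdfs":
            case "file":
                return Strategy.TEMP_FILE_THEN_RENAME;
            case "s3":   // assumed S3 scheme names
            case "s3a":
                // A rename on S3 is a copy (possibly download + upload), so avoid it.
                return Strategy.DIRECT_VISIBLE_ON_CLOSE;
            default:
                throw new UnsupportedOperationException("no atomic create for scheme: " + scheme);
        }
    }
}
```

Rejecting unknown schemes, rather than silently falling back to a plain write, keeps a missing implementation from looking like a working atomic one.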




[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16470221#comment-16470221
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
Hi @StephanEwen, could you please have a look at this?

