[ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212811#comment-16212811
 ] 

ASF GitHub Bot commented on FLINK-7737:
---------------------------------------

GitHub user pnowojski opened a pull request:

    https://github.com/apache/flink/pull/4876

    [FLINK-7737][filesystem] Add syncOnFlush flag to StreamWriterBase

    ## What is the purpose of the change
    
    It depends whether to call hsync or hflush on the underlying file system 
and user preferences. Normally hflush is enough to protect against single 
machine HDFS failures and against TaskManagers failures. However if user is 
using S3 like file system, or wants to protect against whole HDFS rack power 
loss hsync must be used instead.
    
    This is a stop gap solution until proper fix waiting for 
https://issues.apache.org/jira/browse/FLINK-5789
    
    ## Verifying this change
    
    This change is hard to test :( One could think about writing a unit test 
using mocks, but that would only copy the implementation.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (**yes** / no)
      - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink f7737

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4876.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4876
    
----
commit 8d1c34197d1098cbd5a56ec882da17f42410c1f4
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Date:   2017-10-20T14:47:52Z

    [FLINK-7737][filesystem] Add syncOnFlush flag to StreamWriterBase
    
    It depends whether to call hsync or hflush on the underlying file system
    and user preferences. Normally hflush is enough to protect against single
    machine HDFS failures and against TaskManagers failures. However if user is
    using S3 like file system, or wants to protect againt whole HDFS rack power
    loss hsync must be used instead.

----


> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which 
> leads to data loss
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7737
>                 URL: https://issues.apache.org/jira/browse/FLINK-7737
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.3.2
>         Environment: Dev
>            Reporter: Ryan Hobbs
>            Assignee: Piotr Nowojski
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> During several tests where we simulated failure conditions, we have observed 
> that on HCFS systems where the data stream is of type FSDataOutputStream, 
> Flink will issue hflush() and not hsync() which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the 
> output stream is of type *HdfsDataOutputStream* but not for streams of type 
> *FSDataOutputStream*.  Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
> try {
> // At this point the refHflushOrSync cannot be null,
> // since register method would have thrown if it was.
> this.refHflushOrSync.invoke(os);
> if (os instanceof HdfsDataOutputStream) {
>                               ((HdfsDataOutputStream) 
> os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>                       }
>               } catch (InvocationTargetException e) {
> String msg = "Error while trying to hflushOrSync!";
> LOG.error(msg + " " + e.getCause());
> Throwable cause = e.getCause();
> if (cause != null && cause instanceof IOException) {
> throw (IOException) cause;
>                       }
> throw new RuntimeException(msg, e);
>               } catch (Exception e) {
> String msg = "Error while trying to hflushOrSync!";
> LOG.error(msg + " " + e);
> throw new RuntimeException(msg, e);
>               }
>       }
> {code}
> Could a potential fix me to perform a sync even on streams of type 
> *FSDataOutputStream*?
> {code}
>  if (os instanceof HdfsDataOutputStream) {
>                                 ((HdfsDataOutputStream) 
> os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>                         } else if (os instanceof FSDataOutputStream) {
>                                 os.hsync();
>                         }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to