[ https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212811#comment-16212811 ]
ASF GitHub Bot commented on FLINK-7737:
---------------------------------------

GitHub user pnowojski opened a pull request:

    https://github.com/apache/flink/pull/4876

    [FLINK-7737][filesystem] Add syncOnFlush flag to StreamWriterBase

    ## What is the purpose of the change

    Whether to call hsync or hflush depends on the underlying file system and on user preferences. Normally hflush is enough to protect against the failure of a single HDFS machine and against TaskManager failures. However, if the user is running on an S3-like file system, or wants to protect against a power loss of a whole HDFS rack, hsync must be used instead.

    This is a stop-gap solution until the proper fix, which is waiting on https://issues.apache.org/jira/browse/FLINK-5789.

    ## Verifying this change

    This change is hard to test :( One could write a unit test using mocks, but that would only replicate the implementation.

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

    ## Documentation

      - Does this pull request introduce a new feature? (**yes** / no)
      - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink f7737

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4876.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4876

----
commit 8d1c34197d1098cbd5a56ec882da17f42410c1f4
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Date:   2017-10-20T14:47:52Z

    [FLINK-7737][filesystem] Add syncOnFlush flag to StreamWriterBase

    Whether to call hsync or hflush depends on the underlying file system and on user preferences. Normally hflush is enough to protect against the failure of a single HDFS machine and against TaskManager failures. However, if the user is running on an S3-like file system, or wants to protect against a power loss of a whole HDFS rack, hsync must be used instead.

----

> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which
> leads to data loss
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7737
>                 URL: https://issues.apache.org/jira/browse/FLINK-7737
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.3.2
>         Environment: Dev
>            Reporter: Ryan Hobbs
>            Assignee: Piotr Nowojski
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> During several tests where we simulated failure conditions, we observed
> that on HCFS systems where the data stream is of type FSDataOutputStream,
> Flink issues hflush() but not hsync(), which results in data loss.
> In the class *StreamWriterBase.java*, the code below calls hsync if the
> output stream is of type *HdfsDataOutputStream*, but not for streams of type
> *FSDataOutputStream*. Is this by design?
> {code}
>     protected void hflushOrSync(FSDataOutputStream os) throws IOException {
>         try {
>             // At this point the refHflushOrSync cannot be null,
>             // since register method would have thrown if it was.
>             this.refHflushOrSync.invoke(os);
>             if (os instanceof HdfsDataOutputStream) {
>                 ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>             }
>         } catch (InvocationTargetException e) {
>             String msg = "Error while trying to hflushOrSync!";
>             LOG.error(msg + " " + e.getCause());
>             Throwable cause = e.getCause();
>             if (cause != null && cause instanceof IOException) {
>                 throw (IOException) cause;
>             }
>             throw new RuntimeException(msg, e);
>         } catch (Exception e) {
>             String msg = "Error while trying to hflushOrSync!";
>             LOG.error(msg + " " + e);
>             throw new RuntimeException(msg, e);
>         }
>     }
> {code}
>
> Could a potential fix be to perform a sync even on streams of type
> *FSDataOutputStream*?
>
> {code}
>     if (os instanceof HdfsDataOutputStream) {
>         ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>     } else if (os instanceof FSDataOutputStream) {
>         os.hsync();
>     }
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
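The pull request proposes a `syncOnFlush` flag that makes the writer call hsync instead of hflush. The sketch below illustrates that selection under stated assumptions. It is not the code from the PR. `SyncableStream`, `FlushPolicy`, `RecordingStream` and `FlushPolicyDemo` are hypothetical names. `SyncableStream` only mirrors the `hflush()`/`hsync()` pair declared by Hadoop's `org.apache.hadoop.fs.Syncable`, so the example compiles without Hadoop on the classpath. In HDFS, hflush pushes buffered data to the datanodes so new readers can see it, and hsync additionally asks the datanodes to persist that data to disk.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in for Hadoop's org.apache.hadoop.fs.Syncable,
 * defined here only so the sketch compiles without Hadoop.
 */
interface SyncableStream {
    /** Flush client buffers so that new readers can see the data (not necessarily on disk). */
    void hflush() throws IOException;

    /** Like hflush(), but additionally asks the storage to persist the data to disk. */
    void hsync() throws IOException;
}

/** Hypothetical helper illustrating the syncOnFlush idea from the PR. */
final class FlushPolicy {
    private final boolean syncOnFlush;

    FlushPolicy(boolean syncOnFlush) {
        this.syncOnFlush = syncOnFlush;
    }

    /** Calls hsync() when syncOnFlush is set, otherwise the cheaper hflush(). */
    void flush(SyncableStream out) throws IOException {
        if (syncOnFlush) {
            out.hsync();
        } else {
            out.hflush();
        }
    }
}

/** Test double that records which call the policy made. */
final class RecordingStream implements SyncableStream {
    final List<String> calls = new ArrayList<>();

    @Override
    public void hflush() {
        calls.add("hflush");
    }

    @Override
    public void hsync() {
        calls.add("hsync");
    }
}

public class FlushPolicyDemo {
    public static void main(String[] args) throws IOException {
        RecordingStream stream = new RecordingStream();
        new FlushPolicy(false).flush(stream); // default: hflush only
        new FlushPolicy(true).flush(stream);  // syncOnFlush: hsync
        System.out.println(stream.calls);     // prints [hflush, hsync]
    }
}
```

When the flag is set, the sketch calls only `hsync()`. This assumes, as in HDFS, that hsync already covers everything hflush does. Keeping hflush as the default avoids the extra disk sync for users who only need protection against single-machine or TaskManager failures.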