[jira] [Commented] (FLINK-9752) Add an S3 RecoverableWriter
[ https://issues.apache.org/jira/browse/FLINK-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16669362#comment-16669362 ]

ASF GitHub Bot commented on FLINK-9752:
---

addisonj edited a comment on issue #6795: [FLINK-9752][s3-fs-connector] Add s3 recoverable writer.
URL: https://github.com/apache/flink/pull/6795#issuecomment-434477664

@kl0u the jira issue (https://issues.apache.org/jira/browse/FLINK-9752) says this is fixed in 1.6.2, but it doesn't appear like this has been backported to that release... Is this work somehow still reflected in 1.6.2?

Edit: had copy pasted the wrong issue :)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Add an S3 RecoverableWriter
> ---
>
> Key: FLINK-9752
> URL: https://issues.apache.org/jira/browse/FLINK-9752
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Reporter: Stephan Ewen
> Assignee: Kostas Kloudas
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.2, 1.7.0
>
> S3 offers persistence only when uploads are complete. That means at the end
> of simple uploads and uploads of parts of a MultiPartUpload.
> We should implement a RecoverableWriter for S3 that does a MultiPartUpload
> with a Part per checkpoint.
> Recovering the reader needs the MultiPartUploadID and the list of ETags of
> previous parts.
> We need additional staging of data in Flink state to work around the fact that
> - Parts in a MultiPartUpload must be at least 5MB
> - Part sizes must be known up front. (Note that data can still be streamed
> in the upload)

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
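The staging requirement in the issue description above (parts must reach a minimum size, and a part's size must be known before it is uploaded) can be illustrated with a small buffer model. This is only a sketch under stated assumptions, not Flink's implementation: `PartStagingSketch` and all of its methods are hypothetical names, the "upload" merely records the part size, and the 5 MiB constant reflects the S3 minimum that the issue refers to as 5MB.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; none of these names exist in Flink. */
final class PartStagingSketch {

    /** S3 requires every part except the last one to be at least 5 MiB. */
    static final int S3_MIN_PART_SIZE = 5 * 1024 * 1024;

    private final int minPartSize;
    private final ByteArrayOutputStream staged = new ByteArrayOutputStream();
    private final List<Integer> uploadedPartSizes = new ArrayList<>();

    PartStagingSketch(int minPartSize) {
        this.minPartSize = minPartSize;
    }

    /** Buffers data locally; a part is "uploaded" only once it is large enough. */
    void write(byte[] data) {
        staged.write(data, 0, data.length);
        if (staged.size() >= minPartSize) {
            // The whole part is buffered, so its size is known up front.
            uploadedPartSizes.add(staged.size());
            staged.reset();
        }
    }

    /** Bytes too small to form a part; a checkpoint would have to keep these. */
    byte[] snapshotStagedBytes() {
        return staged.toByteArray();
    }

    List<Integer> uploadedPartSizes() {
        return uploadedPartSizes;
    }

    public static void main(String[] args) {
        PartStagingSketch stager = new PartStagingSketch(S3_MIN_PART_SIZE);
        stager.write(new byte[3 * 1024 * 1024]); // below the minimum, stays staged
        stager.write(new byte[3 * 1024 * 1024]); // 6 MiB in total, becomes one part
        stager.write(new byte[1024]);            // small tail, stays staged
        System.out.println("uploaded part sizes: " + stager.uploadedPartSizes());
        System.out.println("staged bytes: " + stager.snapshotStagedBytes().length);
    }
}
```

The point of the sketch is that the tail below the minimum part size cannot be made durable through the MultiPartUpload itself, which is why the issue asks for additional staging.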
[jira] [Commented] (FLINK-9752) Add an S3 RecoverableWriter
[ https://issues.apache.org/jira/browse/FLINK-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16669358#comment-16669358 ]

ASF GitHub Bot commented on FLINK-9752:
---

addisonj commented on issue #6795: [FLINK-9752][s3-fs-connector] Add s3 recoverable writer.
URL: https://github.com/apache/flink/pull/6795#issuecomment-434477664

@kl0u the jira issue (https://issues.apache.org/jira/browse/FLINK-10383) says this is fixed in 1.6.2, but it doesn't appear like this has been backported to that release... Is this work somehow still reflected in 1.6.2?
[jira] [Commented] (FLINK-9752) Add an S3 RecoverableWriter
[ https://issues.apache.org/jira/browse/FLINK-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646427#comment-16646427 ]

ASF GitHub Bot commented on FLINK-9752:
---

kl0u commented on issue #6795: [FLINK-9752][s3-fs-connector] Add s3 recoverable writer.
URL: https://github.com/apache/flink/pull/6795#issuecomment-428956598

I fixed the issue @igalshilman. I will wait once again for travis to give green and then I will merge.

Thanks for the review.
[jira] [Commented] (FLINK-9752) Add an S3 RecoverableWriter
[ https://issues.apache.org/jira/browse/FLINK-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16644850#comment-16644850 ]

ASF GitHub Bot commented on FLINK-9752:
---

kl0u commented on issue #6795: [FLINK-9752][s3-fs-connector] Add s3 recoverable writer.
URL: https://github.com/apache/flink/pull/6795#issuecomment-428537348

Thanks a lot @igalshilman for the review! I will fix this and then merge as soon as Travis gives green.
[jira] [Commented] (FLINK-9752) Add an S3 RecoverableWriter
[ https://issues.apache.org/jira/browse/FLINK-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16644798#comment-16644798 ]

ASF GitHub Bot commented on FLINK-9752:
---

igalshilman commented on issue #6795: [FLINK-9752][s3-fs-connector] Add s3 recoverable writer.
URL: https://github.com/apache/flink/pull/6795#issuecomment-428525165

Thanks @kl0u, LGTM. I will wait for your fix so that Travis would execute the AWS tests.
[jira] [Commented] (FLINK-9752) Add an S3 RecoverableWriter
[ https://issues.apache.org/jira/browse/FLINK-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16643487#comment-16643487 ]

ASF GitHub Bot commented on FLINK-9752:
---

kl0u commented on issue #6795: [FLINK-9752][s3-fs-connector] Add s3 recoverable writer.
URL: https://github.com/apache/flink/pull/6795#issuecomment-428208189

@igalshilman I integrated your comments and Travis gives green light. Let me know what you think.
[jira] [Commented] (FLINK-9752) Add an S3 RecoverableWriter
[ https://issues.apache.org/jira/browse/FLINK-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16639594#comment-16639594 ]

ASF GitHub Bot commented on FLINK-9752:
---

kl0u commented on issue #6795: [FLINK-9752][s3-fs-connector] Add s3 recoverable writer.
URL: https://github.com/apache/flink/pull/6795#issuecomment-427311243

R @igalshilman @pnowojski
[jira] [Commented] (FLINK-9752) Add an S3 RecoverableWriter
[ https://issues.apache.org/jira/browse/FLINK-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16639420#comment-16639420 ]

ASF GitHub Bot commented on FLINK-9752:
---

kl0u opened a new pull request #6795: [FLINK-9752][s3-fs-connector] Add s3 recoverable writer.
URL: https://github.com/apache/flink/pull/6795

## What is the purpose of the change

Adds the recoverable writer for S3. The new recoverable writer is only available for **Hadoop S3** (**not Presto** for now) and uses the MultiPart feature to upload part files.

The user is supposed to call `fs.createRecoverableWriter()`, which returns an `S3RecoverableWriter`. With this writer, the user can either call `open(Path)`, which returns an `S3RecoverableFsDataOutputStream`, or call `recover()` to resume a previous such stream from a checkpoint.

The main functionality is implemented by the `S3RecoverableFsDataOutputStream`. This uses:

1) a `RefCountedFSOutputStream` to write a part of the multi-part upload. This is a file stream backed by a local tmp file that is reference counted, so the file is deleted once no references to it remain.
2) a `RecoverableMultiPartUpload` to take snapshots of in-flight Multi-Part Uploads (MPU) and to upload parts that are already complete.
3) From the stream, the user can also get a `Committer`, which allows them to complete the MPU, i.e. "publish" the data. The whole process is a two-phase commit, with files being staged for commit and then committed as a unit.

**Checkpointing / Recovery**

As the user writes data to the stream, when the part reaches a minimum size, it gets uploaded to S3 and a new part file is opened. An uploaded part is identified by its `PartETag`, which is later used when "committing" the MPU. So the list of `PartETag`s associated with the MPU is stored in state.

When `persist` is called, the "current" part file, which has not yet reached the minimum size, is uploaded to S3 as an independent object (not as part of the MPU), and its handle is stored in state. Apart from that, we keep writing to the same part file.

Upon recovery, we retrieve the set of valid `PartETag`s from the state, download the "in-progress" part file that was uploaded as an independent object, and resume writing from there. Any `PartETag`s uploaded between the last successful checkpoint and the failure are simply discarded. This removes any need for active truncation.

## Brief change log

This PR consists of mainly new code so the changelog is not much help. The reviewer can find above a description of what the code does.

## Verifying this change

This change added Unit Tests and can be verified as follows:

* `RefCountedBufferingFileStreamTest`
* `RefCountedFileTest`
* `IncompletePartPrefixTest`
* `S3RecoverableFsDataOutputStreamTest`

And semi end-to-end tests against actual S3:

* `HadoopS3RecoverableWriterTest`
* `HadoopS3RecoverableWriterExceptionTest`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (**yes** / no / don't know)

## Documentation

- Does this pull request introduce a new feature? (**yes** / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented** yet)

R @pnowojski
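The checkpointing and recovery behaviour described in the pull request can be modelled in memory as follows. This is a hedged sketch, not the code from the PR: `RecoverableUploadSketch`, `FakeStore`, and `Snapshot` are hypothetical names, strings stand in for byte data, and the fake store only mimics the two S3 operations the description relies on (uploading an MPU part that yields an ETag, and putting an independent object).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model; it does not use Flink or the AWS SDK. */
final class RecoverableUploadSketch {

    /** Stand-in for S3: MPU parts keyed by ETag, plus independent objects. */
    static final class FakeStore {
        final Map<String, String> parts = new HashMap<>();
        final Map<String, String> objects = new HashMap<>();
        private int counter = 0;

        String uploadPart(String data) {
            String etag = "etag-" + (++counter);
            parts.put(etag, data);
            return etag;
        }

        String putObject(String data) {
            String key = "in-progress-" + (++counter);
            objects.put(key, data);
            return key;
        }
    }

    /** What a checkpoint keeps: valid part ETags and a handle to the tail. */
    static final class Snapshot {
        final List<String> partETags;
        final String tailKey; // null when nothing was buffered

        Snapshot(List<String> partETags, String tailKey) {
            this.partETags = partETags;
            this.tailKey = tailKey;
        }
    }

    private final FakeStore store;
    private final int minPartSize;
    private final List<String> partETags;
    private final StringBuilder current;

    RecoverableUploadSketch(FakeStore store, int minPartSize) {
        this(store, minPartSize, new ArrayList<>(), "");
    }

    private RecoverableUploadSketch(
            FakeStore store, int minPartSize, List<String> partETags, String tail) {
        this.store = store;
        this.minPartSize = minPartSize;
        this.partETags = partETags;
        this.current = new StringBuilder(tail);
    }

    /** Appends to the current part and uploads it once it is large enough. */
    void write(String data) {
        current.append(data);
        if (current.length() >= minPartSize) {
            partETags.add(store.uploadPart(current.toString()));
            current.setLength(0);
        }
    }

    /** Uploads the undersized tail as an independent object; writing continues. */
    Snapshot persist() {
        String tailKey = current.length() == 0 ? null : store.putObject(current.toString());
        return new Snapshot(new ArrayList<>(partETags), tailKey);
    }

    /** Resumes from a snapshot; parts uploaded after it are simply not referenced. */
    static RecoverableUploadSketch recover(FakeStore store, int minPartSize, Snapshot s) {
        String tail = s.tailKey == null ? "" : store.objects.get(s.tailKey);
        return new RecoverableUploadSketch(store, minPartSize, new ArrayList<>(s.partETags), tail);
    }

    /** "Publishes" the data: the referenced parts in order, then the final tail. */
    String commit() {
        StringBuilder out = new StringBuilder();
        for (String etag : partETags) {
            out.append(store.parts.get(etag));
        }
        return out.append(current).toString();
    }

    public static void main(String[] args) {
        FakeStore store = new FakeStore();
        RecoverableUploadSketch writer = new RecoverableUploadSketch(store, 4);
        writer.write("abcd");                 // reaches the minimum, uploaded as a part
        writer.write("ef");                   // stays in the current part
        Snapshot snapshot = writer.persist(); // checkpoint
        writer.write("ghij");                 // uploaded after the checkpoint
        RecoverableUploadSketch recovered = recover(store, 4, snapshot);
        System.out.println(recovered.commit());
    }
}
```

In this model no truncation is needed on recovery: the part uploaded after the checkpoint still sits in the store, but it never appears in the recovered ETag list, so it is not included in the committed result.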
[jira] [Commented] (FLINK-9752) Add an S3 RecoverableWriter
[ https://issues.apache.org/jira/browse/FLINK-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618730#comment-16618730 ]

Stephan Ewen commented on FLINK-9752:
---

There is a WIP branch https://github.com/StephanEwen/incubator-flink/tree/s3_recoverable_writer_2

We are currently finalizing it. I think this feature is currently at the point where the work does not yet parallelize well, but once the first version is added, we will take all the help we can get to optimize it further.
[jira] [Commented] (FLINK-9752) Add an S3 RecoverableWriter
[ https://issues.apache.org/jira/browse/FLINK-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16603764#comment-16603764 ]

Chao-Han Tsai commented on FLINK-9752:
---

[~kkl0u] what is the timeline on shipping this feature? We are looking forward to this feature and would love to help if you like.

> Add an S3 RecoverableWriter
> ---
>
> Key: FLINK-9752
> URL: https://issues.apache.org/jira/browse/FLINK-9752
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Reporter: Stephan Ewen
> Assignee: Kostas Kloudas
> Priority: Major
> Fix For: 1.6.1, 1.7.0