[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479481#comment-17479481 ]

Martijn Visser commented on FLINK-11838:
----------------------------------------

[~galenwarren] That's no dumb question :) It's just a matter of filling in the {{release notes}} field in the Jira ticket. When a Flink release is created, all the release notes are auto-generated. See for example https://issues.apache.org/jira/browse/FLINK-25553

> Create RecoverableWriter for GCS
> --------------------------------
>
>                 Key: FLINK-11838
>                 URL: https://issues.apache.org/jira/browse/FLINK-11838
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / FileSystem
>    Affects Versions: 1.8.0
>            Reporter: Fokko Driesprong
>            Assignee: Galen Warren
>            Priority: Major
>              Labels: pull-request-available, usability
>             Fix For: 1.15.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports resumable uploads, which we can use to create a RecoverableWriter similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop-compatible interface:
> https://github.com/apache/flink/pull/7519
> we noticed that the current implementation relies heavily on renaming files on commit:
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to implement a more GCS-native RecoverableWriter.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
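The resumable-upload flow the issue description links to can be sketched independently of any client library. The helpers below are illustrative only (the names are hypothetical, not part of Flink or the GCS SDK): each chunk of a resumable session is uploaded with a Content-Range header, and the JSON API accepts "*" for the total size while it is still unknown.

```python
# Illustrative sketch of the chunking in a GCS resumable upload, per the
# JSON API docs linked above. Helper names are hypothetical; no network
# calls are made.
from typing import List, Optional, Tuple

# Non-final chunks of a GCS resumable upload must be a multiple of 256 KiB.
CHUNK_GRANULARITY = 256 * 1024

def content_range(offset: int, chunk: bytes, total_size: Optional[int]) -> str:
    """Content-Range header for one chunk; '*' while the total is unknown."""
    end = offset + len(chunk) - 1
    total = "*" if total_size is None else str(total_size)
    return f"bytes {offset}-{end}/{total}"

def split_chunks(data: bytes, chunk_size: int) -> List[Tuple[int, bytes]]:
    """Split a payload into (offset, chunk) pairs; only the last chunk may
    be smaller than chunk_size."""
    return [(i, data[i : i + chunk_size]) for i in range(0, len(data), chunk_size)]
```

A session would be initiated with a POST carrying uploadType=resumable; the session URI it returns is the piece of state a recoverable writer would persist in a checkpoint.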
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479454#comment-17479454 ]

Galen Warren commented on FLINK-11838:
--------------------------------------

[~MartijnVisser] I'd say let's handle documentation and release notes on the other one, https://issues.apache.org/jira/browse/FLINK-25577, since this one is now closed. Sorry if this is a dumb question, but what is involved in adding this to the release notes?
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479153#comment-17479153 ]

Martijn Visser commented on FLINK-11838:
----------------------------------------

Thanks for taking care of this, [~galenwarren]. We should also make sure that this ends up in the release notes; do you think we should add it to this ticket, or to https://issues.apache.org/jira/browse/FLINK-25577 ?
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17406351#comment-17406351 ]

Xintong Song commented on FLINK-11838:
--------------------------------------

[~laughingman7743] Unfortunately, this feature is not finished and will not be present in release 1.14.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17406216#comment-17406216 ]

Tomoyuki NAKAMURA commented on FLINK-11838:
-------------------------------------------

Is there any progress on this ticket? It looks like flink-1.14.0-rc0 has been released: https://github.com/apache/flink/releases/tag/release-1.14.0-rc0
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398108#comment-17398108 ]

Galen Warren commented on FLINK-11838:
--------------------------------------

I know this has taken a long time. I'll have the code in before the upcoming 1.14 freeze.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326990#comment-17326990 ]

Galen Warren commented on FLINK-11838:
--------------------------------------

PR is still active.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326986#comment-17326986 ]

Flink Jira Bot commented on FLINK-11838:
----------------------------------------

This issue is assigned but has not received an update in 7 days, so it has been labeled "stale-assigned". If you are still working on the issue, please give an update and remove the label. If you are no longer working on the issue, please unassign it so someone else may work on it. In 7 days the issue will be automatically unassigned.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320638#comment-17320638 ]

Xintong Song commented on FLINK-11838:
--------------------------------------

Thanks [~galenwarren]. I'll try to take a look asap. Just to manage expectations, this could take some time; we are currently prioritizing the 1.13.0 release testing work.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320363#comment-17320363 ]

Galen Warren commented on FLINK-11838:
--------------------------------------

[~xintongsong], PR created: https://github.com/apache/flink/pull/15599
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17316376#comment-17316376 ]

Galen Warren commented on FLINK-11838:
--------------------------------------

Working on the new PR now ... coming soon.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314161#comment-17314161 ]

Xintong Song commented on FLINK-11838:
--------------------------------------

Thanks for the update, [~galenwarren]. Either updating the old PR or starting a new one works for me.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314079#comment-17314079 ]

Galen Warren commented on FLINK-11838:
--------------------------------------

[~xintongsong] – actually, is starting a new PR the right thing to do? I'm thinking that might be the cleanest way to proceed, since we never really used the old one that I created prematurely and its info is out of date, but I'll follow your guidance here.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314067#comment-17314067 ]

Galen Warren commented on FLINK-11838:
--------------------------------------

Hi [~xintongsong] – sorry for the long delay. I plan to create a new PR and upload some code this weekend; my other work has let up a bit and I have some time to look at this. I'll post the link here when it's ready.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17313642#comment-17313642 ]

Xintong Song commented on FLINK-11838:
--------------------------------------

Hi [~galenwarren], any updates on this ticket? I'd like to kindly inform you that the Apache Flink community has decided to adopt a new Jira process [1], according to which assigned tickets need a regular status update. Tickets will be labeled "stale-assigned" after 7 days without status updates, and unassigned after another 7 days. There will soon be a bot to enforce that.

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Jira-Process-amp-Bot-td49676.html#a49734
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17303537#comment-17303537 ]

Galen Warren commented on FLINK-11838:
--------------------------------------

Not that I'm aware of. Writing checkpoints/savepoints and HA data to GCS buckets works fine as-is, but to use StreamingFileSink, a RecoverableWriter implementation is required. That exists now for S3 but not for GCS. I have a local GCS RecoverableWriter implementation that I'm using and that seems to be working, and I'm hoping to push the first part of the code for review by [~xintongsong] this weekend.
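For context, the persist/recover contract that a RecoverableWriter has to satisfy can be illustrated with an in-memory stand-in. This is a conceptual sketch only (Python, with hypothetical names), not the Flink Java interface:

```python
# Conceptual stand-in for the RecoverableWriter contract (not the Flink API):
# a writer can persist its position, and a new writer can later resume from
# that persisted state, without relying on rename-on-commit.
import io

class ResumableBuffer:
    def __init__(self) -> None:
        self._buf = io.BytesIO()

    def write(self, data: bytes) -> None:
        self._buf.write(data)

    def persist(self) -> int:
        # Everything up to the returned offset is considered durable;
        # a checkpoint would store this token.
        return self._buf.tell()

    @classmethod
    def recover(cls, data: bytes, token: int) -> "ResumableBuffer":
        # Resume from a persisted token, discarding bytes written after it
        # (the part of the stream that was never checkpointed).
        writer = cls()
        writer._buf.write(data[:token])
        return writer

    def commit(self) -> bytes:
        # Finalize and expose the committed contents.
        return self._buf.getvalue()
```

In the real implementation the persisted token would be a GCS resumable-upload session reference rather than a byte offset, but the shape of the contract is the same.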
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302382#comment-17302382 ]

Berkay Öztürk commented on FLINK-11838:
---------------------------------------

Is there any other way of writing streaming data to a GCS bucket with Flink? Or is a RecoverableWriter implementation required? Using StreamingFileSink (or FileSink) gives this error:

{code:java}
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
{code}
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17300332#comment-17300332 ]

Galen Warren commented on FLINK-11838:
--------------------------------------

[~BerkayOzturk] No, it's not in a working state. Since that PR, [~xintongsong] and I refined the proposed implementation significantly. I'm hoping to get back to this very soon; I've had some work stuff come up that is taking all my time. Sorry about the delay!
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17300157#comment-17300157 ]

Berkay Öztürk commented on FLINK-11838:
---------------------------------------

Hello [~galenwarren], I currently need a GCS connector implementation for work. Is the latest commit on your add-gcs-filesystem-with-recoverable-writer-2 branch (https://github.com/coachclientconnect/flink/commit/e5e69b67c74e8a7576b3f550f3bb79defacaac6d) in a working state? Or should I (and others who need this) wait for your update on the PR? Thank you
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17295702#comment-17295702 ] Xintong Song commented on FLINK-11838: -- Thanks for the update, [~galenwarren]. No worries. Good luck with your work project :)
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17295384#comment-17295384 ] Galen Warren commented on FLINK-11838: -- I just wanted to add an update here so this doesn't go stale – I should be able to push the first piece of the code in a PR in about a week, once this work project I'm on dies down a bit. So this is still coming, sorry for the delay.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288764#comment-17288764 ] Xintong Song commented on FLINK-11838: -- Thanks for the updates, [~galenwarren]. No worries, take your time.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288419#comment-17288419 ] Galen Warren commented on FLINK-11838: -- Hey, I just wanted to check in here. Sorry for the delay; some other work has come up and I haven't been able to spend any time here. But I will still complete this, and will hopefully get back to it toward the end of this week.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17284896#comment-17284896 ] Galen Warren commented on FLINK-11838: -- Ah, OK – I see. It will take me a couple of days to get things organized; I'll post back here when I have something for you to look at. Thanks.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17284771#comment-17284771 ] Xintong Song commented on FLINK-11838: -- [~galenwarren], What makes the original PR hard to review is not its size, but that all the changes are put in a single commit. It would be helpful to break the PR into multiple commits, each containing a smaller group of closely related changes. Please find detailed guidelines and examples [here|https://flink.apache.org/contributing/code-style-and-quality-pull-requests.html].
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17284751#comment-17284751 ] Galen Warren commented on FLINK-11838: -- That's fine on "compose on persist", I'll leave it out for now and supply it in a separate commit if we decide to go there. This reminds me of a question I meant to ask, though. When we first started, you had some concern about the size of the original PR and suggested it be broken into parts. The new PR will be roughly the same size – if that's too big, any suggestion on how to break it into pieces? Should the unit tests be a separate commit?
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17284518#comment-17284518 ] Xintong Song commented on FLINK-11838: -- Thanks [~galenwarren], I think we are good to move forward to the PR review. Concerning "compose on persist", I'm leaning towards keeping it simple & stupid until we come up with a thorough plan. My concern with exposing something "not obvious which is better" to users as a configuration option is that we don't know, once we later have a thorough plan, whether this option will be needed by (or worse, conflict with) that plan. Configuration options are considered public interfaces, so incompatible changes (e.g., removing an existing option) are not forbidden but should be avoided.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17284476#comment-17284476 ] Galen Warren commented on FLINK-11838: -- {quote}Maybe for the first step, it's good enough to simply do all the composing and deleting at the end. We can try to optimize it later if we indeed see a performance problem in composing and deleting the temporary blobs. {quote} I'm fine to go either way here. I've already put something together locally that allows for composing both at persist and commit, but it would be simple to revert to just doing it at commit. Maybe you can take a look when we get to the code phase to see what you think? If it's not obvious which is better, I suppose we could also control that – "compose on persist" – via a Flink option. Are you comfortable with the approach now? If so, I'll work on getting the code together in order to update the PR. Thanks!
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17284339#comment-17284339 ] Xintong Song commented on FLINK-11838: -- bq. perhaps we expose this chunk size as a Flink option to give the user some control over the process Sounds good to me. bq. it seems to me that we can't safely delete any of the temporary blobs along the way, because it's possible that we might restore to a checkpoint prior to some or all of the incremental compose operations having occurred That's a good point. It seems we have to keep the temporary blobs anyway, and composing blobs in advance only serves as a performance optimization at the cost of more temporary storage space and system complexity. Maybe for the first step, it's good enough to simply do all the composing and deleting at the end. We can try to optimize it later if we indeed see a performance problem in composing and deleting the temporary blobs.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17283950#comment-17283950 ] Galen Warren commented on FLINK-11838: -- One more thought/question. While we can potentially compose temp blobs along the way, to avoid having to compose them all at commit time, it seems to me that we can't safely delete any of the temporary blobs along the way, because it's possible that we might restore to a checkpoint prior to some or all of the incremental compose operations having occurred. In that case, we'd have to repeat the compose operations, which means the underlying temp blobs would need to be there. If that's right, then we'd necessarily have to wait until the end to delete the temp blobs. I was wondering, would it be allowable to do any of those delete operations in parallel? The coding [guidelines|https://flink.apache.org/contributing/code-style-and-quality-common.html#concurrency-and-threading] would seem to discourage this, but they don't outright prohibit it, so I thought I'd ask first.
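For what it's worth, the parallel deletion being asked about can be sketched with plain JDK concurrency. This is only an illustrative sketch, not anything from the actual PR; `deleteBlob` is a hypothetical callback standing in for the real GCS delete call:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class ParallelCleanup {

    /**
     * Deletes temp blobs with bounded parallelism, waiting for all
     * deletes to finish before returning.
     */
    static void deleteAll(List<String> tempBlobs, Consumer<String> deleteBlob, int parallelism)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (String blob : tempBlobs) {
                futures.add(pool.submit(() -> deleteBlob.accept(blob)));
            }
            for (Future<?> f : futures) {
                f.get(); // propagate any delete failure to the caller
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

Failures propagate through `Future.get`, so a failed delete surfaces to the committer rather than being silently dropped.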
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17283747#comment-17283747 ] Galen Warren commented on FLINK-11838: -- Hi [~xintongsong], sorry for the delay here, I had some other, unrelated work I had to focus on. I like your idea of using {{WriteChannel}} for the uploads, but closing each one when {{RecoverableFsDataOutputStream.persist}} is called, so that we're not relying on {{WriteChannel}} for recoverability. {{WriteChannel}} allows one to control how frequently it flushes data via [setChunkSize|https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#setChunkSize-int-]; perhaps we expose this chunk size as a Flink option to give the user some control over the process, i.e. how much memory is used for buffering? It could be optional, and not setting it would mean using the Google default. Yes, it would be straightforward to compose blobs at any point in the process, i.e. on commit and/or at {{persist}} calls along the way. Maybe we compose them on commit (of course) and also whenever {{persist}} is called when there are at least 32 temp blobs to be composed? That way, we spread the compose work out over time but also call {{compose}} as few times as possible, composing as many blobs as possible in each call, which seems like it would be the most efficient. Shall we go with this approach?
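The "compose as many blobs as possible per call" strategy above can be expressed as pure batching logic, independent of the GCS client. The sketch below is hypothetical (the `compose` callback stands in for the client's actual compose operation, and the intermediate blob names are made up); it merges groups of up to 32 sources per call, repeating until a single blob remains:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class ComposeBatcher {
    // GCS allows at most 32 source objects per compose request
    static final int MAX_COMPOSE_SOURCES = 32;

    /**
     * Repeatedly composes groups of up to 32 blobs until one blob remains.
     * The compose function takes (sourceNames, targetName) and returns the
     * name of the composed blob.
     */
    static String composeAll(List<String> blobs, BiFunction<List<String>, String, String> compose) {
        List<String> current = new ArrayList<>(blobs);
        int round = 0;
        while (current.size() > 1) {
            List<String> next = new ArrayList<>();
            for (int i = 0; i < current.size(); i += MAX_COMPOSE_SOURCES) {
                List<String> group =
                        new ArrayList<>(current.subList(i, Math.min(i + MAX_COMPOSE_SOURCES, current.size())));
                next.add(compose.apply(group, "intermediate-" + round + "-" + (i / MAX_COMPOSE_SOURCES)));
            }
            current = next;
            round++;
        }
        return current.get(0);
    }
}
```

For 100 temporary blobs this needs only five compose calls: four to produce intermediates, then one final merge.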
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17282195#comment-17282195 ] Xintong Song commented on FLINK-11838: -- Well, I guess I see the divergence here. So switching to {{normal upload}} means we need to upload all data of a blob at once. It does not support continually writing data to the blob within a session. That is why we would need to buffer the data in memory until we want to complete a blob. What if we keep using resumable uploading for its session-based uploading, but do not rely on its recoverability? For each {{RecoverableFsDataOutputStream#persist}}, we complete a temporary blob which we can recover from, and start a new one if needed. And on committing (or in advance), we {{compose}} the temporary blobs into the persisted file. And we do not need to buffer much data in memory.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17282191#comment-17282191 ] Xintong Song commented on FLINK-11838: -- Thanks [~galenwarren], h4. Java Serialization According to the veterans, the biggest concern with Java serialization is indeed compatibility. Relying on Java serialization of classes provided by the GCS Java library means that, whenever we bump the version of the library, we may break checkpoint/savepoint compatibility if the library contains incompatible changes. And from what I understand, GCS is not designed to support resumable uploading across versions, which means they may not pay attention to reducing incompatible changes. h4. Switching to normal upload In general, I think the one-week limitation and the lack of a compatibility commitment both suggest that GCS resumable uploading is not designed for long-period or cross-version use cases like Flink's checkpoints/savepoints. In that sense, I think switching to the normal upload and maintaining the recoverability ourselves makes sense. h4. A few questions on the details bq. The RecoverableFsDataOutputStream could buffer written data and write temporary blobs at various points (flush, sync, persist, close, or even write, if the buffered size exceeds some threshold). Wouldn't it be good enough to complete a temporary blob (and start a new one if necessary) when {{persist}} is called? IIUC, we can still use {{WriteChannel}} for uploading a temporary blob, let it decide when to flush buffered data to GCS, and close it when {{persist}} is called. bq. and on commit the temporary files would be combined into one file and copied to the final blob location. An alternative could be combining temporary blobs in advance, e.g., whenever we have 32 temporary blobs. I'm not entirely sure this reduces the workload at the final commit if there is a large number of temporary blobs. Maybe we can conduct some simple experiments to see how the performance of GCS {{compose}} relates to the number and size of objects, before deciding when and how to combine the temporary blobs. bq. In this approach, we'd probably need a Flink option to set the maximum size of a temporary blob, which would control how frequently temporary blobs would be created. Tuning this would involve a tradeoff between memory usage and checkpoint size on the Flink side vs. time to commit the writes via the recoverable writer. I'm not sure how the checkpoint size is related. A checkpoint should be considered complete only if the temporary blobs are successfully persisted, so the checkpoint should not contain any buffered, non-written data. Maybe we should simply rely on the streaming file sink operator and the checkpoint mechanism to decide how frequently the data should be persisted/flushed/synced, rather than introducing another set of tuning knobs.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281769#comment-17281769 ] Galen Warren commented on FLINK-11838: -- Hi [~xintongsong] – a couple more thoughts here. I think I understand why the code in [BlobWriteChannel|https://github.com/googleapis/java-storage/blob/master/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java] and [BaseWriteChannel|https://github.com/googleapis/java-core/blob/master/google-cloud-core/src/main/java/com/google/cloud/BaseWriteChannel.java] is more complicated than I expected; there is a [requirement|https://cloud.google.com/storage/docs/performing-resumable-uploads] in resumable uploads that all the uploaded chunks (except the last one) be a multiple of 256 Kib, and so the writers have to buffer data in order to meet that requirement. The RestorableState then, in general, contains some amount of buffered data, i.e. data that has been written but that has not yet been sent as part of an upload chunk. Also, I came across a bit of a disturbing [thread|https://issuetracker.google.com/issues/137168102] from last year, where a GCS bug was being discussed that, essentially, caused in-progress resumable uploads to disappear and return 410 GONE errors. Such a failure would obviously be a big problem for the Flink use case we're talking about here. That thread claims this bug is fixed as of July 2020; however, it got me thinking about alternative implementations, especially since they provided this guidance: {quote}To retry successfully, catching 500 and 410 errors is required and, as the official documentation recommends ([https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone]), implementing a retry by starting a new session for the upload that received an unsuccessful status code but still needs uploading. {quote} which we would be unable to follow. So here's a different implementation idea. 
What if we didn't use resumable uploads at all, and instead just uploaded data in a series of temporary blobs, each of which would be uploaded via a normal, nonresumable upload. The [RecoverableFsDatastream|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.html] could buffer written data and write temporary blobs at various points (flush, sync, persist, close, or even write, if the buffered size exceeds some threshold). In this approach, the resumable state would include a list of all the temporary blobs uploaded associated with this write, and on commit the temporary files would be combined into one file and copied to the final blob location. To combine the temp blobs into the final blob, we could use the [compose|https://cloud.google.com/storage/docs/composing-objects] feature of GCS, which allows combining up to 32 blobs into one in a single call, and which could compose an arbitrary number of blobs with a pretty simple algorithm. A few additional benefits of this approach, potentially: * Each temp blob write would be a normal, nonresumable upload with known contents and size at the time of upload, so we could use CRC checks as recommended [here|https://cloud.google.com/storage/docs/hashes-etags]. * We'd sidestep the one-week limitation; the lifetime of temporary blobs could be made indefinite or managed via bucket policy * We would be in full control of the resumable state, so we'd avoid any issues related to Java serialization In this approach, we'd probably need a Flink option to set the maximum size of a temporary blob, which would control how frequently temporary blobs would be created. Tuning this would involve a tradeoff between memory usage and checkpoint size on the Flink side vs. 
time to commit the writes via the recoverable writer, as I'd expect it to take longer to compose a larger number of smaller temporary blobs together into the final blob than it would a smaller number of larger temporary blobs. Anyway, just a thought – let me know what you think. And enjoy the New Year! :) > Create RecoverableWriter for GCS > > > Key: FLINK-11838 > URL: https://issues.apache.org/jira/browse/FLINK-11838 > Project: Flink > Issue Type: New Feature > Components: Connectors / FileSystem >Affects Versions: 1.8.0 >Reporter: Fokko Driesprong >Assignee: Galen Warren >Priority: Major > Labels: pull-request-available, usability > Fix For: 1.13.0 > > Time Spent: 20m > Remaining Estimate: 0h > > GCS supports the resumable upload which we can use to create a Recoverable > writer similar to the S3 implementation: > https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload > After using the Hadoop compa
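The iterative compose reduction mentioned in the comment above (at most 32 source blobs per compose call, repeated until one blob remains) could be sketched as follows. This is a hypothetical illustration only: the class, method, and intermediate blob names are invented, and a real implementation would issue `Storage.compose()` for each batch instead of just counting calls.

```java
import java.util.ArrayList;
import java.util.List;

public class ComposeSketch {
    // GCS compose accepts at most 32 source blobs per call.
    static final int MAX_COMPOSE_SOURCES = 32;

    /**
     * Simulates combining N temporary blobs into one final blob via repeated
     * compose passes, returning the number of compose calls needed. A real
     * implementation would call Storage.compose() for each batch and delete
     * the intermediate blobs afterwards.
     */
    public static int composeAll(List<String> tempBlobs) {
        List<String> current = new ArrayList<>(tempBlobs);
        int calls = 0;
        int generation = 0;
        while (current.size() > 1) {
            List<String> next = new ArrayList<>();
            for (int i = 0; i < current.size(); i += MAX_COMPOSE_SOURCES) {
                // compose current[i .. min(i + 32, size)) into one intermediate blob
                next.add("intermediate-" + generation + "-" + (i / MAX_COMPOSE_SOURCES));
                calls++;
            }
            current = next;
            generation++;
        }
        return calls;
    }
}
```

For example, committing 100 temp blobs would take 4 compose calls in the first pass (32+32+32+4 sources) and 1 in the second, 5 in total, which illustrates the tradeoff discussed above between many small temp blobs and fewer large ones.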
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281488#comment-17281488 ] Xintong Song commented on FLINK-11838: -- Thanks for the information. FYI, the Chinese New Year holidays are from Feb. 11 to 17, during which I could be less responsive.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281481#comment-17281481 ] Galen Warren commented on FLINK-11838: -- Thanks, that all sounds reasonable. I'll take a look at the REST option. I'm going to be a bit tied up with some work-related stuff for the next couple of days so I might not get back until later in the week, but I'll definitely get back on this soon.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280730#comment-17280730 ] Xintong Song commented on FLINK-11838: -- Thanks [~galenwarren]. Your plans regarding the temporary blobs and the resumable upload time limit sound good to me.

Concerning the serialization issue, I'm not saying we should go for the REST API approach; I'm just trying to understand what options we have and their pros and cons. TBH, I'm not entirely sure why we had this code quality rule against Java serialization in the first place. I can see some downsides to using Java serialization, which might not be a big problem in this case. But it would be better to understand the original purpose, in case we overlook something. I'll try to talk to some veterans to see if we can find out what the original concerns were. I think investigating the REST approach can proceed concurrently.

Regarding the HTTP/REST client, Netty {{HttpRequest}} is used by some of Flink's runtime components. Please be aware that {{flink-shaded-netty}} is preferred over depending on Netty directly.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280587#comment-17280587 ] Galen Warren commented on FLINK-11838: -- Thanks. All of your initial assumptions are correct, though I might add that it is straightforward to serde a RestorableState if Java serialization is allowed (i.e. the object is guaranteed to implement Serializable). But I understand that's not preferred. To answer your questions:

*Does "writing a blob to a temporary location" mean that the user always needs to configure a temporary location? How is the temporary location cleaned, say if they're never moved to the committed location?*

I was thinking to manage temporary locations through a couple of Flink options:
* First, a "prefix" option, which would be a string applied in front of the supplied, "final" blob name. This prefix would by default be something like ".inprogress/".
* Second, an option for the bucket to use for temp blobs, which would not be required. If not supplied, the same bucket would be used for temp blobs as for the associated final blobs.

I was also planning on appending a UUID string to the end of temp blob locations, in order to guarantee their uniqueness during the temp-blob writing phase, in case multiple writes to the same final blob were somehow in progress at the same time. So, with the defaults, a write to a blob {{gs://final_bucket/foo/bar}} would use a temp blob location of, say, {{gs://final_bucket/.inprogress/foo/bar/1157f422-32af-4e32-a797-2a0a05f28ecf}}. The prefix could be configured via the first option; also, if the user wanted to write all temp blobs to a "temp_bucket" bucket, that bucket could be specified via the second option, yielding {{gs://temp_bucket/.inprogress/foo/bar/1157f422-32af-4e32-a797-2a0a05f28ecf}}.
As for cleaning the temporary blobs, is that what [RecoverableWriter.cleanupRecoverableState|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html#cleanupRecoverableState-org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable-] does? I was thinking to clean up any temporary blobs there. Beyond that, if a separate bucket for temporary blobs were used, one could apply an [object lifecycle-management rule|https://cloud.google.com/storage/docs/lifecycle] that deletes blobs after some period of time. These rules appear to be applicable only at the bucket level, so this would only work if a separate bucket were used just for temporary blobs.

*Per [this doc|https://cloud.google.com/storage/docs/resumable-uploads], a resumable upload must be completed within a week. This could be surprising for users if they try to restore a job from checkpoints/savepoints after pausing for more than one week.*

Yes, I would propose to disclose this limitation via a disclaimer, similar to the one used for S3:

{quote}Important Note 2: To guarantee exactly-once semantics while being efficient, the {{StreamingFileSink}} uses the [Multi-part Upload|https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html] feature of S3 (MPU from now on). This feature allows to upload files in independent chunks (thus the "multi-part") which can be combined into the original file when all the parts of the MPU are successfully uploaded. For inactive MPUs, S3 supports a bucket lifecycle rule that the user can use to abort multipart uploads that don't complete within a specified number of days after being initiated. This implies that if you set this rule aggressively and take a savepoint with some part-files being not fully uploaded, their associated MPUs may time-out before the job is restarted.
This will result in your job not being able to restore from that savepoint as the pending part-files are no longer there and Flink will fail with an exception as it tries to fetch them and fails.
{quote}

Admittedly, it does seem that S3 provides more configuration options here than GCS. It would be nice if the week limit were configurable, but it doesn't appear to be, based on my read.

*Relying on Java serialization means making our compatibility depend on the compatibility of GCS, which should be avoided if possible. Would it be possible to directly work with the REST API and session URI? IIUC this is how the write channel internally works.*

I'd need to look into it more closely, but yes, I think this could be possible. I think we'd wind up reimplementing much of what is done in [BlobWriteChannel|https://github.com/googleapis/java-storage/blob/master/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java] and [BaseWriteChannel|https://github.com/googleapis/java-core/blob/master/google-cloud-core/src/main/java/com/google/cloud/BaseWriteChannel.java], which I suppose I was thinking would be good to avoid. The code in the BlobWrite
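The temp-blob naming scheme described in the comment above (configurable prefix, optional separate temp bucket, UUID suffix) could be sketched as a small helper. The class name, method name, and parameter names here are hypothetical, invented for illustration:

```java
import java.util.UUID;

public class TempBlobNaming {
    /**
     * Builds the temporary blob URI for an in-progress write, per the scheme
     * sketched above: an optional temp bucket (falling back to the final
     * bucket when not configured), a configurable prefix such as
     * ".inprogress/", the final object name, and a UUID suffix that keeps
     * concurrent writes to the same final blob distinct.
     */
    public static String tempBlobUri(
            String tempBucket,       // may be null/empty -> use finalBucket
            String prefix,           // e.g. ".inprogress/"
            String finalBucket,
            String finalObjectName,  // e.g. "foo/bar"
            UUID uploadId) {
        String bucket =
                (tempBucket == null || tempBucket.isEmpty()) ? finalBucket : tempBucket;
        return "gs://" + bucket + "/" + prefix + finalObjectName + "/" + uploadId;
    }
}
```

With the defaults this reproduces the example given above: a write to {{gs://final_bucket/foo/bar}} maps to {{gs://final_bucket/.inprogress/foo/bar/<uuid>}}, and configuring a temp bucket swaps only the bucket portion.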
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280454#comment-17280454 ] Xintong Song commented on FLINK-11838: -- [~galenwarren], Thanks for explaining the design details. This is exactly what I'm looking for. I'm trying to understand the reasons behind some of the design choices. Please confirm if I understand it correctly, or correct me if otherwise.
* The design proposes to build {{GCSFileSystem}} on top of {{HadoopFileSystem}}. This is because we can reuse most of the {{HadoopFileSystem}} implementation, leveraging the GCS-provided {{GoogleHadoopFileSystem}}.
* For the {{RecoverableWriter}}, we cannot reuse {{HadoopRecoverableWriter}}, because:
** {{HadoopRecoverableWriter}} checks the schema and Hadoop version.
** {{HadoopRecoverableWriter}} assumes files can be appended, which is true for files on HDFS but not for immutable objects on GCS.
* The design proposes to leverage the GCS resumable upload feature.
** The feature allows capturing *write state* while writing the object and resuming the write by restoring the captured *write state*.
** Both capturing and restoring should happen before the object is completely written (thus not visible for reading), and once the writing is completed the object becomes immutable.
** To use this feature, we need to persist an object ({{RestorableState}}) generated by {{capture()}}, which will be used for {{restore()}} later. However, the implementation of {{RestorableState}} is internal to GCS, and we do not have a good way to serialize/deserialize it.

If my understanding is correct, then I have a few questions.
# Does "writing a blob to a temporary location" mean that the user always needs to configure a temporary location? How is the temporary location cleaned, say if they're never moved to the committed location?
# Per [this doc|https://cloud.google.com/storage/docs/resumable-uploads], a resumable upload must be completed within a week. This could be surprising for users if they try to restore a job from checkpoints/savepoints after pausing for more than one week.
# Relying on Java serialization means making our compatibility depend on the compatibility of GCS, which should be avoided if possible. Would it be possible to directly work with the REST API and session URI? IIUC this is how the write channel internally works.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279694#comment-17279694 ] Galen Warren commented on FLINK-11838: -- Thanks [~xtsong]! I really appreciate your help on this. Sorry if I jumped the gun on the PR; I'm happy to follow the process you've outlined, and if it makes more sense to ultimately submit the PR in multiple pieces instead of one, that's fine with me too. So it would seem that the next step would be to discuss the proposed design here. I'll take a crack at that.

At a high level, the goal here is to implement RecoverableWriter for GCS, in order to be able to use StreamingFileSink to write to GCS. In Flink, recoverable writers are created by calling [FileSystem.createRecoverableWriter|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FileSystem.html#createRecoverableWriter--], so we will also need an implementation of org.apache.flink.core.fs.FileSystem for GCS in order to expose the recoverable writer to Flink.

The org.apache.flink.core.fs.FileSystem implementation is straightforward. Google provides an implementation of org.apache.hadoop.fs.FileSystem for GCS via [GoogleHadoopFileSystem|https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.java], which can already be used in Flink in other contexts, e.g. for checkpoints. Flink provides [HadoopFileSystem|https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.html], which implements org.apache.flink.core.fs.FileSystem in terms of an underlying org.apache.hadoop.fs.FileSystem. So, the proposed FileSystem for GCS would extend HadoopFileSystem, constructing it with an instance of GoogleHadoopFileSystem. This new FileSystem class would also override createRecoverableWriter to return a RecoverableWriter implementation for GCS.
We also need an implementation of [FileSystemFactory|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FileSystemFactory.html], registered via SPI, to expose the GCS FileSystem to Flink.

So, next is the interesting part – the GCS [RecoverableWriter|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html] implementation. At a high level, RecoverableWriter allows one to create an output stream, write bytes to that stream, persist and recover the state of that stream, and ultimately commit the write operation or abort it. In GCS, I propose to do recoverable writes in two steps:
* First, write a blob to a temporary location using the [resumable uploads|https://cloud.google.com/storage/docs/resumable-uploads] feature of GCS. This allows for blobs to be written in chunks over a potentially long period of time (up to one week, per the docs). Importantly, it also allows for the write state to be persisted and recovered, via [WriteChannel.capture|https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#capture--] and [RestorableState.restore|https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/RestorableState.html], which we'll use to implement the persist/recover functionality in RecoverableWriter.
* Second, commit the write operation by copying the temporary blob to the "final" blob location, i.e. the one specified in the initial call to RecoverableWriter.open, and deleting the temporary blob. In the event of an aborted upload, the cleanup would consist of deleting just the temporary blob.

So, in this approach, the recoverable state from Flink's perspective (i.e.
CommitRecoverable and ResumeRecoverable) would include:
* The RestorableState returned from WriteChannel
* The write position (we have to keep track of this ourselves, because WriteChannel doesn't expose it)
* The locations of the temporary and final blobs, so that we can ultimately commit or clean up the overall operation

That's basically it at a high level. I do want to point out one possible conflict with the Flink coding guidelines, though, to get your thoughts. The guidelines say – very emphatically! – not to use Java serialization for anything. In the GCS case, the RestorableState that is returned from WriteChannel.capture is an object that implements Serializable but is otherwise opaque. This object does need to be serialized/deserialized as part of the RecoverableWriter implementation, and it's not clear to me how to do that except by using Java serialization.

I'll stop there for now; please let me know if this is the sort of information you're looking for here. I'm happy to drill into any area in more detail if you'd like, jus
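The recoverable-state contents listed in the comment above could be sketched as a small Serializable holder, persisted with plain Java serialization (the approach under discussion, not a settled design). The class and field names are hypothetical; in a real implementation the captured RestorableState would itself be Java-serialized into the byte array:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical holder for the GCS recoverable-write state described above. */
public class GcsResumeRecoverable implements Serializable {
    private static final long serialVersionUID = 1L;

    final byte[] capturedWriteState; // Java-serialized RestorableState<WriteChannel>
    final long position;             // bytes written so far (tracked by us)
    final String tempBlobUri;        // where the in-progress data lives
    final String finalBlobUri;       // where the data goes on commit

    public GcsResumeRecoverable(byte[] capturedWriteState, long position,
                                String tempBlobUri, String finalBlobUri) {
        this.capturedWriteState = capturedWriteState;
        this.position = position;
        this.tempBlobUri = tempBlobUri;
        this.finalBlobUri = finalBlobUri;
    }

    /** Serializes this state, e.g. for inclusion in a Flink checkpoint. */
    public byte[] toBytes() throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(this);
        }
        return bos.toByteArray();
    }

    /** Restores the state from previously serialized bytes. */
    public static GcsResumeRecoverable fromBytes(byte[] bytes)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (GcsResumeRecoverable) ois.readObject();
        }
    }
}
```

The round trip through toBytes/fromBytes is exactly the Java-serialization step that conflicts with the coding guidelines quoted above.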
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279299#comment-17279299 ] Xintong Song commented on FLINK-11838: -- Hi [~galenwarren], Thanks for offering the contribution. I will help you with this contribution. Since this ticket has not been updated for quite some time and the original PR has been abandoned, I have assigned you to the ticket. Just to manage expectations, I may need some time to pick up the GCS background and review your design proposal. In the meantime, I suggest taking a look at the following guidelines.

[https://flink.apache.org/contributing/contribute-code.html]
[https://flink.apache.org/contributing/code-style-and-quality-preamble.html]

After a first glance at the PR, I have two suggestions.
- I noticed you've described your proposal on the PR you've opened. It would be nice to move it into the description of this JIRA ticket. Usually, we use the JIRA ticket for design discussions, and the PR for reviewing implementation details.
- The PR contains 3k LOC of changes in a single commit, which could be hard to review, especially when we cannot communicate face-to-face. It would be nice to organize the code into smaller commits following the contribution guidelines. This can be done after we reach consensus on the design proposal.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279023#comment-17279023 ] Galen Warren commented on FLINK-11838: -- I have submitted a new [PR|https://github.com/apache/flink/pull/14875] for this, unrelated to the prior ones, and [~trohrmann] suggested I reuse this Jira ticket.
[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS
[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17127324#comment-17127324 ] Tomoyuki NAKAMURA commented on FLINK-11838: --- Is there any progress on this ticket?