[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2022-01-20 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479481#comment-17479481
 ] 

Martijn Visser commented on FLINK-11838:


[~galenwarren] That's no dumb question :) It's just a matter of filling in the 
{{release notes}} field in the Jira ticket. When a Flink release is created, 
all the {{release notes}} are auto-generated. See, for example, 
https://issues.apache.org/jira/browse/FLINK-25553

> Create RecoverableWriter for GCS
> 
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / FileSystem
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Galen Warren
>Priority: Major
>  Labels: pull-request-available, usability
> Fix For: 1.15.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports resumable uploads, which we can use to create a 
> RecoverableWriter similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop-compatible interface 
> (https://github.com/apache/flink/pull/7519), we've noticed that the current 
> implementation relies heavily on renaming files on commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS, so we would like to 
> implement a more GCS-native RecoverableWriter.





[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2022-01-20 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479454#comment-17479454
 ] 

Galen Warren commented on FLINK-11838:
--

[~MartijnVisser] I'd say let's handle documentation and release notes on the 
other one, https://issues.apache.org/jira/browse/FLINK-25577, since this one is 
now closed.

 

Sorry if this is a dumb question, but what is involved in adding this to the 
release notes?



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2022-01-19 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479153#comment-17479153
 ] 

Martijn Visser commented on FLINK-11838:


Thanks for taking care of this, [~galenwarren].

We should also make sure that this ends up in the release notes; do you think 
we should add it to this ticket or to 
https://issues.apache.org/jira/browse/FLINK-25577 ?



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-08-28 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17406351#comment-17406351
 ] 

Xintong Song commented on FLINK-11838:
--

[~laughingman7743]
Unfortunately, this feature is not finished and will not be present in release 
1.14.



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-08-28 Thread Tomoyuki NAKAMURA (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17406216#comment-17406216
 ] 

Tomoyuki NAKAMURA commented on FLINK-11838:
---

Is there any progress on this ticket? It looks like flink-1.14.0-rc0 has been 
released:

https://github.com/apache/flink/releases/tag/release-1.14.0-rc0



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-08-12 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398108#comment-17398108
 ] 

Galen Warren commented on FLINK-11838:
--

I know this has taken a long time. I'll have the code in before the upcoming 
1.14 freeze.



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-04-21 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326990#comment-17326990
 ] 

Galen Warren commented on FLINK-11838:
--

PR is still active.



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-04-21 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326986#comment-17326986
 ] 

Flink Jira Bot commented on FLINK-11838:


This issue is assigned but has not received an update in 7 days so it has been 
labeled "stale-assigned". If you are still working on the issue, please give an 
update and remove the label. If you are no longer working on the issue, please 
unassign so someone else may work on it. In 7 days the issue will be 
automatically unassigned.



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-04-13 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320638#comment-17320638
 ] 

Xintong Song commented on FLINK-11838:
--

Thanks [~galenwarren]. I'll try to take a look asap.

Just to manage expectations, this could take some time. We are currently 
prioritizing the 1.13.0 release testing work.



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-04-13 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17320363#comment-17320363
 ] 

Galen Warren commented on FLINK-11838:
--

[~xintongsong],

PR created: [https://github.com/apache/flink/pull/15599]



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-04-07 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17316376#comment-17316376
 ] 

Galen Warren commented on FLINK-11838:
--

Working on the new PR now ... coming soon.



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-04-02 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314161#comment-17314161
 ] 

Xintong Song commented on FLINK-11838:
--

Thanks for the update, [~galenwarren].

Either updating the old PR or starting a new one works for me. 



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-04-02 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314079#comment-17314079
 ] 

Galen Warren commented on FLINK-11838:
--

[~xintongsong] – actually, is starting a new PR the right thing to do? I'm 
thinking that might be the cleanest way to proceed, since we never really used 
the old one (which I created prematurely) and the info in it is now out of 
date, but I'll follow your guidance here on what to do.



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-04-02 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314067#comment-17314067
 ] 

Galen Warren commented on FLINK-11838:
--

Hi [~xintongsong] – sorry for the long delay. I plan to create a new PR and 
upload some code this weekend; my other work has let up a bit, so I have some 
time to look at this. I'll post the link here when it's ready.



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-04-02 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17313642#comment-17313642
 ] 

Xintong Song commented on FLINK-11838:
--

Hi [~galenwarren], any updates on this ticket?

I'd like to kindly inform you that the Apache Flink community has decided to 
adopt a new Jira process [1], according to which assigned tickets need regular 
status updates. Tickets without a status update will be labeled 
"stale-assigned" after 7 days, and unassigned after another 7 days. There will 
soon be a bot to enforce this.

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Jira-Process-amp-Bot-td49676.html#a49734



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-03-17 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17303537#comment-17303537
 ] 

Galen Warren commented on FLINK-11838:
--

Not that I'm aware of. Writing checkpoints/savepoints and HA data to GCS 
buckets works fine as-is, but to use StreamingFileSink, a RecoverableWriter 
implementation is required. That exists now for S3 but not GCS.

I have a local GCS RecoverableWriter implementation that seems to be working, 
and I'm hoping to push the first part of the code for review by 
[~xintongsong] this weekend.
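For illustration, a minimal sketch of the kind of job that needs a 
RecoverableWriter (the bucket name and the {{stream}} variable are 
hypothetical; assumes Flink's FileSink API):
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

// Writing a row-format stream to a gs:// path goes through the
// RecoverableWriter of the registered GCS file system.
FileSink<String> sink =
    FileSink.forRowFormat(
            new Path("gs://my-bucket/output"),
            new SimpleStringEncoder<String>("UTF-8"))
        .build();

stream.sinkTo(sink); // stream is an assumed DataStream<String>
{code}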

 



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-03-16 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302382#comment-17302382
 ] 

Berkay Öztürk commented on FLINK-11838:
---

Is there any other way of writing streaming data into a GCS bucket with Flink, 
or is a RecoverableWriter implementation required?

Using StreamingFileSink (or FileSink) gives this error:
{code:java}
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
{code}



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-03-12 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17300332#comment-17300332
 ] 

Galen Warren commented on FLINK-11838:
--

[~BerkayOzturk] No, it's not in a working state. Since that PR, [~xintongsong] 
and I have refined the proposed implementation significantly. I'm hoping to get 
back to this very soon; some work has come up that is taking all my time. 
Sorry about the delay!



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-03-12 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17300157#comment-17300157
 ] 

Berkay Öztürk commented on FLINK-11838:
---

Hello [~galenwarren],

I currently need a GCS connector implementation for work. Is the latest commit 
([e5e69b6|https://github.com/coachclientconnect/flink/commit/e5e69b67c74e8a7576b3f550f3bb79defacaac6d])
 on your 
[add-gcs-filesystem-with-recoverable-writer-2|https://github.com/coachclientconnect/flink/compare/add-gcs-filesystem-with-recoverable-writer-2]
 branch in a working state? Or should I (and others who need this) wait for 
your update on the PR?

Thank you





[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-03-04 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17295702#comment-17295702
 ] 

Xintong Song commented on FLINK-11838:
--

Thanks for the update, [~galenwarren]. No worries. Good luck with your work 
project :)



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-03-04 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17295384#comment-17295384
 ] 

Galen Warren commented on FLINK-11838:
--

I just wanted to add an update here so this doesn't go stale – I should be able 
to push the first piece of the code in a PR in about a week, once the work 
project I'm on dies down a bit. So this is still coming – sorry for the delay.



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-22 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288764#comment-17288764
 ] 

Xintong Song commented on FLINK-11838:
--

Thanks for the updates, [~galenwarren]. No worries, take your time.



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-22 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288419#comment-17288419
 ] 

Galen Warren commented on FLINK-11838:
--

Hey, I just wanted to check in here. Sorry for the delay; some other work has 
come up and I haven't been able to spend any time here. But I will still 
complete this and will hopefully get back to it toward the end of this week.



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-15 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17284896#comment-17284896
 ] 

Galen Warren commented on FLINK-11838:
--

Ah, OK – I see. It will take me a couple of days to get things organized, I'll 
post back here when I have something for you to look at. Thanks.



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-15 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17284771#comment-17284771
 ] 

Xintong Song commented on FLINK-11838:
--

[~galenwarren],

What makes the original PR hard to review is not its size, but that all the 
changes are in a single commit. It would be helpful to break the PR into 
multiple commits, each containing a smaller group of closely related changes.

Please find detailed guidelines and examples 
[here|https://flink.apache.org/contributing/code-style-and-quality-pull-requests.html].



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-15 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17284751#comment-17284751
 ] 

Galen Warren commented on FLINK-11838:
--

That's fine on "compose on persist"; I'll leave it out for now and supply it in 
a separate commit if we decide to go there.

This reminds me of a question I meant to ask, though. When we first started, 
you had some concern about the size of the original PR and suggested it be 
broken into parts. The new PR will be roughly the same size – if that's too 
big, any suggestion on how to break it into pieces? Should the unit tests be a 
separate commit?



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-14 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17284518#comment-17284518
 ] 

Xintong Song commented on FLINK-11838:
--

Thanks [~galenwarren], I think we are good to move forward to the PR review.

Concerning "compose on persist", I'm leaning towards keeping it simple & stupid 
until we come up with a thorough plan. My concern with exposing something where 
it's "not obvious which is better" to users as a configuration option is that 
we don't know whether, once we do have a thorough plan, this option will be 
needed by (or worse, conflict with) that plan. Configuration options are 
considered public interfaces, so incompatible changes (e.g., removing an 
existing option) are not forbidden but are good to avoid.



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-14 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17284476#comment-17284476
 ] 

Galen Warren commented on FLINK-11838:
--

{quote}Maybe for the first step, it's good enough to simply do all the 
composing and deleting at the end. We can try to optimize it later if we indeed 
see a performance problem in composing and deleting the temporary blobs.
{quote}
I'm fine going either way here. I've already put something together locally 
that allows for composing both at persist and at commit, but it would be simple 
to revert to just doing it at commit. Maybe you can take a look when we get to 
the code phase to see what you think? If it's not obvious which is better, I 
suppose we could also control that – "compose on persist" – via a Flink option.

Are you comfortable with the approach now? If so, I'll work on getting the code 
together in order to update the PR.

Thanks!



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-13 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17284339#comment-17284339
 ] 

Xintong Song commented on FLINK-11838:
--

bq. perhaps we expose this chunk size as a Flink option to give the user some 
control over the process

Sounds good to me.
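If we go that route, the option could be declared with Flink's 
{{ConfigOptions}} builder; a minimal sketch, where the option key and 
description are hypothetical:
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;

// Hypothetical option key; leaving it unset would fall back to the
// Google client's default chunk size.
public static final ConfigOption<MemorySize> WRITER_CHUNK_SIZE =
    ConfigOptions.key("gs.writer.chunk.size")
        .memoryType()
        .noDefaultValue()
        .withDescription(
            "Chunk size used by the GCS WriteChannel when flushing data.");
{code}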

bq. it seems to me that we can't safely delete any of the temporary blobs along 
way, because it's possible that we might restore to a checkpoint prior to some 
or all of the incremental compose operations having occurred

That's a good point. It seems we have to keep the temporary blobs anyway; 
composing blobs in advance only serves as a performance optimization, at the 
cost of more temporary storage space and system complexity.

Maybe for the first step, it's good enough to simply do all the composing and 
deleting at the end. We can try to optimize it later if we indeed see a 
performance problem in composing and deleting the temporary blobs.
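Composing at the end maps directly onto the GCS client API; a minimal sketch 
(bucket and blob names hypothetical). Note that GCS composes at most 32 source 
objects per request, and the sources must live in the same bucket as the 
target:
{code:java}
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.ComposeRequest;
import com.google.cloud.storage.StorageOptions;

Storage storage = StorageOptions.getDefaultInstance().getService();

// Compose the completed temporary blobs into the final blob.
ComposeRequest request =
    ComposeRequest.newBuilder()
        .addSource("tmp/part-0", "tmp/part-1", "tmp/part-2")
        .setTarget(BlobInfo.newBuilder("my-bucket", "output/part-0").build())
        .build();
storage.compose(request);
{code}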



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-12 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17283950#comment-17283950
 ] 

Galen Warren commented on FLINK-11838:
--

One more thought/question. While we can potentially compose temp blobs along 
the way, to avoid having to compose them all at commit time, it seems to me 
that we can't safely delete any of the temporary blobs along the way, because 
it's possible that we might restore to a checkpoint prior to some or all of the 
incremental compose operations having occurred. In that case, we'd have to 
repeat the compose operations, which means the underlying temp blobs would need 
to still be there.

If that's right, then we'd necessarily have to wait until the end to delete the 
temp blobs. I was wondering, would it be allowable to do any of those delete 
operations in parallel? The coding 
[guidelines|https://flink.apache.org/contributing/code-style-and-quality-common.html#concurrency-and-threading]
 would seem to discourage this, but they don't outright prohibit it, so I 
thought I'd ask first.
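One way to avoid hand-rolled threading altogether: the Google client accepts 
multiple blob IDs in a single delete call, which it sends as one batched 
request. A sketch (bucket and blob names hypothetical):
{code:java}
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

import java.util.Arrays;
import java.util.List;

Storage storage = StorageOptions.getDefaultInstance().getService();

// Delete all temporary blobs in one batched request; the returned list
// indicates per-blob success.
List<Boolean> deleted =
    storage.delete(
        Arrays.asList(
            BlobId.of("my-bucket", "tmp/part-0"),
            BlobId.of("my-bucket", "tmp/part-1")));
{code}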



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-12 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17283747#comment-17283747
 ] 

Galen Warren commented on FLINK-11838:
--

Hi [~xintongsong], sorry for the delay here, I had some other, unrelated work I 
had to focus on.

I like your idea of using {{WriteChannel}} for the uploads but closing each 
one when {{RecoverableFsDataOutputStream.persist}} is called, so that we're not 
relying on {{WriteChannel}} for recoverability. {{WriteChannel}} allows one to 
control how frequently it flushes data via 
[setChunkSize|https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#setChunkSize-int-];
 perhaps we could expose this chunk size as a Flink option, to give the user 
some control over how much memory is used for buffering? It could be optional; 
not setting it would mean using the Google default.
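
Something along these lines, perhaps (a rough sketch; {{TempBlobWriter}} and 
the chunk-size option are hypothetical, while {{Storage#writer}} and 
{{WriteChannel#setChunkSize}} are the real API):

{code:java}
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;

import java.io.IOException;
import java.nio.ByteBuffer;

class TempBlobWriter {
    private final WriteChannel channel;

    TempBlobWriter(Storage storage, BlobInfo tempBlob, Integer chunkSize) {
        this.channel = storage.writer(tempBlob);
        if (chunkSize != null) {
            // hypothetical optional Flink option; Google default otherwise
            channel.setChunkSize(chunkSize);
        }
    }

    void write(ByteBuffer data) throws IOException {
        while (data.hasRemaining()) {
            channel.write(data);
        }
    }

    // Called from persist(): closing the channel completes the temp blob,
    // so recovery no longer depends on any resumable-upload session state.
    void completeOnPersist() throws IOException {
        channel.close();
    }
}
{code}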

Yes, it would be straightforward to compose blobs at any point in the process, 
i.e. on commit and/or at {{persist}} calls along the way. Maybe we compose them 
on commit (of course) and also whenever {{persist}} is called and there are at 
least 32 temp blobs to compose? That way, we spread the compose work out over 
time while calling {{compose}} as few times as possible, composing as many 
blobs as possible in each call, which seems like it would be the most efficient.
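
As a sketch, the persist-time trigger might look like this (all names are 
hypothetical; 32 is GCS's documented per-call source limit for {{compose}}, and 
the source temp blobs are kept until commit, per the recovery concern above):

{code:java}
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class ComposeOnPersist {
    private static final int COMPOSE_LIMIT = 32;

    // Called from persist(): if 32 temp blobs have accumulated, compose them
    // into one intermediate blob, leaving fewer blobs for the final commit.
    static List<String> maybeCompose(Storage storage, String bucket, List<String> tempBlobs) {
        if (tempBlobs.size() < COMPOSE_LIMIT) {
            return tempBlobs;
        }
        List<String> batch = tempBlobs.subList(0, COMPOSE_LIMIT);
        String composed = "composed-" + UUID.randomUUID(); // hypothetical intermediate name
        storage.compose(Storage.ComposeRequest.newBuilder()
                .addSource(batch)
                .setTarget(BlobInfo.newBuilder(bucket, composed).build())
                .build());
        // the source temp blobs are NOT deleted here; they must survive until
        // commit in case we restore to a checkpoint taken before this compose
        List<String> remaining = new ArrayList<>();
        remaining.add(composed);
        remaining.addAll(tempBlobs.subList(COMPOSE_LIMIT, tempBlobs.size()));
        return remaining;
    }
}
{code}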

Shall we go with this approach?

 



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-09 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17282195#comment-17282195
 ] 

Xintong Song commented on FLINK-11838:
--

Well, I guess I see the divergence here.

So switching to {{normal upload}} means we need to upload all data of a blob at 
once; it does not support continuously writing data to the blob within a 
session. That is why we would need to buffer the data in memory until we want 
to complete a blob.

What if we keep using resumable uploads for their session-style writing, but do 
not rely on their recoverability? On each 
{{RecoverableFsDataOutputStream#persist}}, we complete a temporary blob, which 
we can recover from, and start a new one if needed. On committing (or in 
advance), we {{compose}} the temporary blobs into the persisted file. That way, 
we do not need to buffer much data in memory.
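
A minimal sketch of what I mean, with hypothetical names:

{code:java}
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class GcsRecoverableStream {
    private final Storage storage;
    private final String bucket;
    // after persist(), this list (plus the final blob location) is the entire
    // recoverable state; no resumable-upload session state is involved
    private final List<String> completedTempBlobs = new ArrayList<>();
    private WriteChannel current;
    private String currentName;

    GcsRecoverableStream(Storage storage, String bucket) {
        this.storage = storage;
        this.bucket = bucket;
    }

    void write(ByteBuffer data) throws IOException {
        if (current == null) { // writes after a persist() open a fresh temp blob
            currentName = "tmp-" + UUID.randomUUID();
            current = storage.writer(BlobInfo.newBuilder(bucket, currentName).build());
        }
        while (data.hasRemaining()) {
            current.write(data);
        }
    }

    // persist(): complete the in-progress temp blob so it can be recovered from
    void persist() throws IOException {
        if (current != null) {
            current.close();
            completedTempBlobs.add(currentName);
            current = null;
        }
    }
}
{code}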



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-09 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17282191#comment-17282191
 ] 

Xintong Song commented on FLINK-11838:
--

Thanks [~galenwarren],

h4. Java Serialization

According to the veterans, the biggest concern with java serialization is indeed 
the compatibility issue. Relying on java serialization of classes provided by 
the GCS java library means that, whenever we bump the version of the library, 
we may break checkpoint/savepoint compatibility if the library contains 
incompatible changes. And from what I understand, GCS is not designed to 
support resuming uploads across library versions, which means the maintainers 
may not make an effort to avoid incompatible changes.

h4. Switching to normal upload

In general, I think the one-week limitation and the lack of a compatibility 
commitment both suggest that GCS resumable uploading is not designed for 
long-lived or cross-version use cases like Flink's checkpoints / savepoints. In 
that sense, I think switching to the normal upload and maintaining the 
recoverability ourselves makes sense.

h4. A few questions on the details.

bq. The RecoverableFsDataOutputStream could buffer written data and write 
temporary blobs at various points (flush, sync, persist, close, or even write, 
if the buffered size exceeds some threshold). 

Wouldn't it be good enough to complete a temporary blob (and start a new one if 
necessary) when {{persist}} is called? IIUC, we can still use {{WriteChannel}} 
for uploading a temporary blob, let it decide when to flush buffered data to 
GCS, and close it when {{persist}} is called.

bq. and on commit the temporary files would be combined into one file and 
copied to the final blob location.

An alternative could be combining temporary blobs in advance, e.g., whenever we 
have 32 temporary blobs. I'm not entirely sure whether this reduces the 
workload of the final commit when there's a large number of temporary blobs. 
Maybe we can conduct some simple experiments to see how the performance of GCS 
{{compose}} relates to the number and size of objects, before deciding when and 
how to combine the temporary blobs.
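
Such an experiment could be as simple as the following sketch (names are 
hypothetical, and the source blobs would need to be created beforehand):

{code:java}
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

import java.util.ArrayList;
import java.util.List;

public class ComposeTiming {
    public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        String bucket = "compose-experiment-bucket"; // hypothetical, pre-created
        for (int n : new int[] {2, 4, 8, 16, 32}) {
            List<String> sources = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                sources.add("part-" + i); // pre-created blobs of a fixed size
            }
            long start = System.nanoTime();
            storage.compose(Storage.ComposeRequest.newBuilder()
                    .addSource(sources)
                    .setTarget(BlobInfo.newBuilder(bucket, "composed-" + n).build())
                    .build());
            System.out.printf("%d sources: %d ms%n",
                    n, (System.nanoTime() - start) / 1_000_000);
        }
    }
}
{code}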

bq. In this approach, we'd probably need a Flink option to set the maximum size 
of a temporary blob, which would control how frequently temporary blobs would 
be created. Tuning this would involve a tradeoff between memory usage and 
checkpoint size on the Flink side vs. time to commit the writes via the 
recoverable writer, as I'd expect it to take longer to compose a larger number 
of smaller temporary blobs together into the final blob than it would a smaller 
number of larger temporary blobs.

I'm not sure how the checkpoint size is related. A checkpoint should be 
considered complete only if the temporary blobs are successfully persisted, so 
the checkpoint should not contain any buffered, non-written data. Maybe we 
should simply rely on the streaming file sink operator and the checkpoint 
mechanism to decide how frequently the data should be persisted / flushed / 
synced, rather than introducing another set of tuning knobs.



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-09 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281769#comment-17281769
 ] 

Galen Warren commented on FLINK-11838:
--

Hi [~xintongsong] – a couple more thoughts here.

I think I understand why the code in 
[BlobWriteChannel|https://github.com/googleapis/java-storage/blob/master/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java]
 and 
[BaseWriteChannel|https://github.com/googleapis/java-core/blob/master/google-cloud-core/src/main/java/com/google/cloud/BaseWriteChannel.java]
 is more complicated than I expected; there is a 
[requirement|https://cloud.google.com/storage/docs/performing-resumable-uploads]
 in resumable uploads that every uploaded chunk (except the last one) have a 
size that is a multiple of 256 KiB, so the writers have to buffer data in order 
to meet that requirement. The RestorableState then, in general, contains some 
amount of buffered data, i.e. data that has been written but not yet sent 
as part of an upload chunk.
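
To illustrate the constraint, the arithmetic is just:

{code:java}
class ChunkMath {
    // resumable-upload chunks (other than the last) must be a multiple of 256 KiB
    static final int CHUNK_MULTIPLE = 256 * 1024;

    // The portion of the buffered bytes that may be flushed as a non-final
    // chunk; the remainder has to stay buffered (and would end up inside a
    // captured RestorableState). E.g. with 700 KiB buffered, 512 KiB can be
    // sent and 188 KiB remains buffered.
    static int flushableLength(int bufferedBytes) {
        return (bufferedBytes / CHUNK_MULTIPLE) * CHUNK_MULTIPLE;
    }
}
{code}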

Also, I came across a somewhat disturbing 
[thread|https://issuetracker.google.com/issues/137168102] from last year, 
discussing a GCS bug that, essentially, caused in-progress resumable uploads to 
disappear and return 410 GONE errors. Such a failure would obviously be a big 
problem for the Flink use case we're talking about here. That thread claims the 
bug is fixed as of July 2020; still, it got me thinking about alternative 
implementations, especially since they provided this guidance:
{quote}To retry successfully, catching 500 and 410 errors is required and, as 
the official documentation recommends 
([https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone]), 
implementing a retry by starting a new session for the upload that received an 
unsuccessful status code but still needs uploading. 
{quote}
 which we would be unable to follow.

So here's a different implementation idea. What if we didn't use resumable 
uploads at all, and instead uploaded the data as a series of temporary blobs, 
each written via a normal, non-resumable upload? The 
[RecoverableFsDataOutputStream|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.html]
 could buffer written data and write temporary blobs at various points (flush, 
sync, persist, close, or even write, if the buffered size exceeds some 
threshold). In this approach, the recoverable state would include a list of all 
the temporary blobs uploaded as part of this write, and on commit the 
temporary files would be combined into one file and copied to the final blob 
location.

To combine the temp blobs into the final blob, we could use the 
[compose|https://cloud.google.com/storage/docs/composing-objects] feature of 
GCS, which combines up to 32 blobs into one in a single call; an arbitrary 
number of blobs could be composed with a pretty simple algorithm (sketched 
below).
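
For instance (a sketch, with illustrative names):

{code:java}
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class ComposeAll {
    // Repeatedly compose batches of up to 32 blobs until one blob remains;
    // for N temp blobs this takes ceil(log32(N)) rounds of compose calls.
    static String composeAll(Storage storage, String bucket, List<String> blobs) {
        List<String> current = new ArrayList<>(blobs);
        while (current.size() > 1) {
            List<String> next = new ArrayList<>();
            for (int i = 0; i < current.size(); i += 32) {
                List<String> batch = current.subList(i, Math.min(i + 32, current.size()));
                String target = "composed-" + UUID.randomUUID(); // intermediate blob
                storage.compose(Storage.ComposeRequest.newBuilder()
                        .addSource(batch)
                        .setTarget(BlobInfo.newBuilder(bucket, target).build())
                        .build());
                next.add(target);
            }
            current = next;
        }
        return current.get(0); // copy to the final blob location afterwards
    }
}
{code}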

A few additional benefits of this approach, potentially:
 * Each temp blob write would be a normal, non-resumable upload with known 
contents and size at the time of upload, so we could use CRC checks as 
recommended [here|https://cloud.google.com/storage/docs/hashes-etags] (a sketch 
follows this list).
 * We'd sidestep the one-week limitation; the lifetime of temporary blobs could 
be made indefinite or managed via bucket policy.
 * We would be in full control of the recoverable state, so we'd avoid any 
issues related to Java serialization.
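
On the first point, a sketch of what the CRC check might look like with the 
Java client, assuming Guava for the client-side CRC32C (GCS expects the 
checksum base64-encoded in big-endian order):

{code:java}
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.common.primitives.Ints;

class ChecksummedUpload {
    // Upload a temp blob with a precomputed CRC32C; with crc32cMatch(), the
    // server rejects the write if the data it received doesn't match.
    static void upload(Storage storage, String bucket, String name, byte[] bytes) {
        String crc32c = BaseEncoding.base64()
                .encode(Ints.toByteArray(Hashing.crc32c().hashBytes(bytes).asInt()));
        BlobInfo info = BlobInfo.newBuilder(bucket, name).setCrc32c(crc32c).build();
        storage.create(info, bytes, Storage.BlobTargetOption.crc32cMatch());
    }
}
{code}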

In this approach, we'd probably need a Flink option to set the maximum size of 
a temporary blob, which would control how frequently temporary blobs would be 
created. Tuning this would involve a tradeoff between memory usage and 
checkpoint size on the Flink side vs. time to commit the writes via the 
recoverable writer, as I'd expect it to take longer to compose a larger number 
of smaller temporary blobs together into the final blob than it would a smaller 
number of larger temporary blobs.

Anyway, just a thought – let me know what you think. And enjoy the New Year! :)

[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-08 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281488#comment-17281488
 ] 

Xintong Song commented on FLINK-11838:
--

Thanks for the information.
FYI, the Chinese New Year holidays are from Feb. 11 to 17, during which I may 
be less responsive.



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-08 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281481#comment-17281481
 ] 

Galen Warren commented on FLINK-11838:
--

Thanks, that all sounds reasonable. I'll take a look at the REST option. I'm 
going to be a bit tied up with some work-related stuff for the next couple of 
days, so I might not get back to this until later in the week, but I'll 
definitely follow up soon.



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-07 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280730#comment-17280730
 ] 

Xintong Song commented on FLINK-11838:
--

Thanks [~galenwarren].

Your plans regarding the temporary blobs and the resumable upload time limit 
sound good to me.

Concerning the serialization issue, I'm not saying we should go for the REST 
API approach; I'm just trying to understand what options we have and their pros 
and cons. TBH, I'm not entirely sure why we have this code quality rule against 
java serialization in the first place. I can see some downsides to using java 
serialization, which might not be a big problem in this case. But it would be 
better to understand the original purpose, in case we're overlooking something.

I'll try to talk to some veterans to see if we can find out what the original 
concerns were. I think investigating the REST approach can proceed concurrently.

Regarding the HTTP/REST client, Netty's {{HttpRequest}} is used by some of 
Flink's runtime components. Please be aware that {{flink-shaded-netty}} is 
preferred over depending on Netty directly.



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-07 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280587#comment-17280587
 ] 

Galen Warren commented on FLINK-11838:
--

Thanks. All of your initial assumptions are correct, though I might add that it 
is straightforward to serde a {{RestorableState<WriteChannel>}} if Java 
serialization is allowed (i.e. the object is guaranteed to implement 
{{Serializable}}). But I understand that's not preferred.

To answer your questions:

*Does "writing a blob to a temporary location" mean that the user always needs 
to configure a temporary location? How is the temporary location cleaned, say 
if they're never moved to the committed location?*

I was thinking to manage temporary locations through a couple of Flink options:
 * First, a "prefix" option: a string to be prepended to the supplied, "final" 
blob name. This prefix could by default be something like ".inprogress/".
 * Second, an option for the bucket to use for temp blobs, which would not be 
required. If not supplied, the same bucket would be used for temp blobs as for 
the associated final blobs.

I was also planning on appending a UUID string to the end of temp blob 
locations, in order to guarantee their uniqueness during the temp-blob writing 
phase, in case somehow multiple writes to the same final blob were in progress 
at the same time.

So, with the defaults, a write to a blob {{gs://final_bucket/foo/bar}} would 
use a temp blob location of, say, 
{{gs://final_bucket/.inprogress/foo/bar/1157f422-32af-4e32-a797-2a0a05f28ecf}}. 
The prefix could be configured via the first option; also, if the user wanted 
to write all temp blobs to a "temp_bucket" bucket, that bucket could be 
specified via the second option, yielding 
{{gs://temp_bucket/.inprogress/foo/bar/1157f422-32af-4e32-a797-2a0a05f28ecf}}.
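
In code, the naming scheme would be trivial (a hypothetical helper):

{code:java}
import java.util.UUID;

class TempBlobNames {
    // e.g. tempBlobName(".inprogress/", "foo/bar")
    //   -> ".inprogress/foo/bar/1157f422-32af-4e32-a797-2a0a05f28ecf"
    static String tempBlobName(String prefix, String finalName) {
        return prefix + finalName + "/" + UUID.randomUUID();
    }

    // the temp-bucket option is optional; default to the final blob's bucket
    static String tempBucket(String configuredTempBucket, String finalBucket) {
        return configuredTempBucket != null ? configuredTempBucket : finalBucket;
    }
}
{code}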

As for cleaning up the temporary blobs, is that what 
[RecoverableWriter.cleanupRecoverableState|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html#cleanupRecoverableState-org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable-]
 does? I was thinking of cleaning up any temporary blobs there. Beyond that, if 
a separate bucket for temporary blobs were used, one could apply an 
[object lifecycle-management 
rule|https://cloud.google.com/storage/docs/lifecycle] that deletes blobs after 
some period of time. These rules look to be applicable only at the bucket 
level, so this would only work if a separate bucket were used just for 
temporary blobs.


*Per [this doc|https://cloud.google.com/storage/docs/resumable-uploads], a 
resumable upload must be completed within a week. This could be surprising for 
users if they try to restore a job from checkpoints/savepoints after pausing 
for more than one week.*

Yes, I would propose to disclose this limitation via a disclaimer, similar to 
the one used for S3:
{quote}Important Note 2: To guarantee exactly-once semantics while being 
efficient, the {{StreamingFileSink}} uses the [Multi-part 
Upload|https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html] 
feature of S3 (MPU from now on). This feature allows to upload files in 
independent chunks (thus the "multi-part") which can be combined into the 
original file when all the parts of the MPU are successfully uploaded. For 
inactive MPUs, S3 supports a bucket lifecycle rule that the user can use to 
abort multipart uploads that don't complete within a specified number of days 
after being initiated. This implies that if you set this rule aggressively and 
take a savepoint with some part-files being not fully uploaded, their 
associated MPUs may time-out before the job is restarted. This will result in 
your job not being able to restore from that savepoint as the pending 
part-files are no longer there and Flink will fail with an exception as it 
tries to fetch them and fails.
{quote}
Admittedly, it does seem that S3 provides more configuration options here than 
GCS. It would be nice if the week limit were configurable, but it doesn't seem 
to be, based on my reading.


*Relying on Java serialization means tying our compatibility to the 
compatibility of the GCS library, which should be avoided if possible. Would it 
be possible to work directly with the REST API and session URI? IIUC this is 
how the write channel works internally.*

I'd need to look into it more closely, but yes, I think this could be possible. 
I think we'd wind up reimplementing much of what is done in 
[BlobWriteChannel|https://github.com/googleapis/java-storage/blob/master/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java]
 and 
[BaseWriteChannel|https://github.com/googleapis/java-core/blob/master/google-cloud-core/src/main/java/com/google/cloud/BaseWriteChannel.java],
 which I was thinking would be good to avoid. The code in the 
BlobWrite

[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-07 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280454#comment-17280454
 ] 

Xintong Song commented on FLINK-11838:
--

[~galenwarren],

Thanks for explaining the design details. This is exactly what I'm looking for.

I'm trying to understand the reasons behind some of the design choices. Please 
confirm if I've understood correctly, or correct me otherwise.
* The design proposes to build {{GCSFileSystem}} on top of 
{{HadoopFileSystem}}. This is because we can reuse most of the 
{{HadoopFileSystem}} implementation, leveraging the GCS-provided 
{{GoogleHadoopFileSystem}}.
* For the {{RecoverableWriter}}, we cannot reuse {{HadoopRecoverableWriter}}, 
because:
** {{HadoopRecoverableWriter}} checks the URI scheme and Hadoop version
** {{HadoopRecoverableWriter}} assumes files can be appended, which is true for 
files on HDFS but not for immutable objects on GCS.
* The design proposes to leverage the GCS resumable upload feature.
** The feature allows capturing *write state* while writing the object and 
resuming writing by restoring the captured *write state*.
** Both capturing and restoring should happen before the object is completely 
written (and thus not yet visible for reading); once the writing is completed, 
the object becomes immutable.
** To use this feature, we need to persist an object 
({{RestorableState<WriteChannel>}}) generated by {{capture()}}, which will be 
used for {{restore()}} later. However, the implementation of 
{{RestorableState}} is internal to the GCS library, and we do not have a good 
way to serialize/deserialize it.

If my understanding is correct, then I have a few questions.
# Does "writing a blob to a temporary location" mean that the user always needs 
to configure a temporary location? How are the temporary blobs cleaned up, say, 
if they're never moved to the committed location?
# Per [this doc|https://cloud.google.com/storage/docs/resumable-uploads], a 
resumable upload must be completed within a week. This could be surprising for 
users if they try to restore a job from checkpoints/savepoints after pausing 
for more than one week.
# Relying on Java serialization means tying our compatibility to the 
compatibility of the GCS library, which should be avoided if possible. Would it 
be possible to work directly with the REST API and session URI? IIUC this is 
how the write channel works internally.




[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-05 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279694#comment-17279694
 ] 

Galen Warren commented on FLINK-11838:
--

Thanks [~xtsong]! I really appreciate your help on this. Sorry if I jumped the 
gun on the PR; I'm happy to follow the process you've outlined, and if it makes 
more sense to ultimately submit the PR in multiple pieces instead of one, 
that's fine with me too.

So it would seem that the next step would be to discuss the proposed design 
here. I'll take a crack at that.



At a high level, the goal here is to implement RecoverableWriter for GCS, in 
order to be able to use StreamingFileSink to write to GCS. In Flink, 
recoverable writers are created by calling 
[FileSystem.createRecoverableWriter|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FileSystem.html#createRecoverableWriter--],
 so we will also need an implementation of org.apache.flink.core.fs.FileSystem 
for GCS in order to expose the recoverable writer to Flink.

The org.apache.flink.core.fs.FileSystem implementation is straightforward. 
Google provides an implementation of org.apache.hadoop.fs.FileSystem for GCS 
via 
[GoogleHadoopFileSystem|https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.java],
 which can already be used in Flink in other contexts, e.g. for checkpoints. 
Flink provides 
[HadoopFileSystem|https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.html],
 which implements org.apache.flink.core.fs.FileSystem in terms of an underlying 
org.apache.hadoop.fs.FileSystem. So, the proposed FileSystem for GCS would 
extend HadoopFileSystem, constructing it with an instance of 
GoogleHadoopFileSystem. This new FileSystem class would also override 
createRecoverableWriter to return a RecoverableWriter implementation for GCS. 

We also need an implementation of 
[FileSystemFactory|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FileSystemFactory.html]
 and register it with SPI to expose the GCS FileSystem to Flink.
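
A skeleton of that wiring might look like this ({{GSFileSystem}} is a 
hypothetical name; {{HadoopFileSystem}} and {{GoogleHadoopFileSystem}} are the 
classes mentioned above):

{code:java}
import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;

import java.io.IOException;

class GSFileSystem extends HadoopFileSystem {

    GSFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem) {
        // everything except the recoverable writer is delegated to Hadoop
        super(googleHadoopFileSystem);
    }

    @Override
    public RecoverableWriter createRecoverableWriter() throws IOException {
        // would return the GCS-native RecoverableWriter described below;
        // stubbed here only to keep the sketch self-contained
        throw new UnsupportedOperationException("see RecoverableWriter discussion");
    }
}

// A FileSystemFactory for the "gs" scheme would construct GSFileSystem and be
// registered via SPI in META-INF/services/org.apache.flink.core.fs.FileSystemFactory.
{code}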

So, next is the interesting part – the GCS 
[RecoverableWriter|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html]
 implementation. At a high level, RecoverableWriter allows one to create an 
output stream, write bytes to that stream, persist and recover the state of 
that stream, and ultimately commit the write operation or abort it. In GCS, I 
propose to do recoverable writes in two steps:
 * First, write a blob to a temporary location using the [resumable 
uploads|https://cloud.google.com/storage/docs/resumable-uploads] feature of 
GCS. This allows for blobs to be written in chunks over a potentially long 
period of time (up to one week, per the docs). Importantly, it also allows for 
the write state to be persisted and recovered, via 
[WriteChannel.capture|https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#capture--]
 and 
[RestorableState.restore|https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/RestorableState.html],
 which we'll use to implement the persist/recover functionality in 
RecoverableWriter (see the sketch after this list).
 * Second, commit the write operation by copying the temporary blob to the 
"final" blob location, i.e. the one specified in the initial call to 
RecoverableWriter.open, and deleting the temporary blob. In the event of an 
aborted upload, the cleanup would consist of deleting just the temporary blob.
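
The persist/recover half of that, in terms of the GCS API (serializing the 
captured state is the open question discussed below):

{code:java}
import com.google.cloud.RestorableState;
import com.google.cloud.WriteChannel;

class ResumableUploadState {
    // persist: snapshot the in-progress resumable upload
    static RestorableState<WriteChannel> capture(WriteChannel channel) {
        return channel.capture();
    }

    // recover: resume writing where the capture left off
    static WriteChannel recover(RestorableState<WriteChannel> state) {
        return state.restore();
    }
}
{code}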

So, in this approach, the recoverable state from Flink's perspective (i.e. 
CommitRecoverable and ResumeRecoverable) would include:
 * The RestorableState returned from WriteChannel.capture
 * The write position (we have to keep track of this ourselves, because 
WriteChannel doesn't expose it)
 * The locations of the temporary and final blobs, so that we can ultimately 
commit or cleanup the overall operation

That's basically it at a high level.

I do want to point out one possible conflict with the Flink coding guidelines, 
though, to get your thoughts. The guidelines say – very emphatically! – not to 
use Java serialization for anything. In the GCS case, the RestorableState 
returned from WriteChannel.capture is an object that implements Serializable 
but is otherwise opaque. This object does need to be serialized/deserialized as 
part of the RecoverableWriter implementation, and it's not clear to me how to 
do that except by using Java serialization.
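
Concretely, the only obvious way to serde it would be something like the 
following, which is exactly the pattern the guidelines prohibit:

{code:java}
import com.google.cloud.RestorableState;
import com.google.cloud.WriteChannel;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

class RestorableStateSerde {
    static byte[] serialize(RestorableState<WriteChannel> state) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(state); // works because RestorableState implements Serializable
        }
        return bos.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static RestorableState<WriteChannel> deserialize(byte[] bytes)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (RestorableState<WriteChannel>) ois.readObject();
        }
    }
}
{code}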



I'll stop there for now; please let me know if this is the sort of information 
you're looking for here. I'm happy to drill into any area in more detail if 
you'd like, jus

[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-04 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279299#comment-17279299
 ] 

Xintong Song commented on FLINK-11838:
--

Hi [~galenwarren],

Thanks for offering the contribution. I will help you with it.

Since this ticket has not been updated for quite some time and the original PR 
has been abandoned, I have assigned you to the ticket.

Just to manage expectations, I may need some time to pick up the GCS 
background and review your design proposal.

During this time, I would suggest taking a look at the following guidelines:
 [https://flink.apache.org/contributing/contribute-code.html]
 [https://flink.apache.org/contributing/code-style-and-quality-preamble.html]

After a first glance at the PR, I have two suggestions.
- I noticed you've described your proposal on the PR you've opened. It would be 
nice to move that description into this JIRA ticket. Usually, we use the JIRA 
ticket for design discussions and the PR for reviewing implementation details.
- The PR contains 3k LOC of changes in a single commit, which could be hard to 
review, especially when we cannot communicate face-to-face. It would be nice to 
organize the code into smaller commits following the contribution guidelines. 
This can be done after we reach consensus on the design proposal.



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2021-02-04 Thread Galen Warren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279023#comment-17279023
 ] 

Galen Warren commented on FLINK-11838:
--

I have submitted a new [PR|https://github.com/apache/flink/pull/14875] for 
this, unrelated to the prior ones, and [~trohrmann] suggested I reuse this Jira 
ticket.

 



[jira] [Commented] (FLINK-11838) Create RecoverableWriter for GCS

2020-06-06 Thread Tomoyuki NAKAMURA (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17127324#comment-17127324
 ] 

Tomoyuki NAKAMURA commented on FLINK-11838:
---

Is there any progress on this ticket?
