[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280587#comment-17280587
 ] 

Galen Warren commented on FLINK-11838:
--------------------------------------

Thanks. All of your initial assumptions are correct, though I might add that it 
is straightforward to serde a RestorableState<WriteChannel> if Java 
serialization is allowed (i.e. the object is guaranteed to implement 
Serializable). But I understand that's not preferred.
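
For concreteness, a minimal sketch of that (non-preferred) Java-serialization approach, assuming the concrete RestorableState<WriteChannel> object does implement Serializable as described above:

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch of the Java-serialization approach mentioned above (understood to be
// non-preferred). Assumes the state object implements Serializable.
public final class JavaSerde {

    public static byte[] serialize(Serializable state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        }
        return bytes.toByteArray();
    }

    public static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }
}
{code}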

To answer your questions:

*Does "writing a blob to a temporary location" mean that the user always needs 
to configure a temporary location? How is the temporary location cleaned, say 
if they're never moved to the committed location?*

I was thinking to manage temporary locations through a couple of Flink options:
 * First, a "prefix" option, which would be a string to be applied in front of 
the supplied, "final" blob name. This prefix could be default be something like 
".inprogress/".
 * Second, an option for the bucket to use for temp blobs, which would not be 
required. If not supplied, the same bucket would be used for temp blobs as for 
the associated final blobs.

I was also planning on appending a UUID string to the end of temp blob 
locations, in order to guarantee their uniqueness during the temp-blob writing 
phase, in case somehow multiple writes to the same final blob were in progress 
at the same time.

So, with the defaults, a write to a blob {{gs://final_bucket/foo/bar}} would 
use a temp blob location of, say, 
{{gs://final_bucket/.inprogress/foo/bar/1157f422-32af-4e32-a797-2a0a05f28ecf}}. 
The prefix could be configured via the first option; also, if the user wanted 
to write all temp blobs to a "temp_bucket" bucket, that bucket could be 
specified via the second option, yielding 
{{gs://temp_bucket/.inprogress/foo/bar/1157f422-32af-4e32-a797-2a0a05f28ecf}}.
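
For illustration, a minimal sketch of that naming scheme; the parameter names 
(tempPrefix, tempBucket) are placeholders for the two options described above, 
not actual Flink option keys:

{code:java}
import java.util.UUID;

// Sketch of the proposed temp-blob naming scheme. tempPrefix and tempBucket
// stand in for the two proposed options; they are not real option keys.
public final class TempBlobNames {

    /**
     * @param finalBucket bucket of the final blob, e.g. "final_bucket"
     * @param finalName   name of the final blob, e.g. "foo/bar"
     * @param tempPrefix  configured prefix, defaulting to ".inprogress/"
     * @param tempBucket  optional temp bucket; null means reuse finalBucket
     */
    public static String tempBlobUri(
            String finalBucket, String finalName, String tempPrefix, String tempBucket) {
        String bucket = (tempBucket != null) ? tempBucket : finalBucket;
        // The UUID suffix guarantees uniqueness if multiple writes to the same
        // final blob are ever in progress at the same time.
        return "gs://" + bucket + "/" + tempPrefix + finalName + "/" + UUID.randomUUID();
    }
}
{code}

With the defaults, {{tempBlobUri("final_bucket", "foo/bar", ".inprogress/", null)}} 
would produce a URI like the {{gs://final_bucket/.inprogress/foo/bar/...}} example above.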

As for cleaning the temporary blobs, is that what 
[RecoverableWriter.cleanupRecoverableState|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html#cleanupRecoverableState-org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable-]
 does? I was planning to clean up any temporary blobs there. Beyond that, one 
could apply an [object lifecycle-management 
rule|https://cloud.google.com/storage/docs/lifecycle] that deletes blobs after 
some period of time. These rules look to be applicable only at the bucket 
level, though, so this would only work if a separate bucket were used just for 
temporary blobs.
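
If cleanupRecoverableState is indeed the right hook, the cleanup itself could 
be as simple as deleting the recorded temp blob. A hypothetical sketch, where 
the GcsRecoverable type and its accessors are placeholders for whatever the 
eventual ResumeRecoverable implementation looks like:

{code:java}
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;

// Hypothetical sketch of cleanup: delete the temp blob recorded in the
// recoverable. GcsRecoverable and its accessors are placeholders.
public class GcsCleanup {

    private final Storage storage;

    public GcsCleanup(Storage storage) {
        this.storage = storage;
    }

    /** Returns true if the temp blob existed and was deleted. */
    public boolean cleanupRecoverableState(GcsRecoverable recoverable) {
        return storage.delete(BlobId.of(recoverable.tempBucket(), recoverable.tempName()));
    }

    /** Placeholder for the eventual recoverable state type. */
    public interface GcsRecoverable {
        String tempBucket();
        String tempName();
    }
}
{code}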

 

*Per [this doc|https://cloud.google.com/storage/docs/resumable-uploads], a 
resumable upload must be completed within a week. This could be surprising for 
users if they try to restore a job from checkpoints/savepoints after pausing 
for more than one week.*

Yes, I would propose to disclose this limitation via a disclaimer, similar to 
the one used for S3:
{quote}Important Note 2: To guarantee exactly-once semantics while being 
efficient, the {{StreamingFileSink}} uses the [Multi-part 
Upload|https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html] 
feature of S3 (MPU from now on). This feature allows to upload files in 
independent chunks (thus the "multi-part") which can be combined into the 
original file when all the parts of the MPU are successfully uploaded. For 
inactive MPUs, S3 supports a bucket lifecycle rule that the user can use to 
abort multipart uploads that don't complete within a specified number of days 
after being initiated. This implies that if you set this rule aggressively and 
take a savepoint with some part-files being not fully uploaded, their 
associated MPUs may time-out before the job is restarted. This will result in 
your job not being able to restore from that savepoint as the pending 
part-files are no longer there and Flink will fail with an exception as it 
tries to fetch them and fails.
{quote}
Admittedly, it does seem that S3 provides more configuration options here than 
GCS. It would be nice if the week limit were configurable, but based on my 
reading it doesn't seem to be.

 

*Relying on Java serialization means tying our compatibility to the 
compatibility of GCS, which should be avoided if possible. Would it be possible 
to work directly with the REST API and session URI? IIUC this is how the write 
channel works internally.*

I'd need to look into it more closely, but yes, I think this could be possible. 
I think we'd wind up reimplementing much of what is done in 
[BlobWriteChannel|https://github.com/googleapis/java-storage/blob/master/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java]
 and 
[BaseWriteChannel|https://github.com/googleapis/java-core/blob/master/google-cloud-core/src/main/java/com/google/cloud/BaseWriteChannel.java],
 which is what I was hoping to avoid. The retry/flush logic in BlobWriteChannel 
looks a bit complicated, but perhaps we wouldn't need all of that complexity in 
the Flink case?

In the event we were to go this route, is there a preferred client to use in 
Flink for HTTP/REST requests?
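
To make the REST-based alternative concrete, here is a rough sketch of 
uploading a single chunk to a resumable-upload session URI using the JDK's 
built-in java.net.http client (purely illustrative; which HTTP client to use 
is exactly the open question above):

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Rough sketch of talking to the resumable-upload REST API directly, given the
// session URI returned when the upload was initiated (POST with
// uploadType=resumable). Intermediate chunks must be multiples of 256 KiB.
public class ResumableChunkUploader {

    private final HttpClient client = HttpClient.newHttpClient();

    /**
     * @param sessionUri session URI from the initial resumable-upload request
     * @param chunk      bytes to upload
     * @param offset     absolute offset of this chunk within the object
     * @param totalSize  total object size, or -1 if not yet known
     * @return HTTP status: 308 means "resume incomplete", 200/201 means done
     */
    public int uploadChunk(String sessionUri, byte[] chunk, long offset, long totalSize)
            throws Exception {
        long end = offset + chunk.length - 1;
        String total = totalSize < 0 ? "*" : Long.toString(totalSize);

        HttpRequest request = HttpRequest.newBuilder(URI.create(sessionUri))
                .header("Content-Range", "bytes " + offset + "-" + end + "/" + total)
                .PUT(HttpRequest.BodyPublishers.ofByteArray(chunk))
                .build();

        HttpResponse<Void> response =
                client.send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }
}
{code}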

 

> Create RecoverableWriter for GCS
> --------------------------------
>
>                 Key: FLINK-11838
>                 URL: https://issues.apache.org/jira/browse/FLINK-11838
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / FileSystem
>    Affects Versions: 1.8.0
>            Reporter: Fokko Driesprong
>            Assignee: Galen Warren
>            Priority: Major
>              Labels: pull-request-available, usability
>             Fix For: 1.13.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
