[ 
https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17828412#comment-17828412
 ] 

Galen Warren edited comment on FLINK-34696 at 3/19/24 5:06 PM:
---------------------------------------------------------------

I think your proposed algorithm is just another way of implementing the "delete 
intermediate blobs that are no longer necessary as soon as possible" idea we've 
considered. You accomplish it by overwriting the staging blob on each 
iteration; a similar effect could be achieved by writing to a new intermediate 
staging blob on each iteration (as is done now) and deleting the old one right 
away (which is not done now; instead, that deletion occurs shortly thereafter, 
once the commit succeeds).

Aside: I'm not sure whether it's possible to overwrite the existing staging 
blob like you suggest. The 
[docs|https://cloud.google.com/storage/docs/json_api/v1/objects/compose] for 
the compose operation say:
{quote}Concatenates a list of existing objects into a new object in the same 
bucket. The existing source objects are unaffected by this operation.
{quote}
In your proposal, the same staging blob is both the target of the compose 
operation and one of the input blobs to be composed. That _might_ work, but 
would have to be tested to see if it's allowed. If it isn't, writing to a new 
blob and deleting the old one would have essentially the same effect. That's 
almost certainly what happens behind the scenes anyway, since blobs are 
immutable.
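
If someone wants to check, a quick standalone test along these lines would 
settle it. This is just a sketch against the google-cloud-storage Java client, 
with made-up bucket/blob names:
{code:java}
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;

public class ComposeOverwriteTest {
    public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();

        // Made-up names; point these at real blobs in a scratch bucket.
        String bucket = "some-scratch-bucket";
        BlobId staging = BlobId.of(bucket, "staging-blob");
        BlobId next = BlobId.of(bucket, "raw-temporary-blob");

        try {
            // The staging blob is both the compose target and a source.
            // The docs say compose writes "a new object", so this may fail.
            storage.compose(
                    Storage.ComposeRequest.newBuilder()
                            .addSource(staging.getName(), next.getName())
                            .setTarget(BlobInfo.newBuilder(staging).build())
                            .build());
            System.out.println("Overwriting the staging blob in place worked.");
        } catch (StorageException e) {
            System.out.println("Not allowed: " + e.getMessage());
        }
    }
}
{code}
If the compose call throws, we'd know in-place overwriting isn't allowed and 
write-new-then-delete is the fallback.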

Bigger picture, if you're trying to combine millions of immutable blobs 
together in one step, 32 at a time, I don't see how you avoid having lots of 
intermediate composed blobs, one way or another, at least temporarily. The main 
question would be how quickly ones that are no longer needed are discarded.
{quote}To streamline this, we could modify {{GSCommitRecoverable}} to update 
the list of {{componentObjectIds}}, allowing the removal of blobs already 
appended to the {{stagingBlob}}.
{quote}
Not quite following you here; the GSCommitRecoverable is provided as an input 
to the GSRecoverableWriterCommitter and supplies the list of "raw" (i.e. 
uncomposed) temporary blobs that need to be composed and committed. If you 
removed those raw blobs from that list as they're added to the staging blob, 
what would that accomplish? The GSCommitRecoverable provided to the 
GSRecoverableWriterCommitter isn't persisted anywhere after the commit succeeds 
or fails, and if the commit were retried, it would need the complete list 
anyway.
{quote}I notice another issue with the code: storage.compose might throw a 
StorageException. With the current code this would mean the intermediate 
composition blobs are not cleaned up.

If the code above is implemented, I think there is no longer a need for the 
TTL feature, since all necessary blobs should be written to a final blob. Any 
leftover blobs post-job completion would indicate a failed state.
{quote}
There is no real way to prevent the orphaning of intermediate blobs in all 
failure scenarios. Even in your proposed algorithm, the staging blob could be 
orphaned if the composition process failed partway through. This is what the 
temp bucket and TTL mechanism are for. It's optional, so you don't have to use 
it, but yes, you would have to keep an eye out for orphaned intermediate blobs 
via some other mechanism if you choose not to use it.

Honestly, I think some of your difficulties come from trying to combine so 
much data at once vs. doing it along the way, with commits at incremental 
checkpoints. I was going to suggest reducing your checkpoint interval, to 
aggregate more frequently, but obviously that doesn't work if you're not 
checkpointing at all.

I do think the RecoverableWriter mechanism is designed to make writing 
predictable and repeatable in checkpoint/recovery situations (hence the name); 
if you're not doing any of that, you may run into some challenges. If you were 
to use checkpoints, you would aggregate more frequently, meaning fewer 
intermediate blobs with shorter lifetimes, and you'd be less likely to run up 
against the 5TB limit, as you would end up with a series of smaller files, 
written at each checkpoint, vs. one huge file.
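
For what it's worth, enabling periodic checkpoints is a one-liner if you're on 
the DataStream API. A sketch (the interval and sink are placeholders):
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // e.g. checkpoint every 10 minutes; each checkpoint commits the
        // in-progress files, bounding both final file size and the number
        // of raw temporary blobs that pile up before a commit
        env.enableCheckpointing(10 * 60 * 1000L);
        // ... build the pipeline here, e.g. a file sink writing to gs:// ...
        // env.execute();
    }
}
{code}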

If we do want to consider changes, here's what I suggest:
 * Test whether GCP allows composition to overwrite an existing blob that is 
also an input to the compose operation. If that works, we could use that 
technique in the composeBlobs function. As mentioned above, I doubt this 
actually reduces the number of blobs that exist temporarily – since blobs are 
immutable objects – but it would minimize their lifetime, effectively deleting 
them as soon as possible. If overwriting doesn't work, we could achieve a 
similar effect by deleting intermediate blobs as soon as they are no longer 
needed, rather than waiting until the commit succeeds (see the sketch after 
this list).

 
 * Investigate how other RecoverableWriter implementations handle blob-size 
limitations. For example, S3 has the same 5TB limit. As we've discussed, there 
would be ways to write multiple files in such scenarios, but we'd have to give 
up a bit of atomicity that is assumed in the design. I'm not really sure 
whether that would be allowed or if the answer is "commit frequently enough to 
keep the size of final blobs under 5TB." Someone else from the Flink team would 
have to weigh in on that.

 
 * Consider a change to the composing algorithm that I'll describe below, which 
optimizes for total bytes written/composed vs. number of compose operations.
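
Regarding the first bullet, here's a rough sketch of the eager-deletion 
fallback. Illustrative names only, not the actual connector code; the real 
composeBlobs in GSRecoverableWriterCommitter would also need to preserve its 
recovery semantics:
{code:java}
import java.util.ArrayList;
import java.util.List;

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;

/**
 * Sketch only: sequential compose, 32 sources at a time, deleting each
 * intermediate blob as soon as it has been folded into the next one.
 */
public class EagerComposeSketch {

    private static final int MAX_COMPOSE_SOURCES = 32;

    static BlobId composeBlobs(
            Storage storage, String bucket, List<BlobId> rawBlobs, String stagingPrefix) {
        List<BlobId> remaining = new ArrayList<>(rawBlobs);
        BlobId previous = null;
        int generation = 0;

        while (!remaining.isEmpty()) {
            Storage.ComposeRequest.Builder builder = Storage.ComposeRequest.newBuilder();
            int budget = MAX_COMPOSE_SOURCES;
            if (previous != null) {
                builder.addSource(previous.getName());
                budget--; // the previous intermediate occupies one source slot
            }
            int take = Math.min(budget, remaining.size());
            for (BlobId raw : remaining.subList(0, take)) {
                builder.addSource(raw.getName());
            }
            remaining = remaining.subList(take, remaining.size());

            BlobId target = BlobId.of(bucket, stagingPrefix + "-" + generation++);
            storage.compose(builder.setTarget(BlobInfo.newBuilder(target).build()).build());

            // Eager cleanup: the previous intermediate is no longer needed.
            // The raw temporary blobs must be kept so a failed commit can be retried.
            if (previous != null) {
                storage.delete(previous);
            }
            previous = target;
        }
        return previous; // null if rawBlobs was empty
    }
}
{code}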

 

Possible alternate algorithm:

Consider the case where there are 125 raw temporary blobs to be aggregated, 
each of size 1GB. The current method (and your proposed method) goes like this:
 * Combine the first 32 blobs into 1 blob – 1 compose operation, 32GB written 
into a new blob
 * Combine that blob with the next 31 blobs – 1 compose operation, 31GB + 32GB 
= 63GB written into a new blob
 * Combine that blob with the next 31 blobs – 1 compose operation, 31GB + 63GB 
= 94GB written into a new blob
 * Combine that blob with the next 31 blobs – 1 compose operation, 31GB + 94GB 
= 125GB written into a new blob

So, a total of 4 compose operations and 32GB + 63GB + 94GB + 125GB = 314GB 
written/composed.

Alternately, we could do this:
 * Combine the first 32 blobs into 1 intermediate blob – 1 compose operation, 
32GB written into a new blob
 * Combine the second 32 blobs into 1 intermediate blob – 1 compose operation, 
32GB written into a new blob
 * Combine the third 32 blobs into 1 intermediate blob – 1 compose operation, 
32GB written into a new blob
 * Combine the last 29 blobs into 1 intermediate blob – 1 compose operation, 
29GB written into a new blob
 * Combine the 4 intermediate blobs into one final blob – 1 compose operation, 
32GB + 32GB + 32GB + 29GB = 125GB written into a new blob

This method has 5 compose operations – one more than the existing way – but 
only writes a total of 32GB + 32GB + 32GB + 29GB + 125GB = 250GB. So: more 
compose operations, but less intermediate data and less total data written.
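
A rough sketch of that tree-style composition (again illustrative names, not 
the actual connector code):
{code:java}
import java.util.ArrayList;
import java.util.List;

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;

/**
 * Sketch only: compose disjoint groups of up to 32 blobs per level, then
 * compose the results, so each byte is rewritten once per level (about
 * log32(N) times) instead of once per sequential iteration.
 */
public class TreeComposeSketch {

    private static final int MAX_COMPOSE_SOURCES = 32;

    static BlobId composeTree(
            Storage storage, String bucket, List<BlobId> blobs, String stagingPrefix) {
        List<BlobId> level = blobs;
        int generation = 0;

        while (level.size() > 1) {
            List<BlobId> next = new ArrayList<>();
            for (int i = 0; i < level.size(); i += MAX_COMPOSE_SOURCES) {
                List<BlobId> group =
                        level.subList(i, Math.min(i + MAX_COMPOSE_SOURCES, level.size()));
                BlobId target = BlobId.of(bucket, stagingPrefix + "-" + generation++);
                Storage.ComposeRequest.Builder builder =
                        Storage.ComposeRequest.newBuilder()
                                .setTarget(BlobInfo.newBuilder(target).build());
                for (BlobId id : group) {
                    builder.addSource(id.getName());
                }
                storage.compose(builder.build());
                next.add(target);
            }
            // Intermediate blobs from the previous generation (but not the raw
            // temporary blobs, which must be retained for retry) could be
            // deleted here, right after their parents are written.
            level = next;
        }
        return level.get(0); // assumes blobs is non-empty
    }
}
{code}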

EDIT: Assuming I did my math right: if N is the total number of raw temporary 
blobs to aggregate, and they're all the same size, the total volume of data 
written by the first (existing) algorithm is approximately proportional to N^2, 
versus N * log N for the second. Roughly: the existing algorithm does about 
N/31 iterations and rewrites the ever-growing staging blob on each one, so the 
per-iteration writes grow linearly and sum to something quadratic in N, while 
the tree approach rewrites each byte once per level, and there are only about 
log32(N) levels. My guess is that would have the biggest impact in high-volume 
scenarios. Overwriting the same staging blob would be moot in this case. We 
could still delete intermediate blobs along the way, just not the raw temporary 
blobs, which have to be retained.

 


> GSRecoverableWriterCommitter is generating excessive data blobs
> ---------------------------------------------------------------
>
>                 Key: FLINK-34696
>                 URL: https://issues.apache.org/jira/browse/FLINK-34696
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>            Reporter: Simon-Shlomo Poil
>            Priority: Major
>
> The `composeBlobs` method in 
> `org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` is designed to 
> merge multiple small blobs into a single large blob using Google Cloud 
> Storage's compose method. This process is iterative, combining the result 
> from the previous iteration with 31 new blobs until all blobs are merged. 
> Upon completion of the composition, the method proceeds to remove the 
> temporary blobs.
> *Issue:*
> This methodology results in significant, unnecessary data storage consumption 
> during the blob composition process, incurring considerable costs due to 
> Google Cloud Storage pricing models.
> *Example to Illustrate the Problem:*
>  - Initial state: 64 blobs, each 1 GB in size (totaling 64 GB).
>  - After 1st step: 32 blobs are merged into a single blob, increasing total 
> storage to 96 GB (64 original + 32 GB new).
>  - After 2nd step: The newly created 32 GB blob is merged with 31 more blobs, 
> raising the total to 159 GB.
>  - After 3rd step: The final blob is merged, culminating in a total of 223 GB 
> to combine the original 64 GB of data. This results in an overhead of 159 GB.
> *Impact:*
> This inefficiency has a profound impact, especially at scale, where terabytes 
> of data can incur overheads in the petabyte range, leading to unexpectedly 
> high costs. Additionally, we have observed an increase in storage exceptions 
> thrown by the Google Storage library, potentially linked to this issue.
> *Suggested Solution:*
> To mitigate this problem, we propose modifying the `composeBlobs` method to 
> immediately delete source blobs once they have been successfully combined. 
> This change could significantly reduce data duplication and associated costs. 
> However, the implications for data recovery and integrity need careful 
> consideration to ensure that this optimization does not compromise the 
> ability to recover data in case of a failure during the composition process.
> *Steps to Reproduce:*
> 1. Initiate the blob composition process in an environment with a significant 
> number of blobs (e.g., 64 blobs of 1 GB each).
> 2. Observe the temporary increase in data storage as blobs are iteratively 
> combined.
> 3. Note the final amount of data storage used compared to the initial total 
> size of the blobs.
> *Expected Behavior:*
> The blob composition process should minimize unnecessary data storage use, 
> efficiently managing resources to combine blobs without generating excessive 
> temporary data overhead.
> *Actual Behavior:*
> The current implementation results in significant temporary increases in data 
> storage, leading to high costs and potential system instability due to 
> frequent storage exceptions.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
