[ 
https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Simon-Shlomo Poil updated FLINK-34696:
--------------------------------------
    Description: 
*Description:*

The `composeBlobs` method in
`org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` merges many small
blobs into a single large blob using Google Cloud Storage's compose operation.
Because a single compose call accepts at most 32 source objects, the method
works iteratively, combining the result of the previous iteration with up to 31
new blobs until all blobs are merged. Only after the entire composition has
completed does the method delete the temporary blobs.
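
For reference, the iterative pattern described above corresponds roughly to the
sketch below, written against the `google-cloud-storage` Java client rather
than the Flink-internal abstractions; class, bucket, and blob names are purely
illustrative:

```java
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.ComposeRequest;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class IterativeComposeSketch {

    // GCS allows at most 32 source objects per compose request.
    private static final int MAX_COMPOSE_SOURCES = 32;

    /** Iteratively composes the given blobs into one; cleanup happens only at the very end. */
    static String composeAll(Storage storage, String bucket, List<String> sourceBlobNames) {
        List<String> remaining = new ArrayList<>(sourceBlobNames);
        List<String> intermediates = new ArrayList<>();
        String result = null;

        while (!remaining.isEmpty()) {
            List<String> batch = new ArrayList<>();
            if (result != null) {
                batch.add(result); // carry the intermediate result of the previous round
            }
            while (batch.size() < MAX_COMPOSE_SOURCES && !remaining.isEmpty()) {
                batch.add(remaining.remove(0));
            }

            // Each round writes a brand-new object holding the combined data of its sources.
            String target = "tmp/compose-" + UUID.randomUUID();
            storage.compose(ComposeRequest.newBuilder()
                    .setTarget(BlobInfo.newBuilder(bucket, target).build())
                    .addSource(batch)
                    .build());
            intermediates.add(target);
            result = target;
        }

        // Sources and earlier intermediates are removed only after ALL rounds have
        // finished, so every intermediate object coexists with the original data.
        for (String name : sourceBlobNames) {
            storage.delete(bucket, name);
        }
        for (String name : intermediates) {
            if (!name.equals(result)) {
                storage.delete(bucket, name);
            }
        }
        return result;
    }
}
```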

*Issue:*

This approach consumes a significant amount of unnecessary storage during the
blob composition process, which is costly under Google Cloud Storage's pricing
model.

*Example to Illustrate the Problem:*

- Initial state: 64 blobs, each 1 GB in size (64 GB in total).
- After the 1st step: 32 blobs are composed into a single 32 GB blob, raising
total storage to 96 GB (64 GB original + 32 GB intermediate).
- After the 2nd step: the 32 GB intermediate is composed with 31 more original
blobs into a 63 GB blob, raising the total to 159 GB.
- After the 3rd step: the last remaining 1 GB blob is composed with the 63 GB
intermediate into the final 64 GB blob, bringing the total to 223 GB written to
combine 64 GB of data, i.e. an overhead of 159 GB (the sketch below reproduces
this arithmetic).
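
Because each round rewrites everything composed so far plus up to 31 new blobs,
the temporary overhead grows roughly quadratically with the number of source
blobs. The small sketch below reproduces the arithmetic of the example above
(assuming equally sized blobs and the 32-source compose limit; the class and
method names are illustrative):

```java
public class ComposeOverhead {

    /** Extra bytes written when iteratively composing blobCount blobs of blobSize bytes each. */
    static long overheadBytes(long blobCount, long blobSize) {
        long overhead = 0;
        long intermediate = 0; // size of the blob carried into the next compose round
        long remaining = blobCount;
        while (remaining > 0) {
            // the first round may take 32 sources; later rounds take the carried
            // intermediate plus up to 31 new blobs
            long take = Math.min(remaining, intermediate == 0 ? 32 : 31);
            intermediate += take * blobSize;
            overhead += intermediate; // a new intermediate object of this size is written
            remaining -= take;
        }
        return overhead;
    }

    public static void main(String[] args) {
        long oneGb = 1L << 30;
        // Prints 159 for the example above: 32 + 63 + 64 GB of temporary data.
        System.out.println(overheadBytes(64, oneGb) / oneGb);
    }
}
```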

*Impact:*

This inefficiency has a profound impact, especially at scale, where terabytes 
of data can incur overheads in the petabyte range, leading to unexpectedly high 
costs. Additionally, we have observed an increase in storage exceptions thrown 
by the Google Storage library, potentially linked to this issue.

*Suggested Solution:*

To mitigate this problem, we propose modifying the `composeBlobs` method to 
immediately delete source blobs once they have been successfully combined. This 
change could significantly reduce data duplication and associated costs. 
However, the implications for data recovery and integrity need careful 
consideration to ensure that this optimization does not compromise the ability 
to recover data in case of a failure during the composition process.
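
One possible shape of this change, sketched in the same illustrative style as
above (not the actual Flink code), is to delete each round's inputs as soon as
that round's compose call has succeeded, so that at any time only one
intermediate blob exists alongside the not-yet-composed originals:

```java
// Mirrors the earlier sketch: remaining, result, storage, bucket and
// MAX_COMPOSE_SOURCES are the same illustrative variables as above.
while (!remaining.isEmpty()) {
    List<String> batch = new ArrayList<>();
    if (result != null) {
        batch.add(result);
    }
    while (batch.size() < MAX_COMPOSE_SOURCES && !remaining.isEmpty()) {
        batch.add(remaining.remove(0));
    }

    String target = "tmp/compose-" + UUID.randomUUID();
    storage.compose(ComposeRequest.newBuilder()
            .setTarget(BlobInfo.newBuilder(bucket, target).build())
            .addSource(batch)
            .build());

    // The composed data now lives in `target`, so the inputs of this round
    // (including the previous intermediate) can be removed immediately.
    for (String source : batch) {
        storage.delete(bucket, source);
    }
    result = target;
}
```

Peak extra storage then stays at roughly the size of the largest intermediate
blob (64 GB in the example) instead of the sum of all intermediates, though, as
noted above, failure handling needs care: once a round's originals are deleted,
a failed commit can no longer be retried from the individual component blobs.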

*Steps to Reproduce:*

1. Initiate the blob composition process in an environment with a significant 
number of blobs (e.g., 64 blobs of 1 GB each).
2. Observe the temporary increase in data storage as blobs are iteratively 
combined.
3. Note the final amount of data storage used compared to the initial total 
size of the blobs.

*Expected Behavior:*

The blob composition process should minimize unnecessary data storage use, 
efficiently managing resources to combine blobs without generating excessive 
temporary data overhead.

*Actual Behavior:*

The current implementation results in significant temporary increases in data 
storage, leading to high costs and potential system instability due to frequent 
storage exceptions.
 
 
 

  was:
In the "composeBlobs" method of 
org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter
many small blobs are combined to generate a final single blob using the google 
storage compose method. This compose action is performed iteratively each time 
composing  the resulting blob from the previous step with 31 new blobs until 
there are not remaining blobs. When the compose action is completed the 
temporary blobs are removed.
 
This unfortunately leads to significantly excessive use of data storage (which
for Google Storage is rather costly).
 
*Simple example*
We have 64 blobs, each 1 GB; i.e. 64 GB in total.
1st step: 32 blobs are composed into one blob; i.e. now 64 GB + 32 GB = 96 GB
2nd step: The 32 GB blob from the previous step is composed with 31 blobs; now
we have 64 GB + 32 GB + 63 GB = 159 GB
3rd step: The last remaining blob is composed with the blob from the previous
step; now we have: 64 GB + 32 GB + 63 GB + 64 GB = 223 GB
I.e. in order to combine 64 GB of data we had an overhead of 159 GB.
 
*Why is this a big issue?*
With large amounts of data the overhead becomes significant. With TiB of data
we experienced peaks of PiB, leading to unexpectedly high costs and (maybe
unrelated) frequent storage exceptions thrown by the Google Storage library.
 
*Suggested solution:*
When blobs have been composed together, they should be deleted so that data is
not duplicated.
Maybe this has implications for recoverability?
 
 
 
 
 


> GSRecoverableWriterCommitter is generating excessive data blobs
> ---------------------------------------------------------------
>
>                 Key: FLINK-34696
>                 URL: https://issues.apache.org/jira/browse/FLINK-34696
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>            Reporter: Simon-Shlomo Poil
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
