[ https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17828412#comment-17828412 ]
Galen Warren edited comment on FLINK-34696 at 3/19/24 5:06 PM:
---------------------------------------------------------------

I think your proposed algorithm is just another way of implementing the "delete intermediate blobs that are no longer necessary as soon as possible" idea we've considered. You accomplish it by overwriting the staging blob on each iteration; a similar effect could be achieved by writing to a new intermediate staging blob on each iteration (as is done now) and deleting the old one right away (which is not done now; instead, the deletion occurs shortly thereafter, once the commit succeeds).

Aside: I'm not sure whether it's possible to overwrite the existing staging blob as you suggest. The [docs|https://cloud.google.com/storage/docs/json_api/v1/objects/compose] for the compose operation say:
{quote}Concatenates a list of existing objects into a new object in the same bucket. The existing source objects are unaffected by this operation{quote}
In your proposal, the same staging blob is both the target of the compose operation and one of the input blobs to be composed. That _might_ work, but it would have to be tested to see if it's allowed. If it isn't, writing to a new blob and deleting the old one would have essentially the same effect. That's almost certainly what happens behind the scenes anyway, since blobs are immutable.

Bigger picture: if you're trying to combine millions of immutable blobs together in one step, 32 at a time, I don't see how you avoid having lots of intermediate composed blobs, one way or another, at least temporarily. The main question is how quickly the ones that are no longer needed are discarded.
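To make that lifetime question concrete, here's a rough sketch (standalone Java, not connector code; the 32-source compose limit is the only GCS fact assumed, and the 125 x 1GB scenario is just for illustration) comparing peak intermediate storage when superseded staging blobs are kept until the commit versus deleted as soon as the next staging blob exists:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Models the staging-blob lifecycle of the sequential compose loop:
 * fold raw blobs into a growing staging blob, 32 sources per compose.
 * Compares peak intermediate storage if superseded staging blobs are
 * kept until commit vs. deleted as soon as the next one exists.
 */
public class StagingBlobLifetime {
    static final int MAX_COMPOSE = 32; // GCS limit on sources per compose

    /** Returns {keptUntilCommit, deletedImmediately} peak intermediate GB. */
    static long[] peakIntermediateGb(int rawBlobs, long rawSizeGb) {
        List<Long> stagingSizes = new ArrayList<>();
        long staging = Math.min(rawBlobs, MAX_COMPOSE) * rawSizeGb;
        stagingSizes.add(staging);
        int remaining = rawBlobs - Math.min(rawBlobs, MAX_COMPOSE);
        long peakImmediate = staging;
        while (remaining > 0) {
            int batch = Math.min(remaining, MAX_COMPOSE - 1); // 1 slot holds prior staging blob
            long next = staging + batch * rawSizeGb;
            // with immediate deletion, old and new staging blobs coexist only briefly
            peakImmediate = Math.max(peakImmediate, staging + next);
            stagingSizes.add(next);
            staging = next;
            remaining -= batch;
        }
        // with deferred deletion, every staging blob exists until the commit
        long peakDeferred = stagingSizes.stream().mapToLong(Long::longValue).sum();
        return new long[] {peakDeferred, peakImmediate};
    }

    public static void main(String[] args) {
        long[] p = peakIntermediateGb(125, 1);
        System.out.println("kept until commit:   " + p[0] + " GB peak"); // 32+63+94+125 = 314
        System.out.println("deleted immediately: " + p[1] + " GB peak"); // 94+125 = 219
    }
}
```

A successful overwrite-in-place would give the same peak as immediate deletion; in both cases the raw temporary blobs are extra on top of these numbers, since they have to be retained until the commit succeeds.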
{quote}To streamline this, we could modify {{GSCommitRecoverable}} to update the list of {{componentObjectIds}}, allowing the removal of blobs already appended to the {{stagingBlob}}{quote}
Not quite following you here; the GSCommitRecoverable is provided as an input to the GSRecoverableWriterCommitter, supplying the list of "raw" (i.e. uncomposed) temporary blobs that need to be composed and committed. If you removed those raw blobs from that list as they're added to the staging blob, what would that accomplish? The GSCommitRecoverable provided to the GSRecoverableWriterCommitter isn't persisted anywhere after the commit succeeds or fails, and if the commit were to be retried it would need the complete list anyway.

{quote}I notice an other issue with the code - because the storage.compose might throw a StorageException. With the current code this would mean the intermediate composition blobs are not cleaned up. If the code above is implemented, I think there is no longer a need the TTL feature since all necessary blobs should be written to a final blob. Any leftover blobs post-job completion would indicate a failed state.{quote}
There is no real way to prevent the orphaning of intermediate blobs in all failure scenarios. Even in your proposed algorithm, the staging blob could be orphaned if the composition process failed partway through. This is what the temp bucket and TTL mechanism are for. It's optional, so you don't have to use it, but yes, you would have to keep an eye out for orphaned intermediate blobs via some other mechanism if you choose not to use it.

Honestly, I think some of your difficulties are coming from trying to combine so much data together at once vs. doing it along the way, with commits at incremental checkpoints. I was going to suggest you reduce your checkpoint interval, to aggregate more frequently, but obviously that doesn't work if you're not checkpointing at all.
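For reference, the temp-bucket TTL cleanup mentioned above is just a standard GCS bucket lifecycle rule; a minimal example (the 7-day age is a placeholder, not a recommendation) looks like:

```json
{
  "rule": [
    {
      "action": {"type": "Delete"},
      "condition": {"age": 7}
    }
  ]
}
```

Applied with {{gsutil lifecycle set lifecycle.json gs://<temp-bucket>}}, any intermediate blob orphaned by a failed commit is removed automatically once it exceeds the configured age.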
I do think the RecoverableWriter mechanism is designed to make writing predictable and repeatable in checkpoint/recovery situations (hence the name); if you're not doing any of that, you may run into some challenges. If you were to use checkpoints, you would aggregate more frequently, meaning fewer intermediate blobs with shorter lifetimes, and you'd be less likely to run up against the 5TB limit, as you would end up with a series of smaller files, written at each checkpoint, vs. one huge file.

If we do want to consider changes, here's what I suggest:
* Test whether GCP allows composition to overwrite an existing blob that is also an input to the compose operation. If that works, then we could use that technique in the composeBlobs function. As mentioned above, I doubt that this actually reduces the number of blobs that exist temporarily – since blobs are immutable objects – but it would minimize their lifetime, effectively deleting them as soon as possible. If overwriting doesn't work, then we could achieve a similar effect by deleting intermediate blobs as soon as they are no longer needed, rather than waiting until the commit succeeds.
* Investigate how other RecoverableWriter implementations handle blob-size limitations. For example, S3 has the same 5TB limit. As we've discussed, there would be ways to write multiple files in such scenarios, but we'd have to give up a bit of the atomicity that is assumed in the design. I'm not really sure whether that would be allowed or if the answer is "commit frequently enough to keep the size of final blobs under 5TB." Someone else from the Flink team would have to weigh in on that.
* Consider a change to the composing algorithm that I'll describe below, which optimizes for total bytes written/composed vs. number of compose operations.

Possible alternate algorithm: Consider the case where there are 125 raw temporary blobs to be aggregated, each of size 1GB.
The current method (and your proposed method) goes like this:
* Combine the first 32 blobs into 1 blob – 1 compose operation, 32GB written into a new blob
* Combine that blob with the next 31 blobs – 1 compose operation, 31GB + 32GB = 63GB written into a new blob
* Combine that blob with the next 31 blobs – 1 compose operation, 31GB + 63GB = 94GB written into a new blob
* Combine that blob with the next 31 blobs – 1 compose operation, 31GB + 94GB = 125GB written into a new blob

So, a total of 4 compose operations and 32GB + 63GB + 94GB + 125GB = 314GB written/composed.

Alternately, we could do this:
* Combine the first 32 blobs into 1 intermediate blob – 1 compose operation, 32GB written into a new blob
* Combine the second 32 blobs into 1 intermediate blob – 1 compose operation, 32GB written into a new blob
* Combine the third 32 blobs into 1 intermediate blob – 1 compose operation, 32GB written into a new blob
* Combine the last 29 blobs into 1 intermediate blob – 1 compose operation, 29GB written into a new blob
* Combine the 4 intermediate blobs into one final blob – 1 compose operation, 32GB + 32GB + 32GB + 29GB = 125GB written into a new blob

This method has 5 compose operations – one more than the existing way – but only writes a total of 32GB + 32GB + 32GB + 29GB + 125GB = 250GB. So, more compose operations but less intermediate data and less total data written.

EDIT: Assuming I did my math right: if N is the total number of raw temporary blobs to aggregate, and they're all the same size, the total volume of data written by the first (existing) algorithm is approximately proportional to N^2, as compared to N * log N for the second algorithm. My guess is that would have the biggest impact in high-volume scenarios. Overwriting the same staging blob would be moot in this case. We could still delete intermediate blobs along the way, just not the raw temporary blobs, which have to be retained.
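The arithmetic above can be checked mechanically. A small sketch (standalone Java, not connector code; the 32-source compose limit is the only GCS fact assumed) that tallies the bytes written by each strategy:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Tallies total GB written by the two compose strategies discussed above. */
public class ComposeCost {
    static final int MAX_COMPOSE = 32; // GCS limit on sources per compose

    /** Sequential (current): fold everything into one growing staging blob. */
    static long sequentialGb(int rawBlobs, long rawSizeGb) {
        long staging = Math.min(rawBlobs, MAX_COMPOSE) * rawSizeGb;
        long written = staging;
        int remaining = rawBlobs - Math.min(rawBlobs, MAX_COMPOSE);
        while (remaining > 0) {
            int batch = Math.min(remaining, MAX_COMPOSE - 1); // 1 slot for prior staging blob
            staging += batch * rawSizeGb;
            written += staging; // each compose rewrites the entire staging blob
            remaining -= batch;
        }
        return written;
    }

    /** Hierarchical (proposed): compose level by level, 32 blobs per group. */
    static long hierarchicalGb(int rawBlobs, long rawSizeGb) {
        List<Long> sizes = new ArrayList<>(Collections.nCopies(rawBlobs, rawSizeGb));
        long written = 0;
        while (sizes.size() > 1) {
            List<Long> next = new ArrayList<>();
            for (int i = 0; i < sizes.size(); i += MAX_COMPOSE) {
                int end = Math.min(i + MAX_COMPOSE, sizes.size());
                if (end - i == 1) { // a lone leftover blob is carried up, not recomposed
                    next.add(sizes.get(i));
                    continue;
                }
                long group = 0;
                for (int j = i; j < end; j++) group += sizes.get(j);
                written += group; // one compose writes a new blob of this size
                next.add(group);
            }
            sizes = next;
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println("sequential:   " + sequentialGb(125, 1) + " GB"); // 314
        System.out.println("hierarchical: " + hierarchicalGb(125, 1) + " GB"); // 250
    }
}
```

This reproduces the 314GB vs. 250GB totals above; for equal-sized blobs the sequential total grows roughly quadratically in N while the hierarchical total grows as N * log N, since each of the ~log32(N) levels rewrites all the data once.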
> GSRecoverableWriterCommitter is generating excessive data blobs
> ---------------------------------------------------------------
>
>                 Key: FLINK-34696
>                 URL: https://issues.apache.org/jira/browse/FLINK-34696
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>            Reporter: Simon-Shlomo Poil
>            Priority: Major
>
> The `composeBlobs` method in `org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` is designed to merge multiple small blobs into a single large blob using Google Cloud Storage's compose method.
> This process is iterative, combining the result from the previous iteration with 31 new blobs until all blobs are merged. Upon completion of the composition, the method proceeds to remove the temporary blobs.
>
> *Issue:*
> This methodology results in significant, unnecessary data storage consumption during the blob composition process, incurring considerable costs due to Google Cloud Storage pricing models.
>
> *Example to Illustrate the Problem:*
> - Initial state: 64 blobs, each 1 GB in size (totaling 64 GB).
> - After 1st step: 32 blobs are merged into a single blob, increasing total storage to 96 GB (64 original + 32 GB new).
> - After 2nd step: The newly created 32 GB blob is merged with 31 more blobs, raising the total to 159 GB.
> - After 3rd step: The final blob is merged, culminating in a total of 223 GB to combine the original 64 GB of data. This results in an overhead of 159 GB.
>
> *Impact:*
> This inefficiency has a profound impact, especially at scale, where terabytes of data can incur overheads in the petabyte range, leading to unexpectedly high costs. Additionally, we have observed an increase in storage exceptions thrown by the Google Storage library, potentially linked to this issue.
>
> *Suggested Solution:*
> To mitigate this problem, we propose modifying the `composeBlobs` method to immediately delete source blobs once they have been successfully combined. This change could significantly reduce data duplication and associated costs. However, the implications for data recovery and integrity need careful consideration to ensure that this optimization does not compromise the ability to recover data in case of a failure during the composition process.
>
> *Steps to Reproduce:*
> 1. Initiate the blob composition process in an environment with a significant number of blobs (e.g., 64 blobs of 1 GB each).
> 2. Observe the temporary increase in data storage as blobs are iteratively combined.
> 3. Note the final amount of data storage used compared to the initial total size of the blobs.
>
> *Expected Behavior:*
> The blob composition process should minimize unnecessary data storage use, efficiently managing resources to combine blobs without generating excessive temporary data overhead.
>
> *Actual Behavior:*
> The current implementation results in significant temporary increases in data storage, leading to high costs and potential system instability due to frequent storage exceptions.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)