[ https://issues.apache.org/jira/browse/FLINK-17505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136731#comment-17136731 ]
Oleksandr Nitavskyi commented on FLINK-17505:
---------------------------------------------

The MergeOperator (ConsolidationOperator) will not be able to replace files atomically (at least on HDFS), so some isolation guarantees can be violated. A possible solution is to produce data into a temp directory (aka [temp bucket|https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.kpgruelafx8h]); the consolidation operator would then merge small files, depending on the format, and move them to the final destination in the background. Unfortunately, on the S3 filesystem this requires copying the data. On the other hand, even if the merge does not change any files and simply moves them to the final destination, we add a useful feature to StreamingFileSink outputs: currently any consumer of the files produced by Flink has to filter out files that still carry the .in-progress or .pending suffix. We probably want to move this logic to the Flink side.

Problems:

1. When to start file consolidation? Ideally we want to perform a merge iteration once files have been renamed from pending, which happens when a checkpoint completes or upon recovery. But it is not obvious how to reliably react to such events from another operator, so we probably want to merge files periodically on a timer with a configurable period (probably similar to the checkpoint interval); see the first sketch below.

2. When should files actually be merged? There are at least two cases in which files should be merged together and moved to the final directory:
* The desired total size of the input files is reached.
* The bucket is closed. E.g. in the case of a time-series export we should be able to compare the time associated with a bucket against the current watermark (if any). So this should be decided by the BucketAssigner and a bucket context; see the second sketch below.

3. When should files be moved?
* Once they reach the desired file size (or once they have been merged because the input files reached the desired total size).
* When the bucket is actually closed, e.g. it is a time-series bucket and a BucketAssigner with a bucket context can determine that the bucket is closed. More detailed thoughts about the meta-info are spelled out in this technical doc: https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.b8ibdmatt6t3
* When the bucket is not yet closed (or never will be), but a certain inactivity time has passed.

4. How to handle failures? The MergeOperator performs the merge and then moves the merged files to the final directories. Since this operation cannot be made atomic and mutates state on the filesystem, we must make the merge/move/source-removal sequence idempotent. For this we can persist some state describing the mutation plan for the input files, either in Flink state or as a transaction plan on the output filesystem; see the third sketch below.

5. How to share files from different slots for merging? We probably want to keep the same parallelism as the StreamingFileSink, with each MergeOperator considering only the files produced by the sink in the same slot. In that case, if we want to keep the optimal output size on bucket closing, we need a different consolidation strategy. So in order to stay efficient we want to perform merge operations in parallel.

6. How to discover the files that should be merged? Such files are known to the Bucket class. One solution is to forward all newly created file names to the MergeOperator (the first sketch below assumes this forwarding). Another is simply to list the open buckets periodically; but with high parallelism the periodic listing risks putting unnecessary load on the underlying filesystem, so for that operation we would prefer parallelism = 1.

7. Should we split files if they are too big? The problem of overly large files should probably be addressed by a proper checkpoint rolling policy; see the fourth sketch below.
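For problem 1 (and the file-discovery option from problem 6), a minimal sketch of the timer-based trigger, assuming the consolidation step runs as a KeyedProcessFunction keyed by bucket id and receives the names of finished part files forwarded from the sink stage. PeriodicMergeTrigger, mergeIntervalMs and the mergeFiles helper are hypothetical names; only the Flink timer and keyed-state APIs are real.

{code:java}
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Hypothetical consolidation trigger: buffers the names of finished part
 * files per bucket and fires a merge pass on a processing-time timer,
 * independently of checkpoint-completion events.
 */
public class PeriodicMergeTrigger extends KeyedProcessFunction<String, String, String> {

    private final long mergeIntervalMs; // e.g. set to the checkpoint interval

    private transient ListState<String> pendingFiles;    // finished, not yet merged
    private transient ValueState<Long> registeredTimer;  // pending timer, if any

    public PeriodicMergeTrigger(long mergeIntervalMs) {
        this.mergeIntervalMs = mergeIntervalMs;
    }

    @Override
    public void open(Configuration parameters) {
        pendingFiles = getRuntimeContext().getListState(
                new ListStateDescriptor<>("pending-files", String.class));
        registeredTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("merge-timer", Long.class));
    }

    @Override
    public void processElement(String finishedPartFile, Context ctx, Collector<String> out)
            throws Exception {
        pendingFiles.add(finishedPartFile);
        if (registeredTimer.value() == null) {
            long fireAt = ctx.timerService().currentProcessingTime() + mergeIntervalMs;
            ctx.timerService().registerProcessingTimeTimer(fireAt);
            registeredTimer.update(fireAt);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        registeredTimer.clear();
        // A real implementation would first check whether the bucket is
        // closed or the desired total size is reached (problems 2 and 3);
        // this sketch just emits the name of the merged file.
        out.collect(mergeFiles(ctx.getCurrentKey(), pendingFiles.get()));
        pendingFiles.clear();
    }

    // Hypothetical helper: merges the part files of a bucket and returns
    // the name of the resulting consolidated file.
    private String mergeFiles(String bucketId, Iterable<String> files) {
        return bucketId + "/merged-" + System.currentTimeMillis();
    }
}
{code}

Tracking the registered timer in state keeps at most one pending timer per bucket, so the timer load stays independent of the number of incoming files.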
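For problems 2 and 3, a sketch of the decision interface the merge operator could consult on every timer pass. Every name here is an assumption; the watermark comparison is the mechanism suggested above for time-series buckets.

{code:java}
import java.time.Duration;

/**
 * Hypothetical decision interface: when is a bucket closed, and when is a
 * merge worthwhile. The merge operator would consult it on every timer pass.
 */
public interface BucketLifecyclePolicy<BucketID> {

    /** The bucket can receive no more data, judged against the watermark. */
    boolean isBucketClosed(BucketID bucketId, long currentWatermarkMillis);

    /** Enough data has accumulated to produce a file of the desired size. */
    default boolean shouldMerge(long totalPendingBytes, long desiredFileSizeBytes) {
        return totalPendingBytes >= desiredFileSizeBytes;
    }
}

/** Example: hourly time-series buckets identified by their start timestamp. */
class HourlyBucketPolicy implements BucketLifecyclePolicy<Long> {

    private final long allowedLatenessMillis = Duration.ofMinutes(5).toMillis();

    @Override
    public boolean isBucketClosed(Long bucketStartMillis, long currentWatermarkMillis) {
        // The bucket is closed once the watermark has passed its end,
        // plus some slack for late data.
        long bucketEndMillis = bucketStartMillis + Duration.ofHours(1).toMillis();
        return currentWatermarkMillis >= bucketEndMillis + allowedLatenessMillis;
    }
}
{code}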
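For problem 4, a sketch of persisting the mutation plan in Flink operator state so that the merge/move/source-removal sequence can be replayed idempotently after a failure. MergePlan and the recovery rules are assumptions; CheckpointedFunction and the filesystem calls are the real Flink APIs.

{code:java}
import java.io.Serializable;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

/**
 * Hypothetical state handling for the merge operator: every planned merge
 * is recorded before any file is touched, so that on recovery the plan can
 * be replayed to completion instead of producing duplicates.
 */
public class IdempotentMergeState implements CheckpointedFunction {

    /** One planned, not yet completed merge: sources -> target. */
    public static class MergePlan implements Serializable {
        public List<String> sourceFiles;
        public String targetFile;
    }

    private transient ListState<MergePlan> openPlans;

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // openPlans is Flink-managed state; it is snapshotted automatically.
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        openPlans = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("open-merge-plans", MergePlan.class));
        if (context.isRestored()) {
            // Finish whatever was in flight when the job failed.
            for (MergePlan plan : openPlans.get()) {
                replay(plan);
            }
            openPlans.clear();
        }
    }

    /** Record the plan before touching the filesystem, then execute it. */
    void execute(MergePlan plan) throws Exception {
        openPlans.add(plan);
        replay(plan);
        // Removing the completed plan from openPlans is omitted here.
    }

    /**
     * Replay is idempotent: if the target already exists, the merge had
     * finished and only the sources still need removing; otherwise the
     * merge is redone from the (still present) sources.
     */
    private void replay(MergePlan plan) throws Exception {
        Path target = new Path(plan.targetFile);
        FileSystem fs = target.getFileSystem();
        if (!fs.exists(target)) {
            // Redo the merge into the target (format-specific, omitted).
        }
        for (String source : plan.sourceFiles) {
            Path src = new Path(source);
            if (fs.exists(src)) {
                fs.delete(src, false);
            }
        }
    }
}
{code}

The same replay path serves both first execution and recovery, which is what makes the sequence idempotent: re-running an already completed plan only deletes the leftover sources.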
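For problem 7, oversized files can be avoided at write time with a rolling policy that still rolls on every checkpoint, which is the constraint for bulk formats. CheckpointRollingPolicy and PartFileInfo are the Flink 1.10 APIs; the class name and threshold are assumptions.

{code:java}
import java.io.IOException;

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;

/**
 * Rolls on every checkpoint (mandatory for bulk formats) and additionally
 * as soon as a part file exceeds a maximum size, so oversized files are
 * split at write time instead of by the merge operator.
 */
public class MaxSizeOnCheckpointRollingPolicy<IN, BucketID>
        extends CheckpointRollingPolicy<IN, BucketID> {

    private final long maxPartSizeBytes;

    public MaxSizeOnCheckpointRollingPolicy(long maxPartSizeBytes) {
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element)
            throws IOException {
        return partFileState.getSize() >= maxPartSizeBytes;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
        return false; // size and checkpoints are the only triggers here
    }
}
{code}

Assuming the bulk-format builder accepts custom checkpoint policies, this would plug in via {{StreamingFileSink.forBulkFormat(path, writerFactory).withRollingPolicy(new MaxSizeOnCheckpointRollingPolicy<>(128L << 20)).build()}}.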
> Merge small files produced by StreamingFileSink
> -----------------------------------------------
>
>                 Key: FLINK-17505
>                 URL: https://issues.apache.org/jira/browse/FLINK-17505
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>    Affects Versions: 1.10.0
>            Reporter: Piotr Nowojski
>            Priority: Major
>
> This is an alternative approach to FLINK-11499 to solve the problem of creating
> many small files with bulk formats in StreamingFileSink (which have to be
> rolled on checkpoint).
> The merge-based approach would require converting {{StreamingFileSink}} from a
> sink to an operator that works exactly as it does right now, with the same
> limitations (no support for arbitrary rolling policies for bulk formats),
> followed by another operator tasked with merging small files in the background.
> In the long term we would probably like to have both the merge operator and the
> write-ahead log solution (WAL, described in FLINK-11499) as alternatives: the
> WAL would behave better if small files are common, and the merge operator could
> behave better if small files are rare (because of data skew, for example).