aokolnychyi opened a new pull request #875: [WIP] Spark: Implement an action to rewrite manifests
URL: https://github.com/apache/incubator-iceberg/pull/875

This WIP PR adds a Spark action that rewrites manifests and optimizes the layout of metadata for faster job planning.

## Background

Iceberg users have two ways to optimize metadata: rely on the automatic merging of manifests (e.g. `MergeAppend`) or rewrite manifests on demand using `RewriteManifests`.

The automatic merging of manifests is a great feature and works really well when writes to a table are aligned with partitions. If each incoming batch writes to many partitions, `RewriteManifests` can be used to rewrite certain manifests on demand.

We have tables with tens of PBs of data where job planning and execution combined must stay under 10s for certain queries, even on small clusters. That's why every second spent on job planning matters. Each incoming batch in those tables can write to almost all partitions, which forces us to frequently rewrite all metadata. `RewriteManifests` is handy for rewriting a reasonable number of manifests, but it is slow for rewriting all metadata in huge tables. In addition, `RewriteManifests` requires us to define clustering values manually, and there is no automatic bin-packing of small clusters.

With snapshot id inheritance, we have a new way to approach this problem. Specifically, we can prepare manifests in a distributed manner on executors and cheaply commit them on the driver. This serves as a great basis for a Spark action that rewrites manifests.

## Algorithm

The algorithm is as follows:

- Determine which manifests should be rewritten according to a user-defined predicate.
- Create a `Dataset` of manifest entries.
- Compute the average size of one manifest entry from the available stats.
- Estimate the size of metadata per partition spec and partition.
- Sort metadata entries according to partition values.
- Bin-pack smaller partitions into bins of the target size while preserving the ordering.
- Produce new manifests with the correct target size and proper clustering.
- Replace old manifests with new manifests.

## Open Items

One open item is how to read manifest entries. Right now, I am constructing a `Dataset` of entries directly. It would be great to leverage metadata tables for this, but a couple of things prohibit that right now:

- There is no spec id in the entries metadata table. Having a spec id is essential for this action. This point can be easily fixed.
- The entries metadata table returns a `DataFrame`, whereas we need a typed `Dataset` and actual `DataFile` instances to create new manifests. There are a few options to make this work:
  - We can have a custom encoder. Unfortunately, Spark has very limited functionality for that. One potential approach would be to use `UserDefinedType`, but it is a private API. There have been multiple attempts to make it public, all ignored by the community.
  - We can have a bean to represent `DataFile`. The potential problem is how to express partition values. Right now, the partition field in the metadata table is a struct whose type varies from table to table and from partition spec to partition spec. Previously, we had a trick to represent partition values as a string path while migrating existing tables; I do not think that is a good idea here.

In our original implementation, we were using Scala and tuple encoders in which only `DataFile` was stored as bytes. In this PR, we migrated to Java so as not to worry about Scala versions and not to introduce a dependency on a new language in Iceberg. I am using Java serialization on complete beans for now, as bean encoders won't work with `DataFile`. This seems to increase the size of the dataset only a bit. Switching to Kryo might not be safe: many classes in Iceberg use the serialization proxy pattern, but Kryo does not interpret that correctly. One example is `Type`. Having a custom encoder would help here.
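The bin-packing step of the algorithm above can be sketched in plain Java. This is only an illustration of the idea, not the PR's code: `ManifestBinPacker` and `pack` are hypothetical names, and the sketch packs a list of per-group metadata sizes (e.g. per-partition sizes after sorting by partition values) into bins of a target size while preserving the sort order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of order-preserving bin-packing of metadata sizes.
public class ManifestBinPacker {

  // Pack consecutive group sizes into bins that do not exceed targetSize,
  // without reordering the groups. A group that alone exceeds the target
  // becomes its own bin.
  public static List<List<Long>> pack(List<Long> groupSizes, long targetSize) {
    List<List<Long>> bins = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentSize = 0;
    for (long size : groupSizes) {
      // close the current bin if adding this group would overflow it
      if (!current.isEmpty() && currentSize + size > targetSize) {
        bins.add(current);
        current = new ArrayList<>();
        currentSize = 0;
      }
      current.add(size);
      currentSize += size;
    }
    if (!current.isEmpty()) {
      bins.add(current);
    }
    return bins;
  }

  public static void main(String[] args) {
    // e.g. estimated metadata sizes per partition, already sorted
    List<Long> sizes = Arrays.asList(10L, 20L, 90L, 5L, 60L);
    // → [[10, 20], [90, 5], [60]] with a target of 100
    System.out.println(pack(sizes, 100L));
  }
}
```

Because the input is already sorted by partition values, keeping the groups in order means each resulting bin (and hence each new manifest) covers a contiguous range of partitions, which is what gives the rewritten metadata its clustering.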
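To make the Kryo concern concrete, here is a minimal, self-contained sketch of the serialization proxy pattern (the names `Interned`, `Proxy`, and `roundTrip` are hypothetical; this is not Iceberg's `Type` code). Java serialization honors `writeReplace`/`readResolve`, so the canonical instance survives a round trip; default Kryo copies fields directly and skips the proxy, which is why it can break classes written this way.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ProxyDemo {

  // A singleton-like class protected by a serialization proxy.
  static final class Interned implements Serializable {
    static final Interned INSTANCE = new Interned();
    private Interned() {}

    // Java serialization writes the proxy instead of this instance
    private Object writeReplace() {
      return new Proxy();
    }

    private static final class Proxy implements Serializable {
      // ... and resolves the proxy back to the canonical instance on read
      private Object readResolve() {
        return INSTANCE;
      }
    }
  }

  static Object roundTrip(Object obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    // identity is preserved through the proxy: prints true
    System.out.println(roundTrip(Interned.INSTANCE) == Interned.INSTANCE);
  }
}
```

A Kryo-based round trip of `Interned` would bypass `writeReplace` and construct a fresh instance field by field, breaking the identity guarantee the class depends on.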
