aokolnychyi opened a new pull request #875: [WIP] Spark: Implement an action to 
rewrite manifests
URL: https://github.com/apache/incubator-iceberg/pull/875
 
 
   This WIP PR adds a Spark action that rewrites manifests and optimizes the 
layout of metadata for faster job planning.
   
   ## Background
   
   Iceberg users have two ways to optimize metadata: rely on the automatic 
merging of manifests (e.g. `MergeAppend`) or rewrite manifests on demand using 
`RewriteManifests`. Automatic merging is a great feature and works really well 
when writes to a table are aligned with partitions. If each incoming batch 
writes to many partitions, however, automatic merging is less effective and 
`RewriteManifests` can be used to rewrite specific manifests on demand.
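   As an illustration, deciding which manifests to rewrite can be driven by a 
simple size predicate. This is only a sketch: `ManifestInfo` is a hypothetical 
stand-in for Iceberg's `ManifestFile`, and the threshold is an assumed value.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Sketch of predicate-based manifest selection. ManifestInfo is a
// hypothetical stand-in for a manifest file; only its length is used.
public class ManifestSelection {
  static class ManifestInfo {
    final String path;
    final long lengthInBytes;

    ManifestInfo(String path, long lengthInBytes) {
      this.path = path;
      this.lengthInBytes = lengthInBytes;
    }

    long length() {
      return lengthInBytes;
    }
  }

  // Select manifests smaller than a threshold, mirroring the kind of
  // user-defined predicate that drives an on-demand rewrite.
  static List<ManifestInfo> selectForRewrite(List<ManifestInfo> manifests, long smallSizeBytes) {
    Predicate<ManifestInfo> isSmall = m -> m.length() < smallSizeBytes;
    return manifests.stream().filter(isSmall).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<ManifestInfo> manifests = List.of(
        new ManifestInfo("m1.avro", 2_000_000),
        new ManifestInfo("m2.avro", 9_000_000),
        new ManifestInfo("m3.avro", 500_000));
    // Only the two manifests under 8 MB are selected.
    System.out.println(selectForRewrite(manifests, 8_000_000).size()); // prints 2
  }
}
```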
   
   We have tables with tens of PBs of data where job planning and execution 
combined must complete in under 10 seconds for certain queries, even on small 
clusters. That's why every second spent on job planning matters. Each incoming 
batch in those tables can write to almost all partitions, which forces us to 
rewrite all metadata frequently. `RewriteManifests` is handy for rewriting a 
reasonable number of manifests, but it is slow when rewriting all metadata in 
huge tables. In addition, `RewriteManifests` requires us to define clustering 
values manually, and there is no automatic bin-packing of small clusters.
   
   With snapshot id inheritance, we have a new way to approach this problem. 
Specifically, we can prepare manifests in a distributed manner on executors and 
cheaply commit them on the driver. This serves as a great basis for the Spark 
action that rewrites manifests.
   
   ## Algorithm
   
   The algorithm is as follows:
   
   - Determine which manifests should be rewritten according to a user-defined 
predicate.
   - Create a `Dataset` of manifest entries.
   - Compute the average size for one manifest entry given the available stats.
   - Estimate the size of metadata per partition spec and partition.
   - Sort metadata entries according to partition values.
   - Bin-pack smaller partitions to achieve bins of the target size while 
preserving the ordering.
   - Produce new manifests with the correct target size and proper clustering.
   - Replace old manifests with new manifests.
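   The sort and bin-pack steps above can be sketched as follows. This is only 
an illustration: `Entry` is a hypothetical stand-in for a manifest entry keyed 
by its partition value, and each entry's size is assumed to come from the 
average-entry-size estimate.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of the sort + bin-pack steps: sort entries by partition value,
// then greedily pack consecutive entries into bins no larger than the
// target size, preserving the sort order so each resulting manifest
// stays clustered by partition.
public class EntryBinPacking {
  static class Entry {
    final String partition;
    final long estimatedSizeBytes;

    Entry(String partition, long estimatedSizeBytes) {
      this.partition = partition;
      this.estimatedSizeBytes = estimatedSizeBytes;
    }
  }

  static List<List<Entry>> pack(List<Entry> entries, long targetSizeBytes) {
    List<Entry> sorted = new ArrayList<>(entries);
    sorted.sort(Comparator.comparing(e -> e.partition));

    List<List<Entry>> bins = new ArrayList<>();
    List<Entry> current = new ArrayList<>();
    long currentSize = 0;
    for (Entry entry : sorted) {
      // Start a new bin once adding this entry would exceed the target.
      if (!current.isEmpty() && currentSize + entry.estimatedSizeBytes > targetSizeBytes) {
        bins.add(current);
        current = new ArrayList<>();
        currentSize = 0;
      }
      current.add(entry);
      currentSize += entry.estimatedSizeBytes;
    }
    if (!current.isEmpty()) {
      bins.add(current);
    }
    return bins;
  }

  public static void main(String[] args) {
    List<Entry> entries = List.of(
        new Entry("2020-01-03", 40),
        new Entry("2020-01-01", 40),
        new Entry("2020-01-02", 40));
    // Target of 100 bytes: first bin holds two entries, second holds one.
    System.out.println(pack(entries, 100).size()); // prints 2
  }
}
```

   Because the greedy pass runs over already-sorted entries, small partitions 
are bin-packed together without breaking the clustering of larger ones.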
   
   ## Open Items
   
   One open item is how to read manifest entries. Right now, I am constructing 
a `Dataset` of entries directly. It would be great to leverage metadata tables 
for this, but a couple of things prevent that right now:
   - No spec ID in the entries metadata table. Having a spec ID is essential 
for this action. This point can be fixed easily.
   - The entries metadata table returns a `DataFrame`, but we need a typed 
`Dataset` and actual `DataFile` instances to create new manifests. There are a 
few options to make this work:
     - We can have a custom encoder. Unfortunately, Spark offers very limited 
functionality for that. One potential approach would be to use 
`UserDefinedType`, but it is a private API; there have been multiple attempts 
to make it public, and all were ignored by the community.
     - We can have a bean to represent `DataFile`. The potential problem is how 
to express partition values: right now, the partition field in the metadata 
table is a struct whose type varies from table to table and from partition 
spec to partition spec. Previously, we used a trick that represented partition 
values as a string path while migrating existing tables, but I do not think 
that is a good idea here.
   
   In our original implementation, we used Scala and tuple encoders in which 
only `DataFile` was stored as bytes. In this PR, we migrated to Java to avoid 
worrying about Scala versions and to avoid introducing a dependency on a new 
language in Iceberg. For now, I am using Java serialization on complete beans, 
as bean encoders won't work with `DataFile`. This seems to increase the size 
of the dataset only slightly. Switching to Kryo might not be safe: many 
classes in Iceberg use the serialization proxy pattern, which Kryo does not 
interpret correctly. One example is `Type`. Having a custom encoder would help 
here.
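   For illustration, the Java-serialization approach on a complete bean looks 
roughly like this. `DataFileBean` is a hypothetical stand-in for the actual 
bean wrapping a `DataFile`; the real bean carries many more fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Sketch of serializing a complete bean with Java serialization so Spark
// can shuffle it as bytes. DataFileBean is a hypothetical stand-in.
public class BeanSerialization {
  static class DataFileBean implements Serializable {
    private static final long serialVersionUID = 1L;
    final String path;
    final long recordCount;

    DataFileBean(String path, long recordCount) {
      this.path = path;
      this.recordCount = recordCount;
    }
  }

  static byte[] toBytes(DataFileBean bean) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(bean);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static DataFileBean fromBytes(byte[] bytes) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (DataFileBean) in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    DataFileBean bean = new DataFileBean("s3://bucket/data/file.parquet", 1000L);
    // Round-trip through Java serialization preserves the bean's fields.
    DataFileBean copy = fromBytes(toBytes(bean));
    System.out.println(copy.path.equals(bean.path) && copy.recordCount == bean.recordCount); // prints true
  }
}
```

   Unlike Kryo, this path goes through `writeObject`/`readObject`, so classes 
using the serialization proxy pattern round-trip correctly.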
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
