cashmand commented on code in PR #46831: URL: https://github.com/apache/spark/pull/46831#discussion_r1684832353
########## common/variant/shredding.md: ##########
@@ -0,0 +1,244 @@

# Shredding Overview

The Spark Variant type is designed to store and process semi-structured data efficiently, even with heterogeneous values. Query engines encode each variant value in a self-describing format, and store it as a group containing **value** and **metadata** binary fields in Parquet. Since data is often partially homogeneous, it can be beneficial to extract certain fields into separate Parquet columns to further improve performance. We refer to this process as "shredding". Each Parquet file remains fully self-describing, with no additional metadata required to read or fully reconstruct the Variant data from the file. Combining shredding with a binary residual provides the flexibility to represent complex, evolving data with an unbounded number of unique fields, while limiting the size of file schemas and retaining the performance benefits of a columnar format.

This document covers the shredding semantics, the Parquet representation, the implications for readers and writers, and Variant reconstruction. For now, it does not discuss which fields to shred, user-facing API changes, or engine-specific considerations such as how to use shredded columns. The approach builds on top of the generic Spark Variant representation, and leverages the existing Parquet specification for maximum compatibility with the open-source ecosystem.

At a high level, we replace the **value** and **metadata** of the Variant Parquet group with one or more fields called **object**, **array**, **typed_value** and **untyped_value**. These represent a fixed schema suitable for constructing the full Variant value for each row.

Shredding lets Spark (or any other query engine) reap the full benefits of Parquet's columnar representation, such as more compact data encoding, min/max statistics for data skipping, and I/O and CPU savings from pruning fields not accessed by a query (including the non-shredded Variant binary data). Without shredding, any query that accesses a Variant column must fetch all bytes of the full binary buffer. With shredding, we can achieve performance close to that of a relational (scalar) data model.

For example, `select variant_get(variant_col, '$.field1.inner_field2', 'string') from tbl` only needs to access `inner_field2`, and the file scan could avoid fetching the rest of the Variant value if this field was shredded into a separate column in the Parquet schema. Similarly, for the query `select * from tbl where variant_get(variant_col, '$.id', 'integer') = 123`, the scan could first decode the shredded `id` column, and only fetch and decode the full Variant value for rows that pass the filter.

# Parquet Example

Consider the following Parquet schema together with how Variant values might be mapped to it. Notice that we represent each shredded field in **object** as a group of two fields, **typed_value** and **untyped_value**. We extract all homogeneous data items of a certain path into **typed_value**, and set aside incompatible data items in **untyped_value**. Intuitively, incompatibilities within the same path may occur because we store the shredding schema per Parquet file, and each file can contain several row groups. Selecting a type for each field that is acceptable for all rows would be impractical, because it would require buffering the contents of an entire file before writing.
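For reference, an unshredded Variant column is simply the two-field group described in the overview. A minimal sketch of that unshredded layout (the exact repetition levels shown here are illustrative, not prescriptive):

```
optional group variant_col {
  required binary metadata;
  required binary value;
}
```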
Typically, the expectation is that **untyped_value** exists at every level as an option, along with one of **object**, **array** or **typed_value**. If the actual Variant value contains a type that does not match the provided schema, it is stored in **untyped_value**. An **untyped_value** may also be populated if an object can only be partially represented: any object fields that are present in the shredded schema must be written to those columns, and any fields not covered by the schema are written to **untyped_value**.

```
optional group variant_col {
  optional binary untyped_value;
  optional group object {
    optional group a {
      optional binary untyped_value;
      optional int64 typed_value;
    }
    optional group b {
      optional binary untyped_value;
      optional group object {
        optional group c {
          optional binary untyped_value;
          optional binary typed_value (STRING);
        }
      }
    }
  }
}
```

| Variant Value | Top-level untyped_value | b.untyped_value | Non-null in a | Non-null in b.c |
|---------------|-------------------------|-----------------|---------------|-----------------|
| {a: 123, b: {c: "hello"}} | null | null | typed_value | typed_value |
| {a: 1.23, b: {c: "123"}} | null | null | untyped_value | typed_value |
| {a: [1,2,3], b: {c: null}} | null | null | untyped_value | untyped_value |
| {a: 123, c: 456} | {c: 456} | null | typed_value | null |
| {a: 123, b: {c: "hello", d: 456}} | null | {d: 456} | typed_value | typed_value |
| [{a: 1, b: {c: 2}}, {a: 3, b: {c: 4}}] | [{a: 1, b: {c: 2}}, {a: 3, b: {c: 4}}] | null | null | null |

# Parquet Layout

The **array** and **object** fields represent Variant array and object types, respectively. Arrays must use the three-level list structure described in https://github.com/apache/parquet-format/blob/master/LogicalTypes.md.

An **object** field must be a group. Each field name of this inner group corresponds to the Variant value's object field name. Each inner field's type is a recursively shredded variant value: that is, the fields of each object field must be one or more of **object**, **array**, **typed_value** or **untyped_value**.

Similarly, the elements of an **array** must be a group containing one or more of **object**, **array**, **typed_value** or **untyped_value**.

Each leaf in the schema can store an arbitrary Variant value. It contains an **untyped_value** binary field and a **typed_value** field. If non-null, **untyped_value** represents the value stored as a Variant binary, with the metadata and value of a normal Variant concatenated. The **typed_value** field may be any type that has a corresponding Variant type. For each value in the data, at most one of **typed_value** and **untyped_value** may be non-null. A writer may omit either field, which is equivalent to all rows being null.

| typed_value | untyped_value | Meaning |
|-------------|---------------|---------|
| null | null | Field is missing in the reconstructed Variant. |
| null | non-null | Field may be any type in the reconstructed Variant. |
| non-null | null | Field has this column's type in the reconstructed Variant. |
| non-null | non-null | Invalid |

The **typed_value** may be absent from the Parquet schema for any field, which is equivalent to its value always being null (in which case the shredded field is always stored as a Variant binary). By the same token, **untyped_value** may be absent, which is equivalent to its value always being null (in which case the field will always be missing or have the type of the **typed_value** column).
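To make the leaf rules above concrete, here is a hedged sketch of per-row writer logic for a leaf shredded as int64. The `VariantValue` interface and its methods are hypothetical names used for illustration, not the actual Spark Variant API:

```java
// Hypothetical stand-in for a decoded Variant value; the real Spark API differs.
interface VariantValue {
  boolean isMissing();           // true if the field is absent in this row
  boolean isInt64();             // true if the value is a Variant int64
  long asInt64();
  byte[] toBinaryWithMetadata(); // metadata bytes concatenated with value bytes
}

final class ShreddedInt64Leaf {
  final Long typedValue;     // written to the Parquet int64 typed_value column
  final byte[] untypedValue; // written to the binary untyped_value column

  private ShreddedInt64Leaf(Long typedValue, byte[] untypedValue) {
    this.typedValue = typedValue;
    this.untypedValue = untypedValue;
  }

  static ShreddedInt64Leaf shred(VariantValue v) {
    if (v == null || v.isMissing()) {
      // null/null: the field is missing in the reconstructed Variant.
      return new ShreddedInt64Leaf(null, null);
    }
    if (v.isInt64()) {
      // Type matches the shredding schema: populate typed_value only.
      return new ShreddedInt64Leaf(v.asInt64(), null);
    }
    // Any other type: fall back to the binary residual. At most one of the
    // two columns is non-null for a given row.
    return new ShreddedInt64Leaf(null, v.toBinaryWithMetadata());
  }
}
```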
The full metadata and value can be reconstructed from **untyped_value** by treating the leading bytes as metadata, and using the header, dictionary size and final dictionary offset to determine the start of the Variant value section. (See the metadata description in common/variant/README.md for more detail on how to interpret it.) For example, in the binary below, there is a one-element dictionary, and the final offset (`offset[1]`) indicates that the last dictionary entry ends at the second byte. Therefore the full metadata size is six bytes, and the rest is the value section of the Variant.

Review Comment:
The metadata is mainly the dictionary of all object keys. Once we shred, the keys that were associated with shredded fields are no longer needed (since they're stored as field names in the Parquet schema). The keys that are still needed (e.g. for an object that wasn't fully shredded) are stored as needed in the metadata of the objects that use them. When reconstructing a full Variant value, the metadata will need to be rebuilt as the full value is built up from the shredded components.

I'm open to other ideas, but the main motivation for not leaving a single metadata at the top level is that it would need to be fetched any time we fetch an `untyped_value` column. In a situation where some portion of the original value has very irregular keys from row to row, this could be a large penalty. The downsides I can see are:

1) Needing to inspect and possibly rebuild each `untyped_value` when reconstructing the full Variant value. I think this is the biggest problem with this approach.
2) Storing a small metadata next to every `untyped_value` adds overhead. Experimentally, this seems to be pretty small (~1%) after compression (I tested with snappy and zstd) if the metadata is regular.
3) If the same key is used in different parts of the schema, it will be duplicated. This doesn't seem likely to be a major issue, since the point of the metadata is to capture duplication from row to row, or across multiple array elements.
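To illustrate the metadata-splitting step described in the diff above, here is a hedged sketch (not the actual Spark implementation) of how a reader might locate the start of the value section inside an **untyped_value** buffer, following the metadata layout in common/variant/README.md:

```java
// Sketch only: computes the size of the metadata section at the front of an
// untyped_value buffer, assuming a well-formed buffer laid out as described
// in common/variant/README.md. The real Spark reader differs.
final class UntypedValueSplitter {
  /** Returns the offset where the Variant value section starts. */
  static int metadataSize(byte[] untypedValue) {
    int header = untypedValue[0] & 0xFF;
    // Bits 6-7 of the metadata header encode offset_size - 1.
    int offsetSize = ((header >> 6) & 0x3) + 1;
    int pos = 1;
    int dictSize = readLittleEndian(untypedValue, pos, offsetSize);
    pos += offsetSize;
    // There are dictSize + 1 offsets; the final one is the total length of
    // the dictionary string bytes that follow the offset list.
    int finalOffset = readLittleEndian(untypedValue, pos + dictSize * offsetSize, offsetSize);
    pos += (dictSize + 1) * offsetSize;
    // For the one-element dictionary example discussed above, with
    // offset_size = 1 and offsets [0, 2]: 1 + 1 + 2 + 2 = 6 bytes of metadata.
    return pos + finalOffset;
  }

  private static int readLittleEndian(byte[] bytes, int start, int numBytes) {
    int result = 0;
    for (int i = 0; i < numBytes; i++) {
      result |= (bytes[start + i] & 0xFF) << (8 * i);
    }
    return result;
  }
}
```

Everything before the returned offset is the metadata of the residual Variant, and everything after it is the value; as the comment above notes, these per-column metadata sections would then be merged when rebuilding the full Variant.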