GitHub user liancheng opened a pull request: https://github.com/apache/spark/pull/8509
[SPARK-10301] [SQL] Fixes schema merging for nested structs This PR can be quite challenging to review. I'm trying to give a detailed description of the problem as well as its solution here. When reading Parquet files, we need to specify a potentially nested Parquet schema (of type `MessageType`) as requested schema for column pruning. This Parquet schema is translated from a Catalyst schema (of type `StructType`), which is generated by the query planner and represents all requested columns. However, this translation can be fairly complicated because of several reasons: 1. Requested schema must conform to the real schema of the physical file to be read. This means we have to tailor the actual file schema of every individual physical Parquet file to be read according to the given Catalyst schema. Fortunately we are already doing this in Spark 1.5 by pushing request schema conversion to executor side in PR #7231. 1. Support for schema merging. A single Parquet dataset may consist of multiple physical Parquet files come with different but compatible schemas. This means we may request for a column path that doesn't exist in a physical Parquet file. All requested column paths can be nested. For example, for a Parquet file schema ``` message root { required group f0 { required group f00 { required int32 f000; required binary f001 (UTF8); } } } ``` we may request for column paths defined in the following schema: ``` message root { required group f0 { required group f00 { required binary f001 (UTF8); required float f002; } } optional double f1; } ``` Notice that we pruned column path `f0.f00.f000`, but added `f0.f00.f002` and `f1`. The good news is that Parquet handles non-existing column paths properly and always returns null for them. 1. The map from `StructType` to `MessageType` is a one-to-many map. This is the most unfortunate part. Due to historical reasons (dark histories!), schemas of Parquet files generated by different libraries have different "flavors". For example, to handle a schema with a single non-nullable column, whose type is an array of non-nullable integers, parquet-protobuf generates the following Parquet schema: ``` message m0 { repeated int32 f; } ``` while parquet-avro generates another version: ``` message m1 { required group f (LIST) { repeated int32 array; } } ``` and parquet-thrift spills this: ``` message m1 { required group f (LIST) { repeated int32 f_tuple; } } ``` All of them can be mapped to the following _unique_ Catalyst schema: ``` StructType( StructField( "f", ArrayType(IntegerType, containsNull = false), nullable = false)) ``` This greatly complicates Parquet requested schema construction, since the path of a given column varies in different cases. To read the array elements from files with the above schemas, we must use `f` for `m0`, `f.array` for `m1`, and `f.f_tuple` for `m2`. In earlier Spark versions, we didn't try to fix this issue properly. Spark 1.4 and prior versions simply translate the Catalyst schema in a way more or less compatible with parquet-hive and parquet-avro, but is broken in many other cases. Earlier revisions of Spark 1.5 only try to tailor the Parquet file schema at the first level, and ignore nested one. This caused [SPARK-10301] [spark-10301] as well as [SPARK-10005] [spark-10005]. In PR #8228, I tried to avoid the hard part of the problem and made a minimum change in `CatalystRowConverter` to fix SPARK-10005. However, when taking SPARK-10301 into consideration, keeping hacking `CatalystRowConverter` doesn't seem to be a good idea. SO this PR is an attempt to fix the problem in a proper way. For a given physical Parquet file with schema `ps` and a compatible Catalyst requested schema `cs`, we use the following algorithm to tailor `ps` to get the result Parquet requested schema `ps'`: For a leaf column path `c` in `cs`: - if `c` exists in `cs` and a corresponding Parquet column path `c'` can be found in `ps`, `c'` should be included in `ps'`; - otherwise, we convert `c` to a Parquet column path `c"` using `CatalystSchemaConverter`, and include `c"` in `ps'`; - no other column path should exist in `ps'`. Then comes the most tedious part: > Given `cs`, `ps`, and `c`, how to locate `c'` in `ps`? Unfortunately, there's no quick answer, and we have to enumerate all possible structures defined in parquet-format spec. They are: 1. the standard structure of nested types, and 1. cases defined in all backwards-compatibility rules for `LIST` and `MAP`. The core part of this PR is `CatalystReadSupport.clipParquetType()`, which tailors a given Parquet file schema according to a requested schema in its Catalyst form. Backwards-compatibility rules of `LIST` and `MAP` are covered in `clipParquetListType()` and `clipParquetMapType()` respectively. The column path selection algorithm is implemented in `clipParquetRecord()`. [spark-10301]: https://issues.apache.org/jira/browse/SPARK-10301 [spark-10005]: https://issues.apache.org/jira/browse/SPARK-10005 You can merge this pull request into a Git repository by running: $ git pull https://github.com/liancheng/spark spark-10301/fix-parquet-requested-schema Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/8509.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #8509 ---- commit 8e608995a35be616bd6956aef1e8528bc8017e32 Author: Cheng Lian <l...@databricks.com> Date: 2015-08-27T18:21:20Z Clips Parquet requested schema for better compatibility commit a53cf346e694fbf5bb9efba7acfa08ae8f0c91ee Author: Cheng Lian <l...@databricks.com> Date: 2015-08-28T13:53:20Z Fixes test failures ---- --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org