GitHub user liancheng opened a pull request:

    https://github.com/apache/spark/pull/8509

    [SPARK-10301] [SQL] Fixes schema merging for nested structs

    This PR can be quite challenging to review.  I'm trying to give a detailed 
description of the problem as well as its solution here.
    
    When reading Parquet files, we need to specify a potentially nested Parquet 
schema (of type `MessageType`) as the requested schema for column pruning.  This 
Parquet schema is translated from a Catalyst schema (of type `StructType`), which 
is generated by the query planner and represents all requested columns.  However, 
this translation can be fairly complicated for several reasons:
    
    1.  The requested schema must conform to the real schema of the physical file 
to be read.
    
        This means we have to tailor the actual file schema of every individual 
physical Parquet file according to the given Catalyst schema.  Fortunately, we 
already do this in Spark 1.5 by pushing requested schema conversion to the 
executor side in PR #7231.
    
    1.  Support for schema merging.
    
        A single Parquet dataset may consist of multiple physical Parquet files 
with different but compatible schemas.  This means we may request a column path 
that doesn't exist in a particular physical Parquet file.  All requested column 
paths can be nested.  For example, given the Parquet file schema
    
        ```
        message root {
          required group f0 {
            required group f00 {
              required int32 f000;
              required binary f001 (UTF8);
            }
          }
        }
        ```
    
        we may request column paths defined in the following schema:
    
        ```
        message root {
          required group f0 {
            required group f00 {
              required binary f001 (UTF8);
              required float f002;
            }
          }
    
          optional double f1;
        }
        ```
    
        Notice that we pruned column path `f0.f00.f000`, but added 
`f0.f00.f002` and `f1`.
    
        The good news is that Parquet handles non-existent column paths properly 
and always returns null for them (a usage-level sketch of this scenario follows 
this list).
    
    1.  The mapping from `StructType` to `MessageType` is one-to-many.
    
        This is the most unfortunate part.
    
        Due to historical reasons (dark histories!), schemas of Parquet files 
generated by different libraries have different "flavors".  For example, to 
handle a schema with a single non-nullable column, whose type is an array of 
non-nullable integers, parquet-protobuf generates the following Parquet schema:
    
        ```
        message m0 {
          repeated int32 f;
        }
        ```
    
        while parquet-avro generates another version:
    
        ```
        message m1 {
          required group f (LIST) {
            repeated int32 array;
          }
        }
        ```
    
        and parquet-thrift spills this:
    
        ```
        message m2 {
          required group f (LIST) {
            repeated int32 f_tuple;
          }
        }
        ```
    
        All of them can be mapped to the following _unique_ Catalyst schema:
    
        ```
        StructType(Seq(
          StructField(
            "f",
            ArrayType(IntegerType, containsNull = false),
            nullable = false)))
        ```
    
        This greatly complicates Parquet requested schema construction, since 
the path of a given column varies in different cases.  To read the array 
elements from files with the above schemas, we must use `f` for `m0`, `f.array` 
for `m1`, and `f.f_tuple` for `m2`.
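
    As a usage-level illustration of the schema-merging scenario in point 2 above 
(a minimal sketch; the dataset path is hypothetical), reading such a mixed-schema 
dataset with the Spark 1.5 `DataFrameReader` API looks roughly like this:

    ```scala
    import org.apache.spark.sql.SQLContext

    // Assume an existing SQLContext; in spark-shell this is the predefined `sqlContext`.
    val sqlContext: SQLContext = ???

    // With schema merging enabled, the per-file Parquet schemas are reconciled into
    // a single Catalyst schema.  Column paths missing from a particular file simply
    // come back as nulls for the rows read from that file.
    val df = sqlContext.read
      .option("mergeSchema", "true")
      .parquet("/path/to/mixed/schema/dataset")  // hypothetical path

    // Nested column paths can be requested even when only some files contain them.
    df.select("f0.f00.f001", "f1").show()
    ```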
    
    In earlier Spark versions, we didn't try to fix this issue properly.  Spark 
1.4 and prior versions simply translate the Catalyst schema in a way that is more 
or less compatible with parquet-hive and parquet-avro, but is broken in many other 
cases.  Earlier revisions of Spark 1.5 only tailor the Parquet file schema at the 
first level and ignore nested fields.  This caused [SPARK-10301] [spark-10301] as 
well as [SPARK-10005] [spark-10005].  In PR #8228, I tried to avoid the hard part 
of the problem and made a minimal change in `CatalystRowConverter` to fix 
SPARK-10005.  However, when taking SPARK-10301 into consideration, continuing to 
hack `CatalystRowConverter` doesn't seem to be a good idea.  So this PR is an 
attempt to fix the problem in a proper way.
    
    For a given physical Parquet file with schema `ps` and a compatible Catalyst 
requested schema `cs`, we use the following algorithm to tailor `ps` into the 
resulting Parquet requested schema `ps'`:
    
    For a leaf column path `c` in `cs`:
    
    - if a corresponding Parquet column path `c'` can be found in `ps`, `c'` 
should be included in `ps'`;
    - otherwise, we convert `c` to a Parquet column path `c"` using 
`CatalystSchemaConverter`, and include `c"` in `ps'`;
    - no other column path should exist in `ps'`.
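
    Below is a minimal sketch of this per-leaf selection, written against a 
deliberately simplified, *hypothetical* schema model (`PType`, `PPrimitive`, 
`PGroup`, and `fromCatalyst` are illustrative stand-ins, not Spark or Parquet 
classes; the real implementation works on Parquet's `MessageType` and also has to 
handle the `LIST`/`MAP` rules discussed below):

    ```scala
    import org.apache.spark.sql.types._

    // Hypothetical, simplified stand-ins for Parquet schema nodes.
    sealed trait PType { def name: String }
    case class PPrimitive(name: String) extends PType
    case class PGroup(name: String, fields: Seq[PType]) extends PType

    // Hypothetical stand-in for converting a Catalyst field (the real code uses
    // CatalystSchemaConverter) when no matching column path exists in the file schema.
    def fromCatalyst(field: StructField): PType = PPrimitive(field.name)

    // Clip a (simplified) Parquet group according to the requested Catalyst struct:
    // keep matching column paths, synthesize missing ones, and drop everything else.
    def clipGroup(parquetGroup: PGroup, catalystStruct: StructType): PGroup = {
      val clippedFields = catalystStruct.fields.toSeq.map { catalystField =>
        parquetGroup.fields.find(_.name == catalystField.name) match {
          case Some(g: PGroup) =>
            catalystField.dataType match {
              case nested: StructType => clipGroup(g, nested)  // recurse into nested structs
              case _                  => g
            }
          case Some(p: PPrimitive) => p
          case None => fromCatalyst(catalystField)  // missing path: Parquet returns nulls
        }
      }
      PGroup(parquetGroup.name, clippedFields)
    }
    ```

    The part this sketch hides behind a simple name lookup is exactly what makes 
the real problem hard: matching a Catalyst field against a Parquet group is not a 
plain name comparison once `LIST` and `MAP` representations come into play.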
    
    Then comes the most tedious part:
    
    > Given `cs`, `ps`, and `c`, how do we locate `c'` in `ps`?
    
    Unfortunately, there's no quick answer, and we have to enumerate all possible 
structures defined in the parquet-format spec.  They are:
    
    1.  the standard structure of nested types, and
    1.  cases defined in all backwards-compatibility rules for `LIST` and `MAP`.
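
    Concretely, for the array column `f` from the earlier examples, the Parquet 
column path of the array *element* depends on which of these structures the writer 
used.  The mapping below is an illustration rather than Spark code; the last entry 
shows the standard three-level `LIST` form from the parquet-format spec:

    ```scala
    // Illustration only: where the element of ArrayType(IntegerType) lives per flavor.
    val elementPathByFlavor = Map(
      "parquet-protobuf (bare repeated field)" -> "f",
      "parquet-avro (legacy 2-level LIST)"     -> "f.array",
      "parquet-thrift (legacy 2-level LIST)"   -> "f.f_tuple",
      "standard 3-level LIST (parquet-format)" -> "f.list.element"
    )
    ```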
    
    The core part of this PR is `CatalystReadSupport.clipParquetType()`, which 
tailors a given Parquet file schema according to a requested schema in its 
Catalyst form.  Backwards-compatibility rules of `LIST` and `MAP` are covered 
in `clipParquetListType()` and `clipParquetMapType()` respectively.  The column 
path selection algorithm is implemented in `clipParquetRecord()`.
    
    [spark-10301]: https://issues.apache.org/jira/browse/SPARK-10301
    [spark-10005]: https://issues.apache.org/jira/browse/SPARK-10005


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/liancheng/spark spark-10301/fix-parquet-requested-schema

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/8509.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #8509
    
----
commit 8e608995a35be616bd6956aef1e8528bc8017e32
Author: Cheng Lian <l...@databricks.com>
Date:   2015-08-27T18:21:20Z

    Clips Parquet requested schema for better compatibility

commit a53cf346e694fbf5bb9efba7acfa08ae8f0c91ee
Author: Cheng Lian <l...@databricks.com>
Date:   2015-08-28T13:53:20Z

    Fixes test failures

----

