GitHub user mallman opened a pull request:

    https://github.com/apache/spark/pull/14690

    [SPARK-16980][SQL] Load only catalog table partition metadata required to 
answer a query

    (This PR addresses https://issues.apache.org/jira/browse/SPARK-16980.)
    
    (N.B. I'm submitting this PR as an enhanced version of an internal POC I 
wrote. I'm looking for preliminary feedback on what I have so far and to 
discuss some design and implementation issues. This PR is not currently a 
candidate for merging into master.)
    
    (N.B. This PR is known to fail several unit tests related to Hive/Parquet 
conversion. Obviously, these failures will be addressed before this PR is 
submitted for merging into master.)
    
    ## What changes were proposed in this pull request?
    
    In a new Spark application, when a partitioned Hive table is converted to 
use Spark's `HadoopFsRelation` in `HiveMetastoreCatalog`, metadata for every 
partition of that table are retrieved from the metastore and loaded into driver 
memory. In addition, every partition's metadata files are read from the 
filesystem to perform schema inference.
    
    If a user queries such a table with predicates that prune that table's 
partitions, we would like to answer that query without consulting metadata for 
the partitions that are not involved in the query. When querying a table with a 
large number of partitions for data from only a small number of partitions 
(maybe even a single partition), the current conversion strategy is highly 
inefficient. I suspect this scenario is not uncommon in the wild.
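
    As a purely hypothetical illustration of this scenario (the table, column, and 
partition names below are made up), consider a Hive table `sales` partitioned by 
`ds` with tens of thousands of partitions, queried for a single day:

```scala
// Hypothetical example: only the ds = '2016-08-01' partition is needed, but the
// current conversion loads metadata for every partition of `sales` into the
// driver before this query can be planned.
spark.sql("SELECT sum(amount) FROM sales WHERE ds = '2016-08-01'").show()
```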
    
    In addition to being inefficient in running time, the current strategy is 
inefficient in its use of driver memory. When the sum of the number of 
partitions of all tables loaded in a driver reaches a certain level (somewhere 
in the tens of thousands), their cached data exhaust all driver heap memory in 
the default configuration. I suspect this scenario is less common (in that not 
too many deployments work with tables having tens of thousands of partitions); 
however, it does illustrate how large the memory footprint of this metadata can 
be. Even for tables with hundreds or thousands of partitions, I would expect 
the `HiveMetastoreCatalog` table cache to represent a significant portion of 
the driver's heap space.
    
    This PR proposes an alternative approach. Basically, it makes three changes:
    
    1. It adds a new method, `listPartitionsByFilter`, to the Catalyst 
`ExternalCatalog` trait which returns the partition metadata matching a given 
sequence of partition pruning predicates (see the sketch after this list).
    1. It refactors the `FileCatalog` type hierarchy to include a new 
`TableFileCatalog` to efficiently return files only for partitions matching a 
sequence of partition pruning predicates.
    1. It removes partition loading and caching from `HiveMetastoreCatalog`.
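
    To make the intent of change 1 concrete, here is a rough, hypothetical sketch of 
the new method's shape (the trait name here is made up; the actual change would add 
the method to `ExternalCatalog` itself):

```scala
// Purely illustrative sketch (not the exact code in this PR): given partition
// pruning predicates, return metadata for only the matching partitions.
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.catalyst.expressions.Expression

trait PartitionPruningCatalogSketch {
  def listPartitionsByFilter(
      db: String,
      table: String,
      predicates: Seq[Expression]): Seq[CatalogTablePartition]
}
```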
    
    The net effect is that when a query over a partitioned Hive table is 
planned, the analyzer retrieves the table metadata from `HiveMetastoreCatalog`. 
As part of this operation, the `HiveMetastoreCatalog` builds a 
`HadoopFsRelation` with a `TableFileCatalog`. It does not load any partition 
metadata or scan any files. The physical planner identifies the data files the 
query needs by asking the relation's `TableFileCatalog` for the files matching 
the query's partition pruning predicates. The `TableFileCatalog` in turn calls 
the `listPartitionsByFilter` method on its external catalog, which queries the 
Hive metastore with those filters.
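
    A rough, purely illustrative sketch of this lookup flow, reusing the hypothetical 
trait sketched above (the class and method names are not the PR's actual code):

```scala
// Illustrative sketch of the flow described above: the physical planner hands the
// partition pruning predicates to the table's file catalog, which asks the
// external catalog (and ultimately the Hive metastore) for only the matching
// partitions and returns their data locations.
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.expressions.Expression

class TableFileCatalogSketch(
    catalog: PartitionPruningCatalogSketch, // hypothetical trait from the earlier sketch
    db: String,
    table: String) {

  /** Return the data locations of only those partitions matching the predicates. */
  def listDataLocations(pruningPredicates: Seq[Expression]): Seq[Path] = {
    catalog
      .listPartitionsByFilter(db, table, pruningPredicates)
      .flatMap(_.storage.locationUri)     // each surviving partition's location
      .map(loc => new Path(loc.toString)) // no other partition metadata is touched
  }
}
```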
    
    ## Open Issues
    
    1. This PR omits partition metadata caching. I'm not sure if this is even 
needed if we're only loading partition metadata for a given query. However, it 
may not be that tricky to implement this effectively.
    1. This PR removes partitioned Hive table schema reconciliation and does not 
replace it. As a result, it fails to find Parquet schema columns with upper case 
letters because of the Hive metastore's case insensitivity. I think this is the 
most significant hurdle for this PR. It just occurred to me that we might be able 
to do just-in-time schema reconciliation using only the partitions involved in a 
query. I haven't tried this, but I would attempt it by adding a method to 
`HadoopFsRelation` or `BasicFileCatalog` which returns a SQL schema for a given 
sequence of partition pruning predicates (or partitions); a rough, hypothetical 
sketch of this idea appears after this list. I'll give this a try and report back. 
Another idea would be to keep the current strategy of merging the schema from all 
table files unless the user sets a boolean SQL configuration parameter like 
`spark.sql.assumeLowerCaseColumnNames`. If the user's tables have only lower-case 
column names, then it's safe to use this PR's optimizations. I don't think this is 
an entirely unrealistic scenario, as we have enforced all lower-case column names 
from the beginning because of case-sensitivity issues. Maybe we're not the only ones?
    1. This PR omits an implementation of `listPartitionsByFilter` for the 
`InMemoryCatalog`.
    1. Should we use `TableFileCatalog` for non-partitioned tables, too (to 
unify the partitioned and non-partitioned table access code paths)? This is 
probably something to explore in a future PR.
    1. This PR breaks parquet log output redirection. I can work around this by 
running 
`Class.forName("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$")`
 first thing in a Spark shell session, however we need to figure out how to fix 
this properly.
    1. This PR includes no new unit tests. We should probably have some. :)
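
    Regarding open issue 2, here is a rough, hypothetical sketch of what just-in-time 
schema reconciliation could look like: merge the Parquet schema from only the pruned 
partitions' locations (the object and method names are made up, and this is not code 
from this PR):

```scala
// Hypothetical sketch of just-in-time schema reconciliation (open issue 2): infer
// and merge the Parquet schema from only the files of the partitions that survive
// pruning, instead of from every file in the table.
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object PrunedSchemaSketch {
  /** Merge the Parquet schemas found under the given (already pruned) partition locations. */
  def schemaForPrunedPartitions(
      spark: SparkSession,
      prunedPartitionLocations: Seq[Path]): StructType = {
    // The schema-merging work is proportional to the partitions the query actually
    // touches, and it preserves the case of the column names in the Parquet files.
    spark.read
      .option("mergeSchema", "true")
      .parquet(prunedPartitionLocations.map(_.toString): _*)
      .schema
  }
}
```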
    
    ## How was this patch tested?
    
    The current Spark unit tests were run, and some ad-hoc tests were performed 
to validate that only the necessary partition metadata is loaded.
    
    N.B. Several tests in the hive subproject's `ParquetMetastoreSuite` fail. 
All of these tests fail because of the use of upper case characters in the test 
parquet table column names.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/VideoAmp/spark-public 
spark-16980-lazy_partition_fetching

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14690.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #14690
    