GitHub user mallman opened a pull request: https://github.com/apache/spark/pull/14690
[SPARK-16980][SQL] Load only catalog table partition metadata required to answer a query

(This PR addresses https://issues.apache.org/jira/browse/SPARK-16980.)

(N.B. I'm submitting this PR as an enhanced version of an internal POC I wrote. I'm looking for preliminary feedback on what I have so far and to discuss some design and implementation issues. This PR is not currently a candidate for merging into master.)

(N.B. This PR is known to fail several unit tests related to Hive/Parquet conversion. Obviously, these failures will be addressed before this PR is submitted for merging into master.)

## What changes were proposed in this pull request?

In a new Spark application, when a partitioned Hive table is converted to use Spark's `HadoopFsRelation` in `HiveMetastoreCatalog`, metadata for every partition of that table is retrieved from the metastore and loaded into driver memory. In addition, every partition's metadata files are read from the filesystem to perform schema inference.

If a user queries such a table with predicates which prune that table's partitions, we would like to answer that query without consulting partition metadata for partitions not involved in the query. When querying a table with a large number of partitions for data from a small number of partitions (perhaps even a single partition), the current conversion strategy is highly inefficient. I suspect this scenario is not uncommon in the wild.

In addition to being inefficient in running time, the current strategy is inefficient in its use of driver memory. When the sum of the number of partitions of all tables loaded in a driver reaches a certain level (somewhere in the tens of thousands), their cached data exhausts all driver heap memory in the default configuration. I suspect this scenario is less common (in that not many deployments work with tables having tens of thousands of partitions); however, it does illustrate how large the memory footprint of this metadata can be.
With tables with hundreds or thousands of partitions, I would expect the `HiveMetastoreCatalog` table cache to represent a significant portion of the driver's heap space.

This PR proposes an alternative approach. Basically, it makes three changes:

1. It adds a new method, `listPartitionsByFilter`, to the Catalyst `ExternalCatalog` trait, which returns the partition metadata for a given sequence of partition pruning predicates.
1. It refactors the `FileCatalog` type hierarchy to include a new `TableFileCatalog` to efficiently return files only for partitions matching a sequence of partition pruning predicates.
1. It removes partition loading and caching from `HiveMetastoreCatalog`.

The net effect is that when a query over a partitioned Hive table is planned, the analyzer retrieves the table metadata from `HiveMetastoreCatalog`. As part of this operation, the `HiveMetastoreCatalog` builds a `HadoopFsRelation` with a `TableFileCatalog`. It does not load any partition metadata or scan any files. The physical planner identifies the data files the query needs by asking the relation's `TableFileCatalog` for the files matching any partition pruning predicates. The `TableFileCatalog` in turn calls the `listPartitionsByFilter` method on its external catalog, which queries the Hive metastore, passing along those filters.

## Open Issues

1. This PR omits partition metadata caching. I'm not sure caching is even needed if we're only loading partition metadata for a given query. However, it may not be that tricky to implement effectively.
1. This PR removes and omits partitioned Hive table schema reconciliation. As a result, it fails to find Parquet schema columns with upper-case letters because of the Hive metastore's case insensitivity. I think this is the most significant hurdle for this PR. It just occurred to me that we might be able to do just-in-time schema reconciliation using the partitions that are used in a query.
I haven't tried this, but I would attempt it by adding a method to `HadoopFsRelation` or `BasicFileCatalog` which returns a SQL schema for a given sequence of partition pruning predicates (or partitions). I'll give this a try and report back. Another idea would be to use the current strategy of merging the schema from all table files unless the user sets a boolean SQL configuration parameter like `spark.sql.assumeLowerCaseColumnNames`. If the user's tables have only lower-case column names, then it's safe to use this PR's optimizations. I don't think this is an entirely unrealistic scenario, as we have enforced all lower-case column names from the beginning because of case-sensitivity issues. Maybe we're not the only ones?
1. This PR omits an implementation of `listPartitionsByFilter` for the `InMemoryCatalog`.
1. Should we use `TableFileCatalog` for non-partitioned tables, too (to unify the partitioned and non-partitioned table access code paths)? This is probably something to explore in a future PR.
1. This PR breaks parquet log output redirection. I can work around this by running `Class.forName("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$")` first thing in a Spark shell session; however, we need to figure out how to fix this properly.
1. This PR includes no new unit tests. We should probably have some. :)

## How was this patch tested?

The current Spark unit tests were run, and some ad-hoc tests were performed to validate that only the necessary partition metadata is loaded.

N.B. Several tests in the hive subproject's `ParquetMetastoreSuite` fail. All of these tests fail because of the use of upper-case characters in the test parquet table column names.
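The planning flow described above, `TableFileCatalog` asking the external catalog for only the matching partitions, can be sketched as follows. This is a hedged sketch using simplified stand-in types (a predicate over the partition spec instead of a Catalyst `Expression`, and a minimal `PartitionMeta` instead of `CatalogTablePartition`), not the PR's actual signatures.

```scala
// Simplified sketch of the proposed listPartitionsByFilter hook.
// Types here are illustrative stand-ins, not Catalyst's real ones.
object CatalogSketch {
  case class PartitionMeta(spec: Map[String, String], location: String)

  // Stand-in for a Catalyst partition pruning Expression: a predicate
  // evaluated against a partition's key/value spec.
  type PartitionPredicate = Map[String, String] => Boolean

  trait ExternalCatalog {
    // Proposed addition: return only the partitions matching every predicate,
    // letting the metastore do the pruning instead of the driver.
    def listPartitionsByFilter(db: String, table: String,
                               predicates: Seq[PartitionPredicate]): Seq[PartitionMeta]
  }

  // In-memory stand-in for a metastore-backed implementation.
  class InMemoryCatalogSketch(data: Map[(String, String), Seq[PartitionMeta]])
      extends ExternalCatalog {
    def listPartitionsByFilter(db: String, table: String,
                               predicates: Seq[PartitionPredicate]): Seq[PartitionMeta] =
      data.getOrElse((db, table), Nil).filter(p => predicates.forall(_(p.spec)))
  }

  // A TableFileCatalog-like step: list files for matching partitions only.
  def filesForQuery(cat: ExternalCatalog, db: String, table: String,
                    predicates: Seq[PartitionPredicate]): Seq[String] =
    cat.listPartitionsByFilter(db, table, predicates).map(_.location)
}
```

Under this shape, `filesForQuery` never touches the metadata (or files) of partitions that the predicates rule out, which is the whole point of the refactoring.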
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/VideoAmp/spark-public spark-16980-lazy_partition_fetching

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14690.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #14690