[GitHub] spark pull request #14649: [SPARK-17059][SQL] Allow FileFormat to specify pa...
Github user andreweduffy closed the pull request at: https://github.com/apache/spark/pull/14649 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user ash211 commented on a diff in the pull request: https://github.com/apache/spark/pull/14649#discussion_r76716506

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---

@@ -58,6 +60,9 @@ class ParquetFileFormat
    with Logging
    with Serializable {

+  // Attempt to cache parquet metadata for this instance of

--- End diff --

this instance of ...
Github user ash211 commented on a diff in the pull request: https://github.com/apache/spark/pull/14649#discussion_r76716449

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala ---

@@ -220,6 +220,21 @@ trait FileFormat {
   }

   /**
+   * Allow FileFormat's to have a pluggable way to utilize pushed filters to eliminate partitions

--- End diff --

typo: `FileFormats`
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/14649#discussion_r74872775

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---

@@ -423,6 +425,54 @@ class ParquetFileFormat
       sqlContext.sessionState.newHadoopConf(),
       options)
   }
+
+  override def filterPartitions(
+      filters: Seq[Filter],
+      schema: StructType,
+      conf: Configuration,
+      allFiles: Seq[FileStatus],
+      root: Path,
+      partitions: Seq[Partition]): Seq[Partition] = {
+    // Read the "_metadata" file if available, contains all block headers. On S3 better to grab
+    // all of the footers in a batch rather than having to read every single file just to get its
+    // footer.
+    allFiles.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE).map { stat =>
+      val metadata = ParquetFileReader.readFooter(conf, stat, ParquetMetadataConverter.NO_FILTER)
+      partitions.map { part =>
+        filterByMetadata(
+          filters,
+          schema,
+          conf,
+          root,
+          metadata,
+          part)
+      }.filterNot(_.files.isEmpty)
+    }.getOrElse(partitions)
+  }
+
+  private def filterByMetadata(
+      filters: Seq[Filter],
+      schema: StructType,
+      conf: Configuration,
+      root: Path,
+      metadata: ParquetMetadata,
+      partition: Partition): Partition = {
+    val blockMetadatas = metadata.getBlocks.asScala
+    val parquetSchema = metadata.getFileMetaData.getSchema
+    val conjunctiveFilter = filters
+      .flatMap(ParquetFilters.createFilter(schema, _))
+      .reduceOption(FilterApi.and)
+    conjunctiveFilter.map { conjunction =>
+      val filteredBlocks = RowGroupFilter.filterRowGroups(

--- End diff --

Do you mind if I ask a question, please? If my understanding is correct, Parquet already filters row groups in both the normal reader and the vectorized reader (https://github.com/apache/spark/pull/13701). Is this doing the same thing on the Spark side?
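The `filterByMetadata` logic quoted above AND-s the pushed filters into a single conjunction and then drops row groups that cannot match it. A hypothetical, much simplified sketch of that idea in Python (the stats layout and filter triples here are illustrative, not Parquet's actual API):

```python
# Simplified sketch of row-group pruning by column statistics: a row group is
# kept only if its per-column [min, max] range could satisfy ALL pushed
# filters (the conjunction). Illustrative only; not parquet-hadoop's API.

def might_match(stats, column, op, value):
    """Return True if a row group's [min, max] range could satisfy one predicate."""
    lo, hi = stats[column]
    if op == "eq":
        return lo <= value <= hi
    if op == "gt":
        return hi > value
    if op == "lt":
        return lo < value
    return True  # unknown operator: keep the row group (conservative)

def filter_row_groups(row_groups, filters):
    """Keep row groups that might satisfy every pushed filter."""
    return [
        rg for rg in row_groups
        if all(might_match(rg["stats"], col, op, val) for (col, op, val) in filters)
    ]

row_groups = [
    {"path": "part-0", "stats": {"x": (0, 9)}},
    {"path": "part-1", "stats": {"x": (10, 19)}},
]
# With the pushed filter x > 12, only part-1's range (10, 19) can match.
kept = filter_row_groups(row_groups, [("x", "gt", 12)])
```

Note the conservative default: a predicate the pruner does not understand must keep the row group, since statistics-based pruning may only ever discard data that provably cannot match.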
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/14649#discussion_r74872795

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---

(same `filterPartitions`/`filterByMetadata` hunk as quoted above)

--- End diff --

Also, doesn't this try to touch many files on the driver side?
Github user andreweduffy commented on a diff in the pull request: https://github.com/apache/spark/pull/14649#discussion_r74860656

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---

@@ -703,6 +703,16 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 }
+
+  test("SPARK-17059: Allow Allow FileFormat to specify partition pruning strategy") {

--- End diff --

Typo, will fix.
GitHub user andreweduffy opened a pull request: https://github.com/apache/spark/pull/14649

[SPARK-17059][SQL] Allow FileFormat to specify partition pruning strategy

## What changes were proposed in this pull request?

Different FileFormat implementations may be able to make intelligent decisions about which files need to be processed as part of a FileSourceScanExec, based on the `pushedFilters` available. This PR implements a pluggable way to do that, with a Parquet implementation that applies the summary metadata to avoid task creation. This has a few performance benefits:

1. Reading files is generally slow, especially on S3. The current Parquet implementation does not use the summary metadata, so the footers are read directly. This can be very slow for large Parquet datasets: even as of Hadoop 2.7, S3 reads fetch the entire file by default (random reads are configurable only from 2.7 onwards and are disabled by default).
2. Partitions found to contain no files can then be pruned or coalesced, depending on the FileFormat's implementation, so fewer tasks are created.

## How was this patch tested?

Existing tests and the Spark shell, plus a unit test for the Parquet implementation.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/andreweduffy/spark parquet-task-pruning

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14649.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #14649

commit 46657980c00ce36c03e39540bd4399613f439e04
Author: Andrew Duffy
Date: 2016-07-27T16:47:20Z

    Allow FileFormat to specify file pruning
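The shape of the pluggable hook the PR describes can be sketched as follows. This is a hypothetical Python illustration of the design (the PR itself is Scala): the base FileFormat's `filter_partitions` is a no-op, and a format that understands its own metadata, like Parquet's summary file, overrides it to drop files and empty partitions. All names here are illustrative, not Spark's API.

```python
# Sketch of a pluggable partition-pruning hook: the default keeps every
# partition, while a metadata-aware format removes files that cannot match
# the pushed filters and drops partitions left empty (fewer tasks).

class FileFormat:
    def filter_partitions(self, filters, partitions):
        # Default behavior: no pruning; partitions pass through untouched.
        return partitions

class ParquetLikeFormat(FileFormat):
    def __init__(self, matching_files):
        # Pretend we read the summary _metadata once (one batched read, cheap
        # on S3) and learned which files can contain matching rows.
        self.matching_files = matching_files

    def filter_partitions(self, filters, partitions):
        pruned = []
        for part in partitions:
            files = [f for f in part["files"] if f in self.matching_files]
            if files:  # partitions with no surviving files are dropped
                pruned.append({**part, "files": files})
        return pruned

fmt = ParquetLikeFormat(matching_files={"a.parquet"})
parts = [{"id": 0, "files": ["a.parquet", "b.parquet"]},
         {"id": 1, "files": ["b.parquet"]}]
kept = fmt.filter_partitions(filters=[], partitions=parts)
```

The design keeps the decision inside each format: the scan planner calls one method, and formats without useful metadata simply inherit the pass-through default.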