[GitHub] spark pull request #14649: [SPARK-17059][SQL] Allow FileFormat to specify pa...

2016-11-10 Thread andreweduffy
Github user andreweduffy closed the pull request at:

https://github.com/apache/spark/pull/14649


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14649: [SPARK-17059][SQL] Allow FileFormat to specify pa...

2016-08-29 Thread ash211
Github user ash211 commented on a diff in the pull request:

https://github.com/apache/spark/pull/14649#discussion_r76716506
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -58,6 +60,9 @@ class ParquetFileFormat
   with Logging
   with Serializable {
 
+  // Attempt to cache parquet metadata for this instance of
--- End diff --

this instance of ...





[GitHub] spark pull request #14649: [SPARK-17059][SQL] Allow FileFormat to specify pa...

2016-08-29 Thread ash211
Github user ash211 commented on a diff in the pull request:

https://github.com/apache/spark/pull/14649#discussion_r76716449
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala ---
@@ -220,6 +220,21 @@ trait FileFormat {
   }
 
   /**
+   * Allow FileFormat's to have a pluggable way to utilize pushed filters to eliminate partitions
--- End diff --

typo: `FileFormats`





[GitHub] spark pull request #14649: [SPARK-17059][SQL] Allow FileFormat to specify pa...

2016-08-15 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/14649#discussion_r74872775
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -423,6 +425,54 @@ class ParquetFileFormat
   sqlContext.sessionState.newHadoopConf(),
   options)
   }
+
+  override def filterPartitions(
+  filters: Seq[Filter],
+  schema: StructType,
+  conf: Configuration,
+  allFiles: Seq[FileStatus],
+  root: Path,
+  partitions: Seq[Partition]): Seq[Partition] = {
+// Read the "_metadata" file if available, contains all block headers. On S3 better to grab
+// all of the footers in a batch rather than having to read every single file just to get its
+// footer.
+allFiles.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE).map { stat =>
+  val metadata = ParquetFileReader.readFooter(conf, stat, ParquetMetadataConverter.NO_FILTER)
+  partitions.map { part =>
+filterByMetadata(
+  filters,
+  schema,
+  conf,
+  root,
+  metadata,
+  part)
+  }.filterNot(_.files.isEmpty)
+}.getOrElse(partitions)
+  }
+
+  private def filterByMetadata(
+  filters: Seq[Filter],
+  schema: StructType,
+  conf: Configuration,
+  root: Path,
+  metadata: ParquetMetadata,
+  partition: Partition): Partition = {
+val blockMetadatas = metadata.getBlocks.asScala
+val parquetSchema = metadata.getFileMetaData.getSchema
+val conjunctiveFilter = filters
+  .flatMap(ParquetFilters.createFilter(schema, _))
+  .reduceOption(FilterApi.and)
+conjunctiveFilter.map { conjunction =>
+  val filteredBlocks = RowGroupFilter.filterRowGroups(
--- End diff --

Do you mind if I ask a question, please?

So, if my understanding is correct, Parquet already filters row groups in both the normal reader and the vectorized reader (https://github.com/apache/spark/pull/13701). Is this doing the same thing on the Spark side?
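As background on the conjunction-building step quoted in the diff above (`flatMap(ParquetFilters.createFilter(schema, _)).reduceOption(FilterApi.and)`): each Spark filter converts to an optional Parquet predicate, the failures are dropped, and the survivors are ANDed together. A minimal stand-alone sketch of that pattern, using plain Scala predicates as hypothetical stand-ins for Parquet's `FilterPredicate`:

```scala
// Plain Int predicates stand in for Parquet's FilterPredicate type.
object ConjunctionSketch {
  type Pred = Int => Boolean

  // Mirrors the pattern in filterByMetadata: flatten drops filters that
  // could not be converted (None), and reduceOption ANDs the rest.
  // The result is None when nothing converted, i.e. no pruning is possible.
  def conjoin(converted: Seq[Option[Pred]]): Option[Pred] =
    converted.flatten.reduceOption((a, b) => (x: Int) => a(x) && b(x))
}
```

Note that `reduceOption` (rather than `reduce`) is what makes the "no convertible filters" case safe: it yields `None` instead of throwing on an empty sequence.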





[GitHub] spark pull request #14649: [SPARK-17059][SQL] Allow FileFormat to specify pa...

2016-08-15 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/14649#discussion_r74872795
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -423,6 +425,54 @@ class ParquetFileFormat
   sqlContext.sessionState.newHadoopConf(),
   options)
   }
+
+  override def filterPartitions(
+  filters: Seq[Filter],
+  schema: StructType,
+  conf: Configuration,
+  allFiles: Seq[FileStatus],
+  root: Path,
+  partitions: Seq[Partition]): Seq[Partition] = {
+// Read the "_metadata" file if available, contains all block headers. On S3 better to grab
+// all of the footers in a batch rather than having to read every single file just to get its
+// footer.
+allFiles.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE).map { stat =>
+  val metadata = ParquetFileReader.readFooter(conf, stat, ParquetMetadataConverter.NO_FILTER)
+  partitions.map { part =>
+filterByMetadata(
+  filters,
+  schema,
+  conf,
+  root,
+  metadata,
+  part)
+  }.filterNot(_.files.isEmpty)
+}.getOrElse(partitions)
+  }
+
+  private def filterByMetadata(
+  filters: Seq[Filter],
+  schema: StructType,
+  conf: Configuration,
+  root: Path,
+  metadata: ParquetMetadata,
+  partition: Partition): Partition = {
+val blockMetadatas = metadata.getBlocks.asScala
+val parquetSchema = metadata.getFileMetaData.getSchema
+val conjunctiveFilter = filters
+  .flatMap(ParquetFilters.createFilter(schema, _))
+  .reduceOption(FilterApi.and)
+conjunctiveFilter.map { conjunction =>
+  val filteredBlocks = RowGroupFilter.filterRowGroups(
--- End diff --

Also, doesn't this try to touch many files on the driver side?
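For context on this driver-side concern: when a `_metadata` summary file is present the driver opens only that one file, whereas without it a footer read per data file would be needed. A toy cost model of that trade-off (plain file names standing in for `FileStatus` entries; the counts are illustrative, not measured):

```scala
// Toy cost model for driver-side reads: a single "_metadata" summary file
// (as written by ParquetFileWriter) replaces one footer read per data file.
object FooterReadsSketch {
  val SummaryFile = "_metadata"

  def driverReads(fileNames: Seq[String]): Int =
    if (fileNames.contains(SummaryFile)) 1
    else fileNames.count(_.endsWith(".parquet"))
}
```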





[GitHub] spark pull request #14649: [SPARK-17059][SQL] Allow FileFormat to specify pa...

2016-08-15 Thread andreweduffy
Github user andreweduffy commented on a diff in the pull request:

https://github.com/apache/spark/pull/14649#discussion_r74860656
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---
@@ -703,6 +703,16 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
   }
 }
   }
+
+  test("SPARK-17059: Allow Allow FileFormat to specify partition pruning strategy") {
--- End diff --

Typo, will fix.





[GitHub] spark pull request #14649: [SPARK-17059][SQL] Allow FileFormat to specify pa...

2016-08-15 Thread andreweduffy
GitHub user andreweduffy opened a pull request:

https://github.com/apache/spark/pull/14649

[SPARK-17059][SQL] Allow FileFormat to specify partition pruning strategy

## What changes were proposed in this pull request?

Different FileFormat implementations may be able to make intelligent decisions about which files need to be processed as part of a FileSourceScanExec, based on the available `pushedFilters`. This PR implements a pluggable way to do that, with an implementation for Parquet that applies the summary metadata to avoid creating unnecessary tasks.

This has a few performance benefits:

1. Reading files is generally slow, especially on S3. The current Parquet implementation does not use the summary metadata, so the footers are read directly. This can be very slow for large Parquet datasets: even as of Hadoop 2.7, S3 reads fetch the entire file by default (random reads are configurable only from 2.7 onwards, and are disabled by default).
2. Partitions found to contain no files can then be pruned out or coalesced, depending on the FileFormat's implementation, resulting in fewer tasks being created.
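The pluggable hook described above can be sketched as follows. `Filter` and `Partition` here are simplified hypothetical stand-ins for Spark's real types, and the max-value stat stands in for the Parquet summary-metadata check; this is an illustration of the shape of the API, not the actual implementation:

```scala
// Simplified model of the pluggable partition-pruning hook.
object PruningSketch {
  final case class Filter(minValue: Int)                      // stand-in for a pushed filter
  final case class Partition(files: Seq[String], maxValue: Int) // stand-in with a per-partition stat

  trait FileFormat {
    // Default behaviour: formats that cannot prune return partitions unchanged.
    def filterPartitions(filters: Seq[Filter], parts: Seq[Partition]): Seq[Partition] = parts
  }

  // A format that consults per-partition stats (standing in for summary
  // metadata) and drops partitions that no pushed filter can match,
  // then discards partitions left with no files -- as in the PR's
  // filterNot(_.files.isEmpty) step.
  object StatsAwareFormat extends FileFormat {
    override def filterPartitions(filters: Seq[Filter], parts: Seq[Partition]): Seq[Partition] =
      parts.filter(p => filters.forall(f => p.maxValue >= f.minValue))
        .filterNot(_.files.isEmpty)
  }
}
```

The key design point is the no-op default: formats without usable metadata opt out simply by not overriding the method.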

## How was this patch tested?

Existing tests and the Spark shell, plus a unit test for the Parquet implementation.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/andreweduffy/spark parquet-task-pruning

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14649.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14649


commit 46657980c00ce36c03e39540bd4399613f439e04
Author: Andrew Duffy 
Date:   2016-07-27T16:47:20Z

Allow FileFormat to specify file pruning



