This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new be6d39c  [SPARK-27668][SQL] File source V2: support reporting statistics
be6d39c is described below

commit be6d39c379c87e9a36789b397510425061d36c25
Author: Gengliang Wang <gengliang.w...@databricks.com>
AuthorDate: Mon May 13 14:16:11 2019 +0800

    [SPARK-27668][SQL] File source V2: support reporting statistics
    
    ## What changes were proposed in this pull request?
    
    In File source V1, the statistics reported by `HadoopFsRelation` are `compressionFactor * sizeInBytesOfAllFiles`.
    To match that behavior, we can implement the `SupportsReportStatistics` interface in `FileScan` and report the same statistics, as sketched below.
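    
    A minimal sketch of the estimate being reported (the numbers and the object name are illustrative only; the real implementation is the `FileScan` change in the diff below):
    
    ```scala
    import java.util.OptionalLong
    
    object FileScanStatsSketch extends App {
      // Hypothetical inputs: the total size of the data files to scan and the value of
      // spark.sql.sources.fileCompressionFactor (1.0 by default).
      val sizeInBytesOfAllFiles: Long = 486L
      val compressionFactor: Double = 1.0
    
      // The estimate File source V1 derived for HadoopFsRelation, now reported by FileScan
      // through SupportsReportStatistics.
      val sizeInBytes: OptionalLong = OptionalLong.of((compressionFactor * sizeInBytesOfAllFiles).toLong)
      // The row count is unknown before scanning, so it is left empty.
      val numRows: OptionalLong = OptionalLong.empty()
    
      println(s"estimated sizeInBytes = $sizeInBytes, numRows = $numRows")
    }
    ```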
    
    ## How was this patch tested?
    
    Unit test
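    
    As a rough, self-contained way to reproduce the check in `FileBasedDataSourceSuite` (the session setup, temp path, and object name here are illustrative, not part of the patch):
    
    ```scala
    import java.io.File
    
    import org.apache.spark.sql.SparkSession
    
    object ReportedStatsCheck extends App {
      val spark = SparkSession.builder().master("local[2]").appName("stats-check").getOrCreate()
    
      val dir = new File(System.getProperty("java.io.tmpdir"), s"stats-check-${System.nanoTime()}")
      spark.range(1000).write.orc(dir.toString)
    
      // Sum the sizes of the data files, ignoring hidden and metadata files (e.g. _SUCCESS, .crc).
      val totalSize = dir.listFiles()
        .filter(f => !f.getName.startsWith(".") && !f.getName.startsWith("_"))
        .map(_.length()).sum
    
      val df = spark.read.orc(dir.toString)
      // With the default spark.sql.sources.fileCompressionFactor of 1.0, the logical plan's
      // size estimate should match the total size of the data files.
      println(s"reported sizeInBytes = ${df.queryExecution.logical.stats.sizeInBytes}, file bytes = $totalSize")
    
      spark.stop()
    }
    ```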
    
    Closes #24571 from gengliangwang/stats.
    
    Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/execution/datasources/v2/FileScan.scala    | 18 ++++-
 .../spark/sql/FileBasedDataSourceSuite.scala       | 62 +++++++++++++++-
 .../columnar/InMemoryColumnarQuerySuite.scala      | 73 ++++++++++---------
 .../datasources/HadoopFsRelationSuite.scala        | 83 ----------------------
 4 files changed, 114 insertions(+), 122 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index b2b45e8..84a1274 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.util.Locale
+import java.util.{Locale, OptionalLong}
 
 import org.apache.hadoop.fs.Path
 
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, Scan}
+import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -34,7 +34,7 @@ abstract class FileScan(
     sparkSession: SparkSession,
     fileIndex: PartitioningAwareFileIndex,
     readDataSchema: StructType,
-    readPartitionSchema: StructType) extends Scan with Batch {
+    readPartitionSchema: StructType) extends Scan with Batch with SupportsReportStatistics {
   /**
    * Returns whether a file with `path` could be split or not.
    */
@@ -81,6 +81,18 @@ abstract class FileScan(
     partitions.toArray
   }
 
+  override def estimateStatistics(): Statistics = {
+    new Statistics {
+      override def sizeInBytes(): OptionalLong = {
+        val compressionFactor = sparkSession.sessionState.conf.fileCompressionFactor
+        val size = (compressionFactor * fileIndex.sizeInBytes).toLong
+        OptionalLong.of(size)
+      }
+
+      override def numRows(): OptionalLong = OptionalLong.empty()
+    }
+  }
+
   override def toBatch: Batch = this
 
   override def readSchema(): StructType =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 3ab5bf2..dd11b5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import java.io.{File, FileNotFoundException}
+import java.io.{File, FilenameFilter, FileNotFoundException}
 import java.util.Locale
 
 import scala.collection.mutable
@@ -28,6 +28,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT, NullData, NullUDT}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -584,6 +585,65 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll
       }
     }
   }
+
+  test("sizeInBytes should be the total size of all files") {
+    Seq("orc", "").foreach { useV1SourceReaderList =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
+        withTempDir { dir =>
+          dir.delete()
+          spark.range(1000).write.orc(dir.toString)
+          // ignore hidden files
+          val allFiles = dir.listFiles(new FilenameFilter {
+            override def accept(dir: File, name: String): Boolean = {
+              !name.startsWith(".") && !name.startsWith("_")
+            }
+          })
+          val totalSize = allFiles.map(_.length()).sum
+          val df = spark.read.orc(dir.toString)
+          assert(df.queryExecution.logical.stats.sizeInBytes === BigInt(totalSize))
+        }
+      }
+    }
+  }
+
+  test("SPARK-22790,SPARK-27668: spark.sql.sources.compressionFactor takes 
effect") {
+    Seq(1.0, 0.5).foreach { compressionFactor =>
+      withSQLConf("spark.sql.sources.fileCompressionFactor" -> 
compressionFactor.toString,
+        "spark.sql.autoBroadcastJoinThreshold" -> "250") {
+        withTempPath { workDir =>
+          // the file size is 486 bytes
+          val workDirPath = workDir.getAbsolutePath
+          val data1 = Seq(100, 200, 300, 400).toDF("count")
+          data1.write.orc(workDirPath + "/data1")
+          val df1FromFile = spark.read.orc(workDirPath + "/data1")
+          val data2 = Seq(100, 200, 300, 400).toDF("count")
+          data2.write.orc(workDirPath + "/data2")
+          val df2FromFile = spark.read.orc(workDirPath + "/data2")
+          val joinedDF = df1FromFile.join(df2FromFile, Seq("count"))
+          if (compressionFactor == 0.5) {
+            val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case bJoin: BroadcastHashJoinExec => bJoin
+            }
+            assert(bJoinExec.nonEmpty)
+            val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case smJoin: SortMergeJoinExec => smJoin
+            }
+            assert(smJoinExec.isEmpty)
+          } else {
+            // compressionFactor is 1.0
+            val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case bJoin: BroadcastHashJoinExec => bJoin
+            }
+            assert(bJoinExec.isEmpty)
+            val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case smJoin: SortMergeJoinExec => smJoin
+            }
+            assert(smJoinExec.nonEmpty)
+          }
+        }
+      }
+    }
+  }
 }
 
 object TestingUDT {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index e40528f..10a105c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -505,42 +505,45 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-22673: InMemoryRelation should utilize existing stats of the 
plan to be cached") {
-    // This test case depends on the size of parquet in statistics.
-    withSQLConf(
+    Seq("orc", "").foreach { useV1SourceReaderList =>
+      // This test case depends on the size of ORC in statistics.
+      withSQLConf(
         SQLConf.CBO_ENABLED.key -> "true",
-        SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "parquet") {
-      withTempPath { workDir =>
-        withTable("table1") {
-          val workDirPath = workDir.getAbsolutePath
-          val data = Seq(100, 200, 300, 400).toDF("count")
-          data.write.parquet(workDirPath)
-          val dfFromFile = spark.read.parquet(workDirPath).cache()
-          val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
-            case plan: InMemoryRelation => plan
-          }.head
-          // InMemoryRelation's stats is file size before the underlying RDD is materialized
-          assert(inMemoryRelation.computeStats().sizeInBytes === 868)
-
-          // InMemoryRelation's stats is updated after materializing RDD
-          dfFromFile.collect()
-          assert(inMemoryRelation.computeStats().sizeInBytes === 16)
-
-          // test of catalog table
-          val dfFromTable = spark.catalog.createTable("table1", workDirPath).cache()
-          val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
-            collect { case plan: InMemoryRelation => plan }.head
-
-          // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats
-          // is calculated
-          assert(inMemoryRelation2.computeStats().sizeInBytes === 868)
-
-          // InMemoryRelation's stats should be updated after calculating stats of the table
-          // clear cache to simulate a fresh environment
-          dfFromTable.unpersist(blocking = true)
-          spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
-          val inMemoryRelation3 = spark.read.table("table1").cache().queryExecution.optimizedPlan.
-            collect { case plan: InMemoryRelation => plan }.head
-          assert(inMemoryRelation3.computeStats().sizeInBytes === 48)
+        SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "orc",
+        SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
+        withTempPath { workDir =>
+          withTable("table1") {
+            val workDirPath = workDir.getAbsolutePath
+            val data = Seq(100, 200, 300, 400).toDF("count")
+            data.write.orc(workDirPath)
+            val dfFromFile = spark.read.orc(workDirPath).cache()
+            val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
+              case plan: InMemoryRelation => plan
+            }.head
+            // InMemoryRelation's stats is file size before the underlying RDD is materialized
+            assert(inMemoryRelation.computeStats().sizeInBytes === 486)
+
+            // InMemoryRelation's stats is updated after materializing RDD
+            dfFromFile.collect()
+            assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+
+            // test of catalog table
+            val dfFromTable = spark.catalog.createTable("table1", workDirPath).cache()
+            val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
+              collect { case plan: InMemoryRelation => plan }.head
+
+            // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's
+            // stats is calculated
+            assert(inMemoryRelation2.computeStats().sizeInBytes === 486)
+
+            // InMemoryRelation's stats should be updated after calculating stats of the table
+            // clear cache to simulate a fresh environment
+            dfFromTable.unpersist(blocking = true)
+            spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
+            val inMemoryRelation3 = spark.read.table("table1").cache().queryExecution.optimizedPlan.
+              collect { case plan: InMemoryRelation => plan }.head
+            assert(inMemoryRelation3.computeStats().sizeInBytes === 48)
+          }
         }
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
deleted file mode 100644
index 6e08ee3..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import java.io.{File, FilenameFilter}
-
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
-import org.apache.spark.sql.test.SharedSQLContext
-
-class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
-
-  test("sizeInBytes should be the total size of all files") {
-    withTempDir{ dir =>
-      dir.delete()
-      spark.range(1000).write.parquet(dir.toString)
-      // ignore hidden files
-      val allFiles = dir.listFiles(new FilenameFilter {
-        override def accept(dir: File, name: String): Boolean = {
-          !name.startsWith(".") && !name.startsWith("_")
-        }
-      })
-      val totalSize = allFiles.map(_.length()).sum
-      val df = spark.read.parquet(dir.toString)
-      assert(df.queryExecution.logical.stats.sizeInBytes === BigInt(totalSize))
-    }
-  }
-
-  test("SPARK-22790: spark.sql.sources.compressionFactor takes effect") {
-    import testImplicits._
-    Seq(1.0, 0.5).foreach { compressionFactor =>
-      withSQLConf("spark.sql.sources.fileCompressionFactor" -> 
compressionFactor.toString,
-        "spark.sql.autoBroadcastJoinThreshold" -> "434") {
-        withTempPath { workDir =>
-          // the file size is 740 bytes
-          val workDirPath = workDir.getAbsolutePath
-          val data1 = Seq(100, 200, 300, 400).toDF("count")
-          data1.write.parquet(workDirPath + "/data1")
-          val df1FromFile = spark.read.parquet(workDirPath + "/data1")
-          val data2 = Seq(100, 200, 300, 400).toDF("count")
-          data2.write.parquet(workDirPath + "/data2")
-          val df2FromFile = spark.read.parquet(workDirPath + "/data2")
-          val joinedDF = df1FromFile.join(df2FromFile, Seq("count"))
-          if (compressionFactor == 0.5) {
-            val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
-              case bJoin: BroadcastHashJoinExec => bJoin
-            }
-            assert(bJoinExec.nonEmpty)
-            val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
-              case smJoin: SortMergeJoinExec => smJoin
-            }
-            assert(smJoinExec.isEmpty)
-          } else {
-            // compressionFactor is 1.0
-            val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
-              case bJoin: BroadcastHashJoinExec => bJoin
-            }
-            assert(bJoinExec.isEmpty)
-            val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
-              case smJoin: SortMergeJoinExec => smJoin
-            }
-            assert(smJoinExec.nonEmpty)
-          }
-        }
-      }
-    }
-  }
-}

