This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new be6d39c  [SPARK-27668][SQL] File source V2: support reporting statistics
be6d39c is described below

commit be6d39c379c87e9a36789b397510425061d36c25
Author:     Gengliang Wang <gengliang.w...@databricks.com>
AuthorDate: Mon May 13 14:16:11 2019 +0800

    [SPARK-27668][SQL] File source V2: support reporting statistics

    ## What changes were proposed in this pull request?

    In File source V1, the statistics of a `HadoopFsRelation` are estimated as
    `compressionFactor * sizeInBytesOfAllFiles`. To stay consistent with that,
    File source V2 can implement the interface `SupportsReportStatistics` in
    `FileScan` and report the same statistics.

    ## How was this patch tested?

    Unit tests.

    Closes #24571 from gengliangwang/stats.

    Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/execution/datasources/v2/FileScan.scala    | 18 ++++-
 .../spark/sql/FileBasedDataSourceSuite.scala       | 62 +++++++++++++++-
 .../columnar/InMemoryColumnarQuerySuite.scala      | 73 ++++++++++---------
 .../datasources/HadoopFsRelationSuite.scala        | 83 ----------------------
 4 files changed, 114 insertions(+), 122 deletions(-)
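For readers new to the DSv2 statistics API, here is a minimal, self-contained sketch of the contract the patch implements. The ExampleScan class and its constructor parameters are hypothetical and for illustration only; SupportsReportStatistics, Statistics, and OptionalLong are the types the patch itself uses.

    import java.util.OptionalLong

    import org.apache.spark.sql.sources.v2.reader.{Scan, Statistics, SupportsReportStatistics}
    import org.apache.spark.sql.types.StructType

    // Hypothetical scan used only for illustration: it reports
    // compressionFactor * totalFileSizeInBytes, the same formula File
    // source V1 uses for HadoopFsRelation.
    class ExampleScan(totalFileSizeInBytes: Long, compressionFactor: Double)
      extends Scan with SupportsReportStatistics {

      override def readSchema(): StructType = new StructType()

      override def estimateStatistics(): Statistics = new Statistics {
        // Estimated decompressed size in bytes; the optimizer consumes this,
        // e.g. when deciding whether a relation is small enough to broadcast.
        override def sizeInBytes(): OptionalLong =
          OptionalLong.of((compressionFactor * totalFileSizeInBytes).toLong)

        // The number of rows is unknown without reading the files.
        override def numRows(): OptionalLong = OptionalLong.empty()
      }
    }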
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index b2b45e8..84a1274 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.util.Locale
+import java.util.{Locale, OptionalLong}
 
 import org.apache.hadoop.fs.Path
 
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
 import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, Scan}
+import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -34,7 +34,7 @@ abstract class FileScan(
     sparkSession: SparkSession,
     fileIndex: PartitioningAwareFileIndex,
     readDataSchema: StructType,
-    readPartitionSchema: StructType) extends Scan with Batch {
+    readPartitionSchema: StructType) extends Scan with Batch with SupportsReportStatistics {
   /**
    * Returns whether a file with `path` could be split or not.
    */
@@ -81,6 +81,18 @@ abstract class FileScan(
     partitions.toArray
   }
 
+  override def estimateStatistics(): Statistics = {
+    new Statistics {
+      override def sizeInBytes(): OptionalLong = {
+        val compressionFactor = sparkSession.sessionState.conf.fileCompressionFactor
+        val size = (compressionFactor * fileIndex.sizeInBytes).toLong
+        OptionalLong.of(size)
+      }
+
+      override def numRows(): OptionalLong = OptionalLong.empty()
+    }
+  }
+
   override def toBatch: Batch = this
 
   override def readSchema(): StructType =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 3ab5bf2..dd11b5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql
 
-import java.io.{File, FileNotFoundException}
+import java.io.{File, FilenameFilter, FileNotFoundException}
 import java.util.Locale
 
 import scala.collection.mutable
@@ -28,6 +28,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT, NullData, NullUDT}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -584,6 +585,65 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
       }
     }
   }
+
+  test("sizeInBytes should be the total size of all files") {
+    Seq("orc", "").foreach { useV1SourceReaderList =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
+        withTempDir { dir =>
+          dir.delete()
+          spark.range(1000).write.orc(dir.toString)
+          // ignore hidden files
+          val allFiles = dir.listFiles(new FilenameFilter {
+            override def accept(dir: File, name: String): Boolean = {
+              !name.startsWith(".") && !name.startsWith("_")
+            }
+          })
+          val totalSize = allFiles.map(_.length()).sum
+          val df = spark.read.orc(dir.toString)
+          assert(df.queryExecution.logical.stats.sizeInBytes === BigInt(totalSize))
+        }
+      }
+    }
+  }
+
+  test("SPARK-22790,SPARK-27668: spark.sql.sources.compressionFactor takes effect") {
+    Seq(1.0, 0.5).foreach { compressionFactor =>
+      withSQLConf("spark.sql.sources.fileCompressionFactor" -> compressionFactor.toString,
+        "spark.sql.autoBroadcastJoinThreshold" -> "250") {
+        withTempPath { workDir =>
+          // the file size is 486 bytes
+          val workDirPath = workDir.getAbsolutePath
+          val data1 = Seq(100, 200, 300, 400).toDF("count")
+          data1.write.orc(workDirPath + "/data1")
+          val df1FromFile = spark.read.orc(workDirPath + "/data1")
+          val data2 = Seq(100, 200, 300, 400).toDF("count")
+          data2.write.orc(workDirPath + "/data2")
+          val df2FromFile = spark.read.orc(workDirPath + "/data2")
+          val joinedDF = df1FromFile.join(df2FromFile, Seq("count"))
+          if (compressionFactor == 0.5) {
+            val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case bJoin: BroadcastHashJoinExec => bJoin
+            }
+            assert(bJoinExec.nonEmpty)
+            val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case smJoin: SortMergeJoinExec => smJoin
+            }
+            assert(smJoinExec.isEmpty)
+          } else {
+            // compressionFactor is 1.0
+            val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case bJoin: BroadcastHashJoinExec => bJoin
+            }
+            assert(bJoinExec.isEmpty)
+            val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case smJoin: SortMergeJoinExec => smJoin
+            }
+            assert(smJoinExec.nonEmpty)
+          }
+        }
+      }
+    }
+  }
 }
 
 object TestingUDT {
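For what it's worth, the thresholds in the SPARK-22790/SPARK-27668 test above line up as follows, assuming the 486-byte file size stated in its comment: with spark.sql.sources.fileCompressionFactor = 0.5, each side of the join reports 0.5 * 486 = 243 bytes, below the autoBroadcastJoinThreshold of 250, so the planner picks a BroadcastHashJoinExec; with the factor at 1.0, the reported 486 bytes exceeds the threshold, so the join stays a SortMergeJoinExec.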
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index e40528f..10a105c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -505,42 +505,45 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
-    // This test case depends on the size of parquet in statistics.
-    withSQLConf(
+    Seq("orc", "").foreach { useV1SourceReaderList =>
+      // This test case depends on the size of ORC in statistics.
+      withSQLConf(
         SQLConf.CBO_ENABLED.key -> "true",
-        SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "parquet") {
-      withTempPath { workDir =>
-        withTable("table1") {
-          val workDirPath = workDir.getAbsolutePath
-          val data = Seq(100, 200, 300, 400).toDF("count")
-          data.write.parquet(workDirPath)
-          val dfFromFile = spark.read.parquet(workDirPath).cache()
-          val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
-            case plan: InMemoryRelation => plan
-          }.head
-          // InMemoryRelation's stats is file size before the underlying RDD is materialized
-          assert(inMemoryRelation.computeStats().sizeInBytes === 868)
-
-          // InMemoryRelation's stats is updated after materializing RDD
-          dfFromFile.collect()
-          assert(inMemoryRelation.computeStats().sizeInBytes === 16)
-
-          // test of catalog table
-          val dfFromTable = spark.catalog.createTable("table1", workDirPath).cache()
-          val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
-            collect { case plan: InMemoryRelation => plan }.head
-
-          // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats
-          // is calculated
-          assert(inMemoryRelation2.computeStats().sizeInBytes === 868)
-
-          // InMemoryRelation's stats should be updated after calculating stats of the table
-          // clear cache to simulate a fresh environment
-          dfFromTable.unpersist(blocking = true)
-          spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
-          val inMemoryRelation3 = spark.read.table("table1").cache().queryExecution.optimizedPlan.
-            collect { case plan: InMemoryRelation => plan }.head
-          assert(inMemoryRelation3.computeStats().sizeInBytes === 48)
+        SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "orc",
+        SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
+        withTempPath { workDir =>
+          withTable("table1") {
+            val workDirPath = workDir.getAbsolutePath
+            val data = Seq(100, 200, 300, 400).toDF("count")
+            data.write.orc(workDirPath)
+            val dfFromFile = spark.read.orc(workDirPath).cache()
+            val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
+              case plan: InMemoryRelation => plan
+            }.head
+            // InMemoryRelation's stats is file size before the underlying RDD is materialized
+            assert(inMemoryRelation.computeStats().sizeInBytes === 486)
+
+            // InMemoryRelation's stats is updated after materializing RDD
+            dfFromFile.collect()
+            assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+
+            // test of catalog table
+            val dfFromTable = spark.catalog.createTable("table1", workDirPath).cache()
+            val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
+              collect { case plan: InMemoryRelation => plan }.head
+
+            // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's
+            // stats is calculated
+            assert(inMemoryRelation2.computeStats().sizeInBytes === 486)
+
+            // InMemoryRelation's stats should be updated after calculating stats of the table
+            // clear cache to simulate a fresh environment
+            dfFromTable.unpersist(blocking = true)
+            spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
+            val inMemoryRelation3 = spark.read.table("table1").cache().queryExecution.optimizedPlan.
+              collect { case plan: InMemoryRelation => plan }.head
+            assert(inMemoryRelation3.computeStats().sizeInBytes === 48)
+          }
         }
       }
     }
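A quick way to observe the reported statistics from spark-shell, sketched under the assumption that the shell provides the spark session and that /tmp/spark-27668-demo is a writable, hypothetical path (the exact byte count depends on the files written):

    import spark.implicits._

    // Halve the size that file scans report to the optimizer.
    spark.conf.set("spark.sql.sources.fileCompressionFactor", "0.5")

    // Hypothetical output location, used only for this example.
    val path = "/tmp/spark-27668-demo"
    Seq(100, 200, 300, 400).toDF("count").write.mode("overwrite").orc(path)

    // With the V2 ORC reader this goes through FileScan.estimateStatistics,
    // i.e. roughly compressionFactor * (total size of the data files).
    val df = spark.read.orc(path)
    println(df.queryExecution.logical.stats.sizeInBytes)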
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
deleted file mode 100644
index 6e08ee3..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import java.io.{File, FilenameFilter}
-
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
-import org.apache.spark.sql.test.SharedSQLContext
-
-class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
-
-  test("sizeInBytes should be the total size of all files") {
-    withTempDir{ dir =>
-      dir.delete()
-      spark.range(1000).write.parquet(dir.toString)
-      // ignore hidden files
-      val allFiles = dir.listFiles(new FilenameFilter {
-        override def accept(dir: File, name: String): Boolean = {
-          !name.startsWith(".") && !name.startsWith("_")
-        }
-      })
-      val totalSize = allFiles.map(_.length()).sum
-      val df = spark.read.parquet(dir.toString)
-      assert(df.queryExecution.logical.stats.sizeInBytes === BigInt(totalSize))
-    }
-  }
-
-  test("SPARK-22790: spark.sql.sources.compressionFactor takes effect") {
-    import testImplicits._
-    Seq(1.0, 0.5).foreach { compressionFactor =>
-      withSQLConf("spark.sql.sources.fileCompressionFactor" -> compressionFactor.toString,
-        "spark.sql.autoBroadcastJoinThreshold" -> "434") {
-        withTempPath { workDir =>
-          // the file size is 740 bytes
-          val workDirPath = workDir.getAbsolutePath
-          val data1 = Seq(100, 200, 300, 400).toDF("count")
-          data1.write.parquet(workDirPath + "/data1")
-          val df1FromFile = spark.read.parquet(workDirPath + "/data1")
-          val data2 = Seq(100, 200, 300, 400).toDF("count")
-          data2.write.parquet(workDirPath + "/data2")
-          val df2FromFile = spark.read.parquet(workDirPath + "/data2")
-          val joinedDF = df1FromFile.join(df2FromFile, Seq("count"))
-          if (compressionFactor == 0.5) {
-            val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
-              case bJoin: BroadcastHashJoinExec => bJoin
-            }
-            assert(bJoinExec.nonEmpty)
-            val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
-              case smJoin: SortMergeJoinExec => smJoin
-            }
-            assert(smJoinExec.isEmpty)
-          } else {
-            // compressionFactor is 1.0
-            val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
-              case bJoin: BroadcastHashJoinExec => bJoin
-            }
-            assert(bJoinExec.isEmpty)
-            val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
-              case smJoin: SortMergeJoinExec => smJoin
-            }
-            assert(smJoinExec.nonEmpty)
-          }
-        }
-      }
-    }
-  }
-}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org