shahidki31 commented on a change in pull request #24715: [SPARK-25474][SQL] Data source tables support fallback to HDFS for size estimation URL: https://github.com/apache/spark/pull/24715#discussion_r316517654
########## File path: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala ########## @@ -650,4 +652,129 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared } } } + + test("Non-partitioned data source table support fallback to HDFS for size estimation") { + withTempDir { dir => + Seq(false, true).foreach { fallBackToHDFS => + withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") { + withTable("spark_25474") { + sql(s"CREATE TABLE spark_25474 (c1 BIGINT) USING PARQUET LOCATION '${dir.toURI}'") + spark.range(5).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath) + + assert(getCatalogTable("spark_25474").stats.isEmpty) + val relation = spark.table("spark_25474").queryExecution.analyzed.children.head + // Table statistics are always recalculated by FileIndex + assert(relation.stats.sizeInBytes === getDataSize(dir)) + } + } + } + } + } + + test("Partitioned data source table support fallback to HDFS for size estimation") { + Seq(false, true).foreach { fallBackToHDFS => + withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") { + withTempDir { dir => + withTable("spark_25474") { + sql("CREATE TABLE spark_25474(a int, b int) USING parquet " + + s"PARTITIONED BY(a) LOCATION '${dir.toURI}'") + sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2") + + assert(getCatalogTable("spark_25474").stats.isEmpty) + val relation = spark.table("spark_25474").queryExecution.analyzed.children.head + if (fallBackToHDFS) { + assert(relation.stats.sizeInBytes === + CommandUtils.getSizeInBytesFallBackToHdfs(spark, getCatalogTable("spark_25474"))) + } else { + assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes) + } + } + } + } + } + } + + test("Partitioned data source table support fallback to HDFS for size estimation" + + "with defaultSizeInBytes") { + val defaultSizeInBytes = 10 * 1024 * 1024 + Seq(false, true).foreach { fallBackToHDFS => + withSQLConf( + SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS", + SQLConf.DEFAULT_SIZE_IN_BYTES.key -> s"$defaultSizeInBytes") { + withTempDir { dir => + withTable("spark_25474") { + sql("CREATE TABLE spark_25474(a int, b int) USING parquet " + + s"PARTITIONED BY(a) LOCATION '${dir.toURI}'") + sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2") + + assert(getCatalogTable("spark_25474").stats.isEmpty) + val relation = spark.table("spark_25474").queryExecution.analyzed.children.head + if (fallBackToHDFS) { + assert(relation.stats.sizeInBytes === + CommandUtils.getSizeInBytesFallBackToHdfs(spark, getCatalogTable("spark_25474"))) + } else { + assert(relation.stats.sizeInBytes === defaultSizeInBytes) + } + } + } + } + } + } + + test("Partitioned data source table stats should be cached") { + Seq(false, true).foreach { fallBackToHDFS => + withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") { + withTempDir { dir => + withTable("spark_25474") { + sql("CREATE TABLE spark_25474(a int, b int) USING parquet " + + s"PARTITIONED BY(a) LOCATION '${dir.toURI}'") + sql("INSERT INTO TABLE spark_25474 PARTITION(a=1) SELECT 2") + + assert(getCatalogTable("spark_25474").stats.isEmpty) + val relation = spark.table("spark_25474").queryExecution.analyzed.children.head + if (fallBackToHDFS) { + val dataSize = + CommandUtils.getSizeInBytesFallBackToHdfs(spark, getCatalogTable("spark_25474")) + assert(relation.stats.sizeInBytes === dataSize) + + val qualifiedTableName = + QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "spark_25474") + val logicalRelation = spark.sessionState.catalog.getCachedTable(qualifiedTableName) + .asInstanceOf[LogicalRelation] + assert(logicalRelation.catalogTable.get.stats.get.sizeInBytes === dataSize) + } else { + assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes) + } + } + } + } + } + } + + test("External partitioned data source table does not support fallback to HDFS " + + "for size estimation") { + Seq(true).foreach { fallBackToHDFS => + withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFS") { + withTempDir { dir => + withTable("spark_25474") { + sql("CREATE TABLE spark_25474(a bigint, b bigint) USING parquet " + + s"PARTITIONED BY(a) LOCATION '${dir.toURI}'") + + withTempDir { partitionDir => + spark.range(5).write.mode(SaveMode.Overwrite).parquet(partitionDir.getCanonicalPath) + sql(s"ALTER TABLE spark_25474 ADD PARTITION (a=1) LOCATION '$partitionDir'") + assert(getCatalogTable("spark_25474").stats.isEmpty) + val relation = spark.table("spark_25474").queryExecution.analyzed.children.head + assert(spark.table("spark_25474").count() === 5) + if (fallBackToHDFS) { + assert(relation.stats.sizeInBytes === 0) Review comment: I think, non-partitioned data source table is already getting correct statistics. I am not sure, we need to support fallback to HDFS for size for non partitioned table. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org