dongjoon-hyun commented on a change in pull request #26016: [SPARK-24914][SQL] New statistic to improve data size estimate for columnar storage formats
URL: https://github.com/apache/spark/pull/26016#discussion_r388017723
 
 

 ##########
 File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
 ##########
 @@ -1556,4 +1538,150 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       }
     }
   }
+
+  private def checkDeserializationFactor(tableName: String, exists: Boolean): Unit = {
+    spark.sessionState.catalog.refreshTable(TableIdentifier(tableName))
+    val catalogTable = getCatalogTable(tableName)
+    assert(catalogTable.stats.isDefined)
+    assert(catalogTable.stats.get.deserFactor.isDefined === exists)
+  }
+
+  test("SPARK-24914 - test deserialization factor calculation (for ORC 
tables)") {
+    val table = s"sizeTest"
+    withTable(table) {
+      import spark.implicits._
+      sql(s"CREATE TABLE $table (key INT, value1 BIGINT, value2 BIGINT, value3 
BIGINT, " +
+        s"value4 BIGINT) PARTITIONED BY (ds STRING) STORED AS ORC")
+      spark.range(5000)
+        .map(i => (i * 6, i * 5, i * 4, i * 3, i * 2))
+        .toDF("key", "value1", "value2", "value3", "value4")
+        .createOrReplaceTempView("TMP1")
+      spark.range(5000)
+        .map(i => (1, 1, 1, 1, 1))
+        .toDF("key", "value1", "value2", "value3", "value4")
+        .createOrReplaceTempView("TMP2")
+
+      sql(s"INSERT INTO TABLE $table PARTITION (ds='2010-01-01') SELECT * FROM 
TMP1")
+
+      val catalogTable = getCatalogTable(table)
+      assert(catalogTable.stats.isEmpty)
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+      checkDeserializationFactor(table, exists = false)
+      val origSizeInBytes = spark.table(table).queryExecution.optimizedPlan.stats.sizeInBytes
+      logInfo(s"original sizeInBytes (file size): $origSizeInBytes")
+
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> s"${origSizeInBytes - 1}") {
+        val res = sql(s"SELECT * FROM $table t1, $table t2 WHERE t1.key = t2.key")
+        checkNumBroadcastHashJoins(res, 0,
+          "sort merge join should be taken as threshold is smaller than table size")
+      }
+
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> s"${origSizeInBytes + 1}") {
+        val res = sql(s"SELECT * FROM $table t1, $table t2 WHERE t1.key = t2.key")
+        checkNumBroadcastHashJoins(res, 1,
+          "broadcast join should be taken as the threshold is greater than table size")
+      }
+
+      withSQLConf(
+        SQLConf.DESERIALIZATION_FACTOR_CALC_ENABLED.key -> "true",
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> s"${origSizeInBytes + 1}") {
+        sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+        checkDeserializationFactor(table, exists = true)
+        val newSizeInBytes = spark.table(table).queryExecution.optimizedPlan.stats.sizeInBytes
+        assert(2 * origSizeInBytes <= newSizeInBytes)
+        logInfo(s"sizeInBytes after applying deserFactor: $newSizeInBytes")
+        val res = sql(s"SELECT * FROM $table t1, $table t2 WHERE t1.key = 
t2.key")
+        checkNumBroadcastHashJoins(res, 0,
+          "sort merge join should be taken despite the threshold is greater 
than the table" +
+            "size as the deserialization factor is applied")
+      }
+
+      withSQLConf(
+        SQLConf.DESERIALIZATION_FACTOR_CALC_ENABLED.key -> "false",
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> s"${origSizeInBytes + 1}") {
+        sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+        checkDeserializationFactor(table, exists = true)
+        val res = sql(s"SELECT * FROM $table t1, $table t2 WHERE t1.key = 
t2.key")
+        checkNumBroadcastHashJoins(res, 0,
+        "sort merge join should be taken despite deserialization factor 
calculation is " +
+          "disabled as the old factor is reused")
+      }
+
+      withSQLConf(SQLConf.DESERIALIZATION_FACTOR_CALC_ENABLED.key -> "true") {
+        val catalogTableBefore = getCatalogTable(table)
+        val deserFactorBefore = catalogTableBefore.stats.get.deserFactor.get
+        sql(s"INSERT INTO TABLE $table PARTITION (ds='2010-01-02') SELECT * 
FROM TMP1")
+        sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+        spark.sessionState.catalog.refreshTable(TableIdentifier(table))
+        val catalogTable1 = getCatalogTable(table)
+        assert(catalogTable1.stats.isDefined &&
+          catalogTable1.stats.get.deserFactor.isDefined)
+        assert(catalogTable1.stats.get.deserFactor.get === deserFactorBefore,
+          "deserFactor should not change by adding a smaller or same size 
partition")
+
+        sql(s"INSERT INTO TABLE $table PARTITION (ds='2010-01-03') SELECT * 
FROM TMP2")
+        sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+
+        spark.sessionState.catalog.refreshTable(TableIdentifier(table))
+        val catalogTable2 = getCatalogTable(table)
+        assert(catalogTable2.stats.isDefined &&
+          catalogTable2.stats.get.deserFactor.isDefined)
+        assert(catalogTable2.stats.get.deserFactor.get > deserFactorBefore,
+          "deserialization factor increased after adding a partition which is 
more compressed")
+      }
+
+      sql(s"TRUNCATE TABLE $table")
+      withSQLConf(
+        SQLConf.DESERIALIZATION_FACTOR_CALC_ENABLED.key -> "false",
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> s"${origSizeInBytes + 1}") {
+        sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+        checkDeserializationFactor(table, exists = false)
+        val res = sql(s"SELECT * FROM $table t1, $table t2 WHERE t1.key = 
t2.key")
+        checkNumBroadcastHashJoins(res, 1,
+          "broadcast join should be taken as deserialization factor is deleted 
by TRUNCATE")
+      }
+    }
+  }
+
+  test("SPARK-24914 - test changing of the deserialization factor by setting 
the table property") {
 
 Review comment:
   Although I understand the intention, please don't use `Parquet` in this PR. Setting the table property manually is really hacky, and having this test case in the open source community tends to mislead users.
