[GitHub] spark pull request #21950: [SPARK-24914][SQL][WIP] Add configuration to avoi...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21950#discussion_r218608537

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala ---
@@ -1051,11 +1052,27 @@ private[hive] object HiveClientImpl {
     // When table is external, `totalSize` is always zero, which will influence join strategy.
     // So when `totalSize` is zero, use `rawDataSize` instead. When `rawDataSize` is also zero,
     // return None.
+    // If a table has a deserialization factor, the table owner expects the in-memory
+    // representation of the table to be larger than the table's totalSize value. In that case,
+    // multiply totalSize by the deserialization factor and use that number instead.
+    // If the user has set spark.sql.statistics.ignoreRawDataSize to true (because of HIVE-20079,
+    // for example), don't use rawDataSize.
     // In Hive, when statistics gathering is disabled, `rawDataSize` and `numRows` is always
     // zero after INSERT command. So they are used here only if they are larger than zero.
-    if (totalSize.isDefined && totalSize.get > 0L) {
-      Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount.filter(_ > 0)))
-    } else if (rawDataSize.isDefined && rawDataSize.get > 0) {
+    val factor = try {
+      properties.get("deserFactor").getOrElse("1.0").toDouble
--- End diff --

I need to eliminate this duplication: there is a similar lookup and calculation in PruneFileSourcePartitionsSuite. Also, I should check whether a Long is acceptable as the intermediate value for holding file sizes (it probably is, since a Long can represent about 8 exabytes).
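For reference, a minimal sketch of how that shared lookup could be factored out, assuming names that are not part of this PR (DeserFactorUtils, deserFactor, applyDeserFactor) and a BigDecimal intermediate to address the Long-overflow question:

    import scala.util.Try

    // Sketch only: the object/method names and the BigDecimal intermediate are assumptions,
    // not something the PR defines.
    object DeserFactorUtils {

      // Read the table-level "deserFactor" property, defaulting to 1.0 when it is absent or malformed.
      def deserFactor(properties: Map[String, String]): Double =
        properties.get("deserFactor").flatMap(v => Try(v.toDouble).toOption).getOrElse(1.0)

      // Scale a file-based size by the factor. Using BigDecimal for the intermediate result avoids
      // Long overflow; the result is capped at Long.MaxValue (roughly 8 exabytes).
      def applyDeserFactor(sizeInBytes: Long, factor: Double): Long =
        (BigDecimal(sizeInBytes) * BigDecimal(factor)).min(BigDecimal(Long.MaxValue)).toLong
    }

Both HiveClientImpl and the test suite could then call the same helper instead of repeating the property lookup and multiplication.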
[GitHub] spark pull request #21950: [SPARK-24914][SQL][WIP] Add configuration to avoi...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21950#discussion_r217216975

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala ---
@@ -91,4 +91,28 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
       assert(size2 < tableStats.get.sizeInBytes)
     }
   }
+
+  test("Test deserialization factor against partition") {
+    val factor = 10
+    withTable("tbl") {
+      spark.range(10).selectExpr("id", "id % 3 as p").write.format("parquet")
+        .partitionBy("p").saveAsTable("tbl")
+      sql(s"ANALYZE TABLE tbl COMPUTE STATISTICS")
+
+      val df1 = sql("SELECT * FROM tbl WHERE p = 1")
+      val sizes1 = df1.queryExecution.optimizedPlan.collect {
+        case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes
+      }
+      assert(sizes1 != 0)
--- End diff --

Oops. Should be assert(sizes1(0) != 0). I will fix.
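In other words, continuing from the sizes1 collected in the test above, the corrected assertion would check the element rather than the Seq; a sketch (exact wording in the final patch may differ):

    // Make sure the collect actually matched a LogicalRelation before indexing into the result.
    assert(sizes1.nonEmpty)
    // Compare the collected size itself against zero, not the Seq.
    assert(sizes1(0) != 0)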
[GitHub] spark pull request #21950: [SPARK-24914][SQL][WIP] Add configuration to avoi...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21950#discussion_r212719073

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala ---
@@ -76,4 +78,16 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
      op
    }
  }
+
+  private def calcPartSize(catalogTable: Option[CatalogTable], sizeInBytes: Long): Long = {
+    val conf: SQLConf = SQLConf.get
+    val factor = conf.sizeDeserializationFactor
+    if (catalogTable.isDefined && factor != 1.0 &&
+      // TODO: The serde check should be in a utility function, since it is also checked elsewhere
+      catalogTable.get.storage.serde.exists(s => s.contains("Parquet") || s.contains("Orc"))) {
--- End diff --

@mgaido91 Good point. Also, I notice that even when the table's files are not compressed (say, a table backed by CSV files), the LongToUnsafeRowMap or BytesToBytesMap that backs the relation is roughly 3 times larger than the total file size. So even under the best of circumstances (i.e., the table's files are not compressed), Spark underestimates the in-memory size by several multiples.
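As an aside, the TODO in the diff about pulling the duplicated serde check into one place might look like the sketch below. The object name SerdeUtils and method name isParquetOrOrc are assumptions for illustration, not part of the PR:

    import org.apache.spark.sql.catalyst.catalog.CatalogTable

    // Sketch only: centralizes the serde check that the PR currently repeats in more than one place.
    object SerdeUtils {
      // True when the table's serde names Parquet or ORC, the formats the PR applies the
      // deserialization factor to.
      def isParquetOrOrc(table: CatalogTable): Boolean =
        table.storage.serde.exists(s => s.contains("Parquet") || s.contains("Orc"))
    }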
[GitHub] spark pull request #21950: [SPARK-24914][SQL][WIP] Add configuration to avoi...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21950#discussion_r210841402

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala ---
@@ -76,4 +78,16 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
      op
    }
  }
+
+  private def calcPartSize(catalogTable: Option[CatalogTable], sizeInBytes: Long): Long = {
+    val conf: SQLConf = SQLConf.get
+    val factor = conf.sizeDeserializationFactor
+    if (catalogTable.isDefined && factor != 1.0 &&
+      // TODO: The serde check should be in a utility function, since it is also checked elsewhere
+      catalogTable.get.storage.serde.exists(s => s.contains("Parquet") || s.contains("Orc"))) {
--- End diff --

I am not sure about this. Are we saying that only Parquet/ORC files can be compressed?