[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22693#discussion_r232556859 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { +case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) => + val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil) + computeTableStats(relation, predicates) case relation: HiveTableRelation if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty => - val table = relation.tableMeta - val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { -try { - val hadoopConf = session.sessionState.newHadoopConf() - val tablePath = new Path(table.location) - val fs: FileSystem = tablePath.getFileSystem(hadoopConf) - fs.getContentSummary(tablePath).getLength -} catch { - case e: IOException => -logWarning("Failed to get table size from hdfs.", e) -session.sessionState.conf.defaultSizeInBytes -} - } else { -session.sessionState.conf.defaultSizeInBytes + computeTableStats(relation) + } + + private def computeTableStats( + relation: HiveTableRelation, + predicates: Seq[Expression] = Nil): LogicalPlan = { +val table = relation.tableMeta +val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { + try { +val hadoopConf = session.sessionState.newHadoopConf() +val tablePath = new Path(table.location) +val fs: FileSystem = tablePath.getFileSystem(hadoopConf) +BigInt(fs.getContentSummary(tablePath).getLength) + } catch { +case e: IOException => + logWarning("Failed to get table size from hdfs.", e) + getSizeInBytesFromTablePartitions(table.identifier, predicates) } +} else { + getSizeInBytesFromTablePartitions(table.identifier, predicates) +} +val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes))) +relation.copy(tableMeta = withStats) + } - val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes - relation.copy(tableMeta = withStats) + private def getSizeInBytesFromTablePartitions( + tableIdentifier: TableIdentifier, + predicates: Seq[Expression] = Nil): BigInt = { +session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match { --- End diff -- How come https://github.com/apache/spark/pull/22743 solves this problem? That PR targets to invalidate cache when configurations are changed. This PR targets to compute stats from HDFS when they are not available. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...
Github user fjh100456 closed the pull request at: https://github.com/apache/spark/pull/22693 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...
Github user fjh100456 commented on a diff in the pull request: https://github.com/apache/spark/pull/22693#discussion_r230726170 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { +case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) => + val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil) + computeTableStats(relation, predicates) case relation: HiveTableRelation if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty => - val table = relation.tableMeta - val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { -try { - val hadoopConf = session.sessionState.newHadoopConf() - val tablePath = new Path(table.location) - val fs: FileSystem = tablePath.getFileSystem(hadoopConf) - fs.getContentSummary(tablePath).getLength -} catch { - case e: IOException => -logWarning("Failed to get table size from hdfs.", e) -session.sessionState.conf.defaultSizeInBytes -} - } else { -session.sessionState.conf.defaultSizeInBytes + computeTableStats(relation) + } + + private def computeTableStats( + relation: HiveTableRelation, + predicates: Seq[Expression] = Nil): LogicalPlan = { +val table = relation.tableMeta +val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { + try { +val hadoopConf = session.sessionState.newHadoopConf() +val tablePath = new Path(table.location) +val fs: FileSystem = tablePath.getFileSystem(hadoopConf) +BigInt(fs.getContentSummary(tablePath).getLength) + } catch { +case e: IOException => + logWarning("Failed to get table size from hdfs.", e) + getSizeInBytesFromTablePartitions(table.identifier, predicates) } +} else { + getSizeInBytesFromTablePartitions(table.identifier, predicates) +} +val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes))) +relation.copy(tableMeta = withStats) + } - val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes - relation.copy(tableMeta = withStats) + private def getSizeInBytesFromTablePartitions( + tableIdentifier: TableIdentifier, + predicates: Seq[Expression] = Nil): BigInt = { +session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match { --- End diff -- Your solution seems much better. @wangyum Thank you. I'll close it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...
Github user wangyum commented on a diff in the pull request: https://github.com/apache/spark/pull/22693#discussion_r230639634 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { +case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) => + val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil) + computeTableStats(relation, predicates) case relation: HiveTableRelation if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty => - val table = relation.tableMeta - val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { -try { - val hadoopConf = session.sessionState.newHadoopConf() - val tablePath = new Path(table.location) - val fs: FileSystem = tablePath.getFileSystem(hadoopConf) - fs.getContentSummary(tablePath).getLength -} catch { - case e: IOException => -logWarning("Failed to get table size from hdfs.", e) -session.sessionState.conf.defaultSizeInBytes -} - } else { -session.sessionState.conf.defaultSizeInBytes + computeTableStats(relation) + } + + private def computeTableStats( + relation: HiveTableRelation, + predicates: Seq[Expression] = Nil): LogicalPlan = { +val table = relation.tableMeta +val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { + try { +val hadoopConf = session.sessionState.newHadoopConf() +val tablePath = new Path(table.location) +val fs: FileSystem = tablePath.getFileSystem(hadoopConf) +BigInt(fs.getContentSummary(tablePath).getLength) + } catch { +case e: IOException => + logWarning("Failed to get table size from hdfs.", e) + getSizeInBytesFromTablePartitions(table.identifier, predicates) } +} else { + getSizeInBytesFromTablePartitions(table.identifier, predicates) +} +val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes))) +relation.copy(tableMeta = withStats) + } - val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes - relation.copy(tableMeta = withStats) + private def getSizeInBytesFromTablePartitions( + tableIdentifier: TableIdentifier, + predicates: Seq[Expression] = Nil): BigInt = { +session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match { --- End diff -- After [this refactor](https://github.com/apache/spark/pull/22743). We can avoid compute stats if `LogicalRelation` already cached. because the computed stats will not take effect. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22693#discussion_r230609824 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { +case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) => + val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil) + computeTableStats(relation, predicates) case relation: HiveTableRelation if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty => - val table = relation.tableMeta - val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { -try { - val hadoopConf = session.sessionState.newHadoopConf() - val tablePath = new Path(table.location) - val fs: FileSystem = tablePath.getFileSystem(hadoopConf) - fs.getContentSummary(tablePath).getLength -} catch { - case e: IOException => -logWarning("Failed to get table size from hdfs.", e) -session.sessionState.conf.defaultSizeInBytes -} - } else { -session.sessionState.conf.defaultSizeInBytes + computeTableStats(relation) + } + + private def computeTableStats( + relation: HiveTableRelation, + predicates: Seq[Expression] = Nil): LogicalPlan = { +val table = relation.tableMeta +val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { + try { +val hadoopConf = session.sessionState.newHadoopConf() +val tablePath = new Path(table.location) +val fs: FileSystem = tablePath.getFileSystem(hadoopConf) +BigInt(fs.getContentSummary(tablePath).getLength) + } catch { +case e: IOException => + logWarning("Failed to get table size from hdfs.", e) + getSizeInBytesFromTablePartitions(table.identifier, predicates) } +} else { + getSizeInBytesFromTablePartitions(table.identifier, predicates) +} +val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes))) +relation.copy(tableMeta = withStats) + } - val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes - relation.copy(tableMeta = withStats) + private def getSizeInBytesFromTablePartitions( + tableIdentifier: TableIdentifier, + predicates: Seq[Expression] = Nil): BigInt = { +session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match { --- End diff -- The perf will be pretty bad when the number of partitions is huge. Thus, I think we can close this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...
Github user wangyum commented on a diff in the pull request: https://github.com/apache/spark/pull/22693#discussion_r230554142 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { +case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) => + val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil) + computeTableStats(relation, predicates) case relation: HiveTableRelation if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty => - val table = relation.tableMeta - val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { -try { - val hadoopConf = session.sessionState.newHadoopConf() - val tablePath = new Path(table.location) - val fs: FileSystem = tablePath.getFileSystem(hadoopConf) - fs.getContentSummary(tablePath).getLength -} catch { - case e: IOException => -logWarning("Failed to get table size from hdfs.", e) -session.sessionState.conf.defaultSizeInBytes -} - } else { -session.sessionState.conf.defaultSizeInBytes + computeTableStats(relation) + } + + private def computeTableStats( + relation: HiveTableRelation, + predicates: Seq[Expression] = Nil): LogicalPlan = { +val table = relation.tableMeta +val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { + try { +val hadoopConf = session.sessionState.newHadoopConf() +val tablePath = new Path(table.location) +val fs: FileSystem = tablePath.getFileSystem(hadoopConf) +BigInt(fs.getContentSummary(tablePath).getLength) + } catch { +case e: IOException => + logWarning("Failed to get table size from hdfs.", e) + getSizeInBytesFromTablePartitions(table.identifier, predicates) } +} else { + getSizeInBytesFromTablePartitions(table.identifier, predicates) +} +val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes))) +relation.copy(tableMeta = withStats) + } - val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes - relation.copy(tableMeta = withStats) + private def getSizeInBytesFromTablePartitions( + tableIdentifier: TableIdentifier, + predicates: Seq[Expression] = Nil): BigInt = { +session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match { --- End diff -- Have you tested the performance of `session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates)` and `fs.getContentSummary(tablePath).getLength`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...
GitHub user fjh100456 opened a pull request: https://github.com/apache/spark/pull/22693 [SPARK-25701][SQL] Supports calculation of table statistics from partition's catalog statistics. ## What changes were proposed in this pull request? When determine table statistics, if the `totalSize` of the table is not defined, we fallback to HDFS to get the table statistics when `spark.sql.statistics.fallBackToHdfs` is `true`, otherwise the default value(`spark.sql.defaultSizeInBytes`) will be taken, which will lead to tables without `totalSize` property may not be broadcast(Except parquet). Fortunately, in most case the data is written into the table by a insertion command which will save the data-size in metastore, so it's possible to use metastore to calculate the table statistics. ## How was this patch tested? Add test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/fjh100456/spark StatisticCommit Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22693.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22693 commit e610477063b4f326b8261d59b55abce83cbb82e7 Author: fjh100456 Date: 2018-10-11T06:43:52Z [SPARK-25701][SQL] Supports calculation of table statistics from partition's catalog statistics. ## What changes were proposed in this pull request? When obtaining table statistics, if the `totalSize` of the table is not defined, we fallback to HDFS to get the table statistics when `spark.sql.statistics.fallBackToHdfs` is `true`, otherwise the default value(`spark.sql.defaultSizeInBytes`) will be taken. Fortunately, in most case the data is written into the table by a insertion command which will save the data-size in metastore, so it's possible to use metastore to calculate the table statistics. ## How was this patch tested? Add test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org