[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...

2018-11-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22693#discussion_r232556859
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
 
 class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) =>
+      val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil)
+      computeTableStats(relation, predicates)
     case relation: HiveTableRelation
         if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
-      val table = relation.tableMeta
-      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-        try {
-          val hadoopConf = session.sessionState.newHadoopConf()
-          val tablePath = new Path(table.location)
-          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
-          fs.getContentSummary(tablePath).getLength
-        } catch {
-          case e: IOException =>
-            logWarning("Failed to get table size from hdfs.", e)
-            session.sessionState.conf.defaultSizeInBytes
-        }
-      } else {
-        session.sessionState.conf.defaultSizeInBytes
+      computeTableStats(relation)
+  }
+
+  private def computeTableStats(
+      relation: HiveTableRelation,
+      predicates: Seq[Expression] = Nil): LogicalPlan = {
+    val table = relation.tableMeta
+    val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+      try {
+        val hadoopConf = session.sessionState.newHadoopConf()
+        val tablePath = new Path(table.location)
+        val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+        BigInt(fs.getContentSummary(tablePath).getLength)
+      } catch {
+        case e: IOException =>
+          logWarning("Failed to get table size from hdfs.", e)
+          getSizeInBytesFromTablePartitions(table.identifier, predicates)
       }
+    } else {
+      getSizeInBytesFromTablePartitions(table.identifier, predicates)
+    }
+    val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes)))
+    relation.copy(tableMeta = withStats)
+  }
 
-      val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
-      relation.copy(tableMeta = withStats)
+  private def getSizeInBytesFromTablePartitions(
+      tableIdentifier: TableIdentifier,
+      predicates: Seq[Expression] = Nil): BigInt = {
+    session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match {
--- End diff --

How does https://github.com/apache/spark/pull/22743 solve this problem?
That PR aims to invalidate the cache when configurations are changed, while this PR
aims to compute stats from HDFS when they are not available.


---


[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...

2018-11-05 Thread fjh100456
Github user fjh100456 closed the pull request at:

https://github.com/apache/spark/pull/22693


---


[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...

2018-11-05 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22693#discussion_r230726170
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
 
 class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) =>
+      val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil)
+      computeTableStats(relation, predicates)
     case relation: HiveTableRelation
         if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
-      val table = relation.tableMeta
-      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-        try {
-          val hadoopConf = session.sessionState.newHadoopConf()
-          val tablePath = new Path(table.location)
-          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
-          fs.getContentSummary(tablePath).getLength
-        } catch {
-          case e: IOException =>
-            logWarning("Failed to get table size from hdfs.", e)
-            session.sessionState.conf.defaultSizeInBytes
-        }
-      } else {
-        session.sessionState.conf.defaultSizeInBytes
+      computeTableStats(relation)
+  }
+
+  private def computeTableStats(
+      relation: HiveTableRelation,
+      predicates: Seq[Expression] = Nil): LogicalPlan = {
+    val table = relation.tableMeta
+    val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+      try {
+        val hadoopConf = session.sessionState.newHadoopConf()
+        val tablePath = new Path(table.location)
+        val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+        BigInt(fs.getContentSummary(tablePath).getLength)
+      } catch {
+        case e: IOException =>
+          logWarning("Failed to get table size from hdfs.", e)
+          getSizeInBytesFromTablePartitions(table.identifier, predicates)
      }
+    } else {
+      getSizeInBytesFromTablePartitions(table.identifier, predicates)
+    }
+    val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes)))
+    relation.copy(tableMeta = withStats)
+  }
 
-      val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
-      relation.copy(tableMeta = withStats)
+  private def getSizeInBytesFromTablePartitions(
+      tableIdentifier: TableIdentifier,
+      predicates: Seq[Expression] = Nil): BigInt = {
+    session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match {
--- End diff --

Your solution seems much better, @wangyum.
Thank you. I'll close it.


---


[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...

2018-11-04 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/22693#discussion_r230639634
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
 
 class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) =>
+      val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil)
+      computeTableStats(relation, predicates)
     case relation: HiveTableRelation
         if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
-      val table = relation.tableMeta
-      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-        try {
-          val hadoopConf = session.sessionState.newHadoopConf()
-          val tablePath = new Path(table.location)
-          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
-          fs.getContentSummary(tablePath).getLength
-        } catch {
-          case e: IOException =>
-            logWarning("Failed to get table size from hdfs.", e)
-            session.sessionState.conf.defaultSizeInBytes
-        }
-      } else {
-        session.sessionState.conf.defaultSizeInBytes
+      computeTableStats(relation)
+  }
+
+  private def computeTableStats(
+      relation: HiveTableRelation,
+      predicates: Seq[Expression] = Nil): LogicalPlan = {
+    val table = relation.tableMeta
+    val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+      try {
+        val hadoopConf = session.sessionState.newHadoopConf()
+        val tablePath = new Path(table.location)
+        val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+        BigInt(fs.getContentSummary(tablePath).getLength)
+      } catch {
+        case e: IOException =>
+          logWarning("Failed to get table size from hdfs.", e)
+          getSizeInBytesFromTablePartitions(table.identifier, predicates)
      }
+    } else {
+      getSizeInBytesFromTablePartitions(table.identifier, predicates)
+    }
+    val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes)))
+    relation.copy(tableMeta = withStats)
+  }
 
-      val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
-      relation.copy(tableMeta = withStats)
+  private def getSizeInBytesFromTablePartitions(
+      tableIdentifier: TableIdentifier,
+      predicates: Seq[Expression] = Nil): BigInt = {
+    session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match {
--- End diff --

After [this refactor](https://github.com/apache/spark/pull/22743), we can avoid
computing stats if the `LogicalRelation` is already cached, because the computed
stats would not take effect.
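
As a minimal illustration of that point only (not the actual change in that refactor: the table name and helper below are hypothetical, the real check lives inside Spark's own rules rather than user code, and only the public `Catalog.isCached` API is used here):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: only bother with an expensive size estimate when the relation is not
// already cached, since stats computed at this point would not change the
// statistics of an already-cached plan. `tableName` and this helper are illustrative.
def shouldComputeFallbackStats(spark: SparkSession, tableName: String): Boolean =
  !spark.catalog.isCached(tableName)
```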


---


[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...

2018-11-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22693#discussion_r230609824
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
 
 class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) =>
+      val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil)
+      computeTableStats(relation, predicates)
     case relation: HiveTableRelation
         if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
-      val table = relation.tableMeta
-      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-        try {
-          val hadoopConf = session.sessionState.newHadoopConf()
-          val tablePath = new Path(table.location)
-          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
-          fs.getContentSummary(tablePath).getLength
-        } catch {
-          case e: IOException =>
-            logWarning("Failed to get table size from hdfs.", e)
-            session.sessionState.conf.defaultSizeInBytes
-        }
-      } else {
-        session.sessionState.conf.defaultSizeInBytes
+      computeTableStats(relation)
+  }
+
+  private def computeTableStats(
+      relation: HiveTableRelation,
+      predicates: Seq[Expression] = Nil): LogicalPlan = {
+    val table = relation.tableMeta
+    val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+      try {
+        val hadoopConf = session.sessionState.newHadoopConf()
+        val tablePath = new Path(table.location)
+        val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+        BigInt(fs.getContentSummary(tablePath).getLength)
+      } catch {
+        case e: IOException =>
+          logWarning("Failed to get table size from hdfs.", e)
+          getSizeInBytesFromTablePartitions(table.identifier, predicates)
      }
+    } else {
+      getSizeInBytesFromTablePartitions(table.identifier, predicates)
+    }
+    val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes)))
+    relation.copy(tableMeta = withStats)
+  }
 
-      val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
-      relation.copy(tableMeta = withStats)
+  private def getSizeInBytesFromTablePartitions(
+      tableIdentifier: TableIdentifier,
+      predicates: Seq[Expression] = Nil): BigInt = {
+    session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match {
--- End diff --

The perf will be pretty bad when the number of partitions is huge. Thus, I 
think we can close this PR. 


---


[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...

2018-11-03 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/22693#discussion_r230554142
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -115,26 +116,45 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
 
 class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case filterPlan @ Filter(_, SubqueryAlias(_, relation: HiveTableRelation)) =>
+      val predicates = PhysicalOperation.unapply(filterPlan).map(_._2).getOrElse(Nil)
+      computeTableStats(relation, predicates)
     case relation: HiveTableRelation
         if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
-      val table = relation.tableMeta
-      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-        try {
-          val hadoopConf = session.sessionState.newHadoopConf()
-          val tablePath = new Path(table.location)
-          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
-          fs.getContentSummary(tablePath).getLength
-        } catch {
-          case e: IOException =>
-            logWarning("Failed to get table size from hdfs.", e)
-            session.sessionState.conf.defaultSizeInBytes
-        }
-      } else {
-        session.sessionState.conf.defaultSizeInBytes
+      computeTableStats(relation)
+  }
+
+  private def computeTableStats(
+      relation: HiveTableRelation,
+      predicates: Seq[Expression] = Nil): LogicalPlan = {
+    val table = relation.tableMeta
+    val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+      try {
+        val hadoopConf = session.sessionState.newHadoopConf()
+        val tablePath = new Path(table.location)
+        val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+        BigInt(fs.getContentSummary(tablePath).getLength)
+      } catch {
+        case e: IOException =>
+          logWarning("Failed to get table size from hdfs.", e)
+          getSizeInBytesFromTablePartitions(table.identifier, predicates)
      }
+    } else {
+      getSizeInBytesFromTablePartitions(table.identifier, predicates)
+    }
+    val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = sizeInBytes)))
+    relation.copy(tableMeta = withStats)
+  }
 
-      val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
-      relation.copy(tableMeta = withStats)
+  private def getSizeInBytesFromTablePartitions(
+      tableIdentifier: TableIdentifier,
+      predicates: Seq[Expression] = Nil): BigInt = {
+    session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates) match {
--- End diff --

Have you tested the performance of `session.sessionState.catalog.listPartitionsByFilter(tableIdentifier, predicates)` and `fs.getContentSummary(tablePath).getLength`?
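
One way to answer that would be a rough side-by-side timing. The sketch below is illustrative only: the table name and location are placeholders, and it is framed as code inside the `org.apache.spark.sql.hive` package so that the internal `sessionState` APIs used in the diff are reachable.

```scala
package org.apache.spark.sql.hive

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier

// Rough timing harness for comparing the two size-estimation routes;
// table name and location are placeholders.
object SizeEstimateTiming {
  private def time[T](label: String)(body: => T): T = {
    val start = System.nanoTime()
    val result = body
    println(f"$label: ${(System.nanoTime() - start) / 1e6}%.1f ms")
    result
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    // Metastore route: list partitions (no predicates here) via the session catalog.
    time("listPartitionsByFilter") {
      spark.sessionState.catalog
        .listPartitionsByFilter(TableIdentifier("some_partitioned_table"), Nil)
        .length
    }

    // HDFS route: a single getContentSummary call on the table root directory.
    time("getContentSummary") {
      val tablePath = new Path("hdfs:///warehouse/some_partitioned_table")
      val fs = tablePath.getFileSystem(spark.sessionState.newHadoopConf())
      fs.getContentSummary(tablePath).getLength
    }
  }
}
```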


---


[GitHub] spark pull request #22693: [SPARK-25701][SQL] Supports calculation of table ...

2018-10-11 Thread fjh100456
GitHub user fjh100456 opened a pull request:

https://github.com/apache/spark/pull/22693

[SPARK-25701][SQL] Supports calculation of table statistics from 
partition's catalog statistics.

## What changes were proposed in this pull request?

When determining table statistics, if the `totalSize` of the table is not
defined, we fall back to HDFS to get the table statistics when
`spark.sql.statistics.fallBackToHdfs` is `true`; otherwise the default
value (`spark.sql.defaultSizeInBytes`) is used, which means tables
without the `totalSize` property may not be broadcast (except Parquet tables).

Fortunately, in most cases the data is written into the table by an insert
command, which saves the data size in the metastore, so it's possible to use the
metastore to calculate the table statistics.
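
As a rough illustration of that idea only (the body of the PR's `getSizeInBytesFromTablePartitions` is not shown in the quoted diff, so this is a sketch of the approach rather than the actual implementation):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition

// Sketch: sum partition-level catalog sizes when every partition has stats
// recorded in the metastore; otherwise fall back to a default estimate.
def estimateSizeFromPartitions(
    partitions: Seq[CatalogTablePartition],
    defaultSizeInBytes: BigInt): BigInt = {
  val sizes = partitions.flatMap(_.stats.map(_.sizeInBytes))
  if (partitions.nonEmpty && sizes.length == partitions.length) sizes.sum
  else defaultSizeInBytes
}
```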

## How was this patch tested?
Added a test.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fjh100456/spark StatisticCommit

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22693.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22693


commit e610477063b4f326b8261d59b55abce83cbb82e7
Author: fjh100456 
Date:   2018-10-11T06:43:52Z

[SPARK-25701][SQL] Supports calculation of table statistics from 
partition's catalog statistics.

## What changes were proposed in this pull request?

When obtaining table statistics, if the `totalSize` of the table is not
defined, we fall back to HDFS to get the table statistics when
`spark.sql.statistics.fallBackToHdfs` is `true`; otherwise the default
value (`spark.sql.defaultSizeInBytes`) is used.

Fortunately, in most cases the data is written into the table by an insert
command, which saves the data size in the metastore, so it's possible to use the
metastore to calculate the table statistics.

## How was this patch tested?
Added a test.




---
