Repository: spark
Updated Branches:
  refs/heads/branch-2.0 5504f60e8 -> e13cfd6d2


[SPARK-15365][SQL] When table size statistics are not available from metastore, we should fall back to HDFS

## What changes were proposed in this pull request?
Currently, when a table is used in a join, we rely on the size returned by the metastore to decide whether the operation can be converted to a broadcast join. This optimization only kicks in for tables that have statistics available in the metastore. Hive generally falls back to HDFS when statistics are not available directly from the metastore, and that seems like a reasonable choice to adopt here given the optimization benefit of broadcast joins.
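
As a usage sketch (not part of the patch itself; the `spark` session and the table names below are illustrative), the new flag can be enabled at runtime so that size estimation for stat-less Hive tables goes through the filesystem:

```scala
// Opt in to the new behavior (defaults to false).
spark.conf.set("spark.sql.enableFallBackToHdfsForStats", "true")

// When `totalSize`/`rawDataSize` are missing or non-positive, the table's size is
// now estimated from the files under its location. If that estimate is below
// spark.sql.autoBroadcastJoinThreshold, the planner can still pick a broadcast join.
val joined = spark.table("fact_table").join(spark.table("external_csv_table"), "id")
```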

## How was this patch tested?
I have executed queries locally to test.

Author: Parth Brahmbhatt <pbrahmbh...@netflix.com>

Closes #13150 from Parth-Brahmbhatt/SPARK-15365.

(cherry picked from commit 4acababcaba567c85f3be0d5e939d99119b82d1d)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e13cfd6d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e13cfd6d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e13cfd6d

Branch: refs/heads/branch-2.0
Commit: e13cfd6d265cd47365cda1a0452133434e0515df
Parents: 5504f60
Author: Parth Brahmbhatt <pbrahmbh...@netflix.com>
Authored: Tue May 24 20:58:20 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue May 24 20:58:32 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  9 ++++
 .../spark/sql/hive/MetastoreRelation.scala      | 33 +++++++++----
 .../apache/spark/sql/hive/StatisticsSuite.scala | 50 +++++++++++++++++++-
 3 files changed, 83 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e13cfd6d/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b91518a..4efefda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -113,6 +113,13 @@ object SQLConf {
     .longConf
     .createWithDefault(10L * 1024 * 1024)
 
+  val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS =
+    SQLConfigBuilder("spark.sql.enableFallBackToHdfsForStats")
+    .doc("If the table statistics are not available from table metadata enable 
fall back to hdfs." +
+      " This is useful in determining if a table is small enough to use auto 
broadcast joins.")
+    .booleanConf
+    .createWithDefault(false)
+
   val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
     .internal()
     .doc("The default table size used in query planning. By default, it is set 
to a larger " +
@@ -603,6 +610,8 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf with Logging {
 
   def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
 
+  def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)
+
   def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
 
   def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)

http://git-wip-us.apache.org/repos/asf/spark/blob/e13cfd6d/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 1671228..9c82014 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.IOException
+
 import scala.collection.JavaConverters._
 
 import com.google.common.base.Objects
+import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.FieldSchema
@@ -114,17 +117,31 @@ private[hive] case class MetastoreRelation(
       val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
       // TODO: check if this estimate is valid for tables after partition pruning.
       // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
-      // relatively cheap if parameters for the table are populated into the metastore.  An
-      // alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
-      // of RPCs are involved.  Besides `totalSize`, there are also `numFiles`, `numRows`,
-      // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
+      // relatively cheap if parameters for the table are populated into the metastore.
+      // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
+      // (see StatsSetupConst in Hive) that we can look at in the future.
       BigInt(
         // When table is external,`totalSize` is always zero, which will influence join strategy
         // so when `totalSize` is zero, use `rawDataSize` instead
-        // if the size is still less than zero, we use default size
-        Option(totalSize).map(_.toLong).filter(_ > 0)
-          .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
-            .getOrElse(sparkSession.sessionState.conf.defaultSizeInBytes)))
+        // if the size is still less than zero, we try to get the file size from HDFS.
+        // given this is only needed for optimization, if the HDFS call fails we return the default.
+        if (totalSize != null && totalSize.toLong > 0L) {
+          totalSize.toLong
+        } else if (rawDataSize != null && rawDataSize.toLong > 0) {
+          rawDataSize.toLong
+        } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+          try {
+            val hadoopConf = sparkSession.sessionState.newHadoopConf()
+            val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
+            fs.getContentSummary(hiveQlTable.getPath).getLength
+          } catch {
+            case e: IOException =>
+              logWarning("Failed to get table size from hdfs.", e)
+              sparkSession.sessionState.conf.defaultSizeInBytes
+          }
+        } else {
+          sparkSession.sessionState.conf.defaultSizeInBytes
+        })
     }
   )
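
For reference, the fallback in the hunk above boils down to the following standalone sketch (assuming a Hadoop `Configuration` and a table `Path`; the method name and parameters are illustrative, not part of the patch):

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sum the bytes under the table's location; if the filesystem call fails, return
// the caller-supplied default, since the size is only an optimization hint.
def estimateSizeFromFs(tablePath: Path, hadoopConf: Configuration, defaultSizeInBytes: Long): Long = {
  try {
    val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    fs.getContentSummary(tablePath).getLength
  } catch {
    case _: IOException => defaultSizeInBytes
  }
}
```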
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e13cfd6d/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 1a7b6c0..f8e00a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.{File, PrintWriter}
+
 import scala.reflect.ClassTag
 
 import org.apache.spark.sql.{QueryTest, Row}
@@ -25,8 +27,9 @@ import org.apache.spark.sql.execution.command.AnalyzeTableCommand
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
 
-class StatisticsSuite extends QueryTest with TestHiveSingleton {
+class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   import hiveContext.sql
 
   test("parse analyze commands") {
@@ -68,6 +71,51 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
       classOf[AnalyzeTableCommand])
   }
 
+  test("MetastoreRelations fallback to HDFS for size estimation") {
+    val enableFallBackToHdfsForStats = hiveContext.conf.fallBackToHdfsForStatsEnabled
+    try {
+      withTempDir { tempDir =>
+
+        // EXTERNAL OpenCSVSerde table pointing to LOCATION
+
+        val file1 = new File(tempDir + "/data1")
+        val writer1 = new PrintWriter(file1)
+        writer1.write("1,2")
+        writer1.close()
+
+        val file2 = new File(tempDir + "/data2")
+        val writer2 = new PrintWriter(file2)
+        writer2.write("1,2")
+        writer2.close()
+
+        sql(
+          s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)
+            ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+            WITH SERDEPROPERTIES (
+              \"separatorChar\" = \",\",
+              \"quoteChar\"     = \"\\\"\",
+              \"escapeChar\"    = \"\\\\\")
+            LOCATION '$tempDir'
+          """)
+
+        hiveContext.setConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS, true)
+
+        val relation = hiveContext.sessionState.catalog.lookupRelation(TableIdentifier("csv_table"))
+          .asInstanceOf[MetastoreRelation]
+
+        val properties = relation.hiveQlTable.getParameters
+        assert(properties.get("totalSize").toLong <= 0, "external table 
totalSize must be <= 0")
+        assert(properties.get("rawDataSize").toLong <= 0, "external table 
rawDataSize must be <= 0")
+
+        val sizeInBytes = relation.statistics.sizeInBytes
+        assert(sizeInBytes === BigInt(file1.length() + file2.length()))
+      }
+    } finally {
+      hiveContext.setConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS, enableFallBackToHdfsForStats)
+      sql("DROP TABLE csv_table ")
+    }
+  }
+
   ignore("analyze MetastoreRelations") {
     def queryTotalSize(tableName: String): BigInt =
       hiveContext.sessionState.catalog.lookupRelation(

