Repository: spark
Updated Branches:
  refs/heads/master d9d146500 -> dd85eb544


[SPARK-18107][SQL] Insert overwrite statement runs much slower in spark-sql 
than it does in hive-client

## What changes were proposed in this pull request?

As reported on the jira, insert overwrite statement runs much slower in Spark, 
compared with hive-client.

It seems there is a patch 
[HIVE-11940](https://github.com/apache/hive/commit/ba21806b77287e237e1aa68fa169d2a81e07346d)
 which largely improves insert overwrite performance on Hive. HIVE-11940 is 
patched after Hive 2.0.0.

Because Spark SQL uses older Hive library, we can not benefit from such 
improvement.

The reporter verified that there is also a big performance gap between Hive 
1.2.1 (520.037 secs) and Hive 2.0.1 (35.975 secs) on insert overwrite execution.

Instead of upgrading to Hive 2.0 in Spark SQL, which might not be a trivial 
task, this patch provides an approach to delete the partition before asking 
Hive to load data files into the partition.

Note: The case reported on the jira is insert overwrite to partition. Since 
`Hive.loadTable` also uses the function to replace files, insert overwrite to 
table should has the same issue. We can take the same approach to delete the 
table first. I will upgrade this to include this.
## How was this patch tested?

Jenkins tests.

There are existing tests using insert overwrite statement. Those tests should 
be passed. I added a new test to specially test insert overwrite into partition.

For performance issue, as I don't have Hive 2.0 environment, this needs the 
reporter to verify it. Please refer to the jira.

Please review 
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before 
opening a pull request.

Author: Liang-Chi Hsieh <vii...@gmail.com>

Closes #15667 from viirya/improve-hive-insertoverwrite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd85eb54
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd85eb54
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd85eb54

Branch: refs/heads/master
Commit: dd85eb5448c8f2672260b57e94c0da0eaac12616
Parents: d9d1465
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Tue Nov 1 00:24:08 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Nov 1 00:24:08 2016 -0700

----------------------------------------------------------------------
 .../hive/execution/InsertIntoHiveTable.scala    | 24 +++++++++++++-
 .../sql/hive/execution/SQLQuerySuite.scala      | 33 ++++++++++++++++++++
 2 files changed, 56 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dd85eb54/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index c3c4e29..2843100 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, 
AlterTableDropPartitionCommand}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.SparkException
@@ -257,7 +258,28 @@ case class InsertIntoHiveTable(
             table.catalogTable.identifier.table,
             partitionSpec)
 
+        var doHiveOverwrite = overwrite
+
         if (oldPart.isEmpty || !ifNotExists) {
+          // SPARK-18107: Insert overwrite runs much slower than hive-client.
+          // Newer Hive largely improves insert overwrite performance. As 
Spark uses older Hive
+          // version and we may not want to catch up new Hive version every 
time. We delete the
+          // Hive partition first and then load data file into the Hive 
partition.
+          if (oldPart.nonEmpty && overwrite) {
+            oldPart.get.storage.locationUri.map { uri =>
+              val partitionPath = new Path(uri)
+              val fs = partitionPath.getFileSystem(hadoopConf)
+              if (fs.exists(partitionPath)) {
+                if (!fs.delete(partitionPath, true)) {
+                  throw new RuntimeException(
+                    "Cannot remove partition directory '" + 
partitionPath.toString)
+                }
+                // Don't let Hive do overwrite operation since it is slower.
+                doHiveOverwrite = false
+              }
+            }
+          }
+
           // inheritTableSpecs is set to true. It should be set to false for 
an IMPORT query
           // which is currently considered as a Hive native command.
           val inheritTableSpecs = true
@@ -266,7 +288,7 @@ case class InsertIntoHiveTable(
             table.catalogTable.identifier.table,
             outputPath.toString,
             partitionSpec,
-            isOverwrite = overwrite,
+            isOverwrite = doHiveOverwrite,
             holdDDLTime = holdDDLTime,
             inheritTableSpecs = inheritTableSpecs)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/dd85eb54/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f64010a..8b91693 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1973,6 +1973,39 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
     }
   }
 
+  test("Insert overwrite with partition") {
+    withTable("tableWithPartition") {
+      sql(
+        """
+          |CREATE TABLE tableWithPartition (key int, value STRING)
+          |PARTITIONED BY (part STRING)
+        """.stripMargin)
+      sql(
+        """
+          |INSERT OVERWRITE TABLE tableWithPartition PARTITION (part = '1')
+          |SELECT * FROM default.src
+        """.stripMargin)
+       checkAnswer(
+         sql("SELECT part, key, value FROM tableWithPartition"),
+         sql("SELECT '1' AS part, key, value FROM default.src")
+       )
+
+      sql(
+        """
+          |INSERT OVERWRITE TABLE tableWithPartition PARTITION (part = '1')
+          |SELECT * FROM VALUES (1, "one"), (2, "two"), (3, null) AS data(key, 
value)
+        """.stripMargin)
+      checkAnswer(
+        sql("SELECT part, key, value FROM tableWithPartition"),
+        sql(
+          """
+            |SELECT '1' AS part, key, value FROM VALUES
+            |(1, "one"), (2, "two"), (3, null) AS data(key, value)
+          """.stripMargin)
+      )
+    }
+  }
+
   def testCommandAvailable(command: String): Boolean = {
     val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
     attempt.isSuccess && attempt.get == 0


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to