Repository: spark
Updated Branches:
  refs/heads/branch-2.1 0b07634b5 -> 9b9867ef5


[SPARK-18857][SQL] Don't use `Iterator.duplicate` for `incrementalCollect` in Thrift Server

## What changes were proposed in this pull request?

To support `FETCH_FIRST`, SPARK-16563 used Scala `Iterator.duplicate`. However,
Scala `Iterator.duplicate` uses a **queue to buffer all items between both
iterators**, which causes GC pressure and hangs for queries with a large number
of rows. We should not use it, especially for
`spark.sql.thriftServer.incrementalCollect`.

https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/Iterator.scala#L1262-L1300
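
For illustration, a minimal standalone sketch (not part of this patch) of the buffering behavior: once the two duplicated iterators diverge, every element between them is held in a shared queue on the heap.

```scala
object DuplicateBufferingDemo {
  def main(args: Array[String]): Unit = {
    val source = Iterator.range(0, 1000000)
    val (ita, itb) = source.duplicate
    // Draining `ita` while `itb` is untouched forces the shared gap queue
    // to buffer all one million elements.
    ita.foreach(_ => ())
    // `itb` can still replay everything, but only because it was buffered.
    println(itb.take(3).mkString(", ")) // prints: 0, 1, 2
  }
}
```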

## How was this patch tested?

Pass the existing tests.

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #16440 from dongjoon-hyun/SPARK-18857.

(cherry picked from commit a2c6adcc5d2702d2f0e9b239517353335e5f911e)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b9867ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b9867ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b9867ef

Branch: refs/heads/branch-2.1
Commit: 9b9867ef5b64b05f1e968de1fc0bfc1fcc64a707
Parents: 0b07634
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Tue Jan 10 13:27:55 2017 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Jan 12 10:45:10 2017 +0000

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  7 +++++
 .../SparkExecuteStatementOperation.scala        | 30 +++++++++++++-------
 2 files changed, 26 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9b9867ef/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8fbad60..8d493e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -309,6 +309,13 @@ object SQLConf {
     .stringConf
     .createOptional
 
+  val THRIFTSERVER_INCREMENTAL_COLLECT =
+    SQLConfigBuilder("spark.sql.thriftServer.incrementalCollect")
+      .internal()
+      .doc("When true, enable incremental collection for execution in Thrift 
Server.")
+      .booleanConf
+      .createWithDefault(false)
+
   val THRIFTSERVER_UI_STATEMENT_LIMIT =
     SQLConfigBuilder("spark.sql.thriftserver.ui.retainedStatements")
       .doc("The number of SQL statements kept in the JDBC/ODBC web UI 
history.")

http://git-wip-us.apache.org/repos/asf/spark/blob/9b9867ef/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index aeabd6a..517b01f 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -50,8 +50,13 @@ private[hive] class SparkExecuteStatementOperation(
   with Logging {
 
   private var result: DataFrame = _
+
+  // We cache the returned rows to get iterators again in case the user wants to use FETCH_FIRST.
+  // This is only used when `spark.sql.thriftServer.incrementalCollect` is set to `false`.
+  // In case of `true`, this will be `None` and FETCH_FIRST will trigger re-execution.
+  private var resultList: Option[Array[SparkRow]] = _
+
   private var iter: Iterator[SparkRow] = _
-  private var iterHeader: Iterator[SparkRow] = _
   private var dataTypes: Array[DataType] = _
   private var statementId: String = _
 
@@ -111,9 +116,15 @@ private[hive] class SparkExecuteStatementOperation(
 
     // Reset iter to header when fetching start from first row
     if (order.equals(FetchOrientation.FETCH_FIRST)) {
-      val (ita, itb) = iterHeader.duplicate
-      iter = ita
-      iterHeader = itb
+      iter = if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
+        resultList = None
+        result.toLocalIterator.asScala
+      } else {
+        if (resultList.isEmpty) {
+          resultList = Some(result.collect())
+        }
+        resultList.get.iterator
+      }
     }
 
     if (!iter.hasNext) {
@@ -227,17 +238,14 @@ private[hive] class SparkExecuteStatementOperation(
       }
      HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
       iter = {
-        val useIncrementalCollect =
-          sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
-        if (useIncrementalCollect) {
+        if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
+          resultList = None
           result.toLocalIterator.asScala
         } else {
-          result.collect().iterator
+          resultList = Some(result.collect())
+          resultList.get.iterator
         }
       }
-      val (itra, itrb) = iter.duplicate
-      iterHeader = itra
-      iter = itrb
       dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
     } catch {
       case e: HiveSQLException =>
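
----------------------------------------------------------------------

To summarize the behavior this patch encodes, a hedged standalone sketch (names and structure are illustrative, not the actual operation class): incremental collection streams partitions lazily, so FETCH_FIRST triggers re-execution, while the default path caches the collected rows so FETCH_FIRST can hand out a fresh iterator without re-running the query.

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.{DataFrame, Row}

// Illustrative sketch of the two collection strategies selected by
// spark.sql.thriftServer.incrementalCollect (not the actual patch code).
def resultIterator(result: DataFrame, incremental: Boolean): Iterator[Row] = {
  if (incremental) {
    // Streams one partition at a time to the driver; nothing is cached,
    // so a later FETCH_FIRST must re-execute the query.
    result.toLocalIterator.asScala
  } else {
    // Materializes the whole result on the driver once; caching this array
    // lets FETCH_FIRST produce a fresh iterator without re-running the query.
    result.collect().iterator
  }
}
```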


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
