Repository: spark
Updated Branches:
  refs/heads/branch-2.0 ec2fe925c -> c94288b57
[SPARK-18857][SQL] Don't use `Iterator.duplicate` for `incrementalCollect` in Thrift Server

## What changes were proposed in this pull request?

To support `FETCH_FIRST`, SPARK-16563 used Scala `Iterator.duplicate`. However, Scala `Iterator.duplicate` uses a **queue to buffer all items between both iterators**, which causes GC pressure and hangs for queries with a large number of rows. We should not use it, especially for `spark.sql.thriftServer.incrementalCollect`.

https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/Iterator.scala#L1262-L1300

## How was this patch tested?

Passes the existing tests.

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #16440 from dongjoon-hyun/SPARK-18857.

(cherry picked from commit a2c6adcc5d2702d2f0e9b239517353335e5f911e)
Signed-off-by: Sean Owen <so...@cloudera.com>
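For context, a minimal standalone sketch of the buffering behaviour described above (illustrative only, not code from this commit): once one half of a duplicated iterator is drained, every element stays queued in memory until the other half also consumes it, which is what made `Iterator.duplicate` unsuitable for large result sets.

```scala
// Illustrative sketch only (not from this commit): Iterator.duplicate backs the
// two iterators with a shared queue, so elements consumed by one iterator are
// buffered on the heap until the other iterator catches up.
object IteratorDuplicateSketch {
  def main(args: Array[String]): Unit = {
    val rows = Iterator.range(0, 1000000)     // stands in for a large query result
    val (iter, iterHeader) = rows.duplicate   // second iterator kept around for FETCH_FIRST
    val fetched = iter.size                   // draining `iter` queues all rows for `iterHeader`
    println(s"fetched $fetched rows; all of them remain buffered for iterHeader")
  }
}
```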
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c94288b5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c94288b5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c94288b5

Branch: refs/heads/branch-2.0
Commit: c94288b57b5ce2232e58e35cada558d8d5b8ec6e
Parents: ec2fe92
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Tue Jan 10 13:27:55 2017 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Jan 12 10:45:26 2017 +0000

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  7 +++++
 .../SparkExecuteStatementOperation.scala        | 30 +++++++++++++-------
 2 files changed, 26 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c94288b5/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7598d47..edec6f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -295,6 +295,13 @@ object SQLConf {
     .stringConf
     .createOptional
 
+  val THRIFTSERVER_INCREMENTAL_COLLECT =
+    SQLConfigBuilder("spark.sql.thriftServer.incrementalCollect")
+      .internal()
+      .doc("When true, enable incremental collection for execution in Thrift Server.")
+      .booleanConf
+      .createWithDefault(false)
+
   val THRIFTSERVER_UI_STATEMENT_LIMIT =
     SQLConfigBuilder("spark.sql.thriftserver.ui.retainedStatements")
       .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")

http://git-wip-us.apache.org/repos/asf/spark/blob/c94288b5/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 8a78523..a95170e 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -50,8 +50,13 @@ private[hive] class SparkExecuteStatementOperation(
   with Logging {
 
   private var result: DataFrame = _
+
+  // We cache the returned rows to get iterators again in case the user wants to use FETCH_FIRST.
+  // This is only used when `spark.sql.thriftServer.incrementalCollect` is set to `false`.
+  // In case of `true`, this will be `None` and FETCH_FIRST will trigger re-execution.
+  private var resultList: Option[Array[SparkRow]] = _
+
   private var iter: Iterator[SparkRow] = _
-  private var iterHeader: Iterator[SparkRow] = _
   private var dataTypes: Array[DataType] = _
   private var statementId: String = _
 
@@ -111,9 +116,15 @@ private[hive] class SparkExecuteStatementOperation(
 
     // Reset iter to header when fetching start from first row
     if (order.equals(FetchOrientation.FETCH_FIRST)) {
-      val (ita, itb) = iterHeader.duplicate
-      iter = ita
-      iterHeader = itb
+      iter = if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
+        resultList = None
+        result.toLocalIterator.asScala
+      } else {
+        if (resultList.isEmpty) {
+          resultList = Some(result.collect())
+        }
+        resultList.get.iterator
+      }
     }
 
     if (!iter.hasNext) {
@@ -226,17 +237,14 @@ private[hive] class SparkExecuteStatementOperation(
       }
       HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
       iter = {
-        val useIncrementalCollect =
-          sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
-        if (useIncrementalCollect) {
+        if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
+          resultList = None
           result.toLocalIterator.asScala
         } else {
-          result.collect().iterator
+          resultList = Some(result.collect())
+          resultList.get.iterator
         }
       }
-      val (itra, itrb) = iter.duplicate
-      iterHeader = itra
-      iter = itrb
       dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
     } catch {
       case e: HiveSQLException =>
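To make the control flow of the patched fetch path easier to follow in isolation, here is a rough standalone sketch. It is not code from this commit; `collectAll` and `incrementalIterator` are hypothetical stand-ins for `result.collect()` and `result.toLocalIterator`, and `FetchFirstSketch` is an illustrative name.

```scala
// Rough standalone sketch (not from this commit) of the FETCH_FIRST handling
// introduced by this patch. `collectAll` and `incrementalIterator` are
// hypothetical stand-ins for result.collect() and result.toLocalIterator.
class FetchFirstSketch(
    incrementalCollect: Boolean,
    collectAll: () => Array[String],
    incrementalIterator: () => Iterator[String]) {

  // Cached rows; only populated when incrementalCollect is disabled.
  private var resultList: Option[Array[String]] = None
  private var iter: Iterator[String] = Iterator.empty

  // Called when a client asks to fetch from the first row again (FETCH_FIRST).
  def resetToFirstRow(): Unit = {
    iter = if (incrementalCollect) {
      resultList = None
      incrementalIterator()     // re-stream the result instead of buffering it
    } else {
      if (resultList.isEmpty) {
        resultList = Some(collectAll())
      }
      resultList.get.iterator   // replay cached rows; no Iterator.duplicate needed
    }
  }

  def nextRow(): Option[String] = if (iter.hasNext) Some(iter.next()) else None
}
```

With incremental collection enabled nothing is cached, so FETCH_FIRST costs a re-execution; with it disabled, the collected rows are cached once and FETCH_FIRST simply rewinds over them, avoiding the unbounded queue that `Iterator.duplicate` maintained.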