Repository: spark
Updated Branches:
  refs/heads/master 3ba69b648 -> 1b2785c3d
[SPARK-18729][SS] Move DataFrame.collect out of synchronized block in MemorySink

## What changes were proposed in this pull request?

Move DataFrame.collect out of the synchronized block so that the content of MemorySink can be queried while `DataFrame.collect` is running.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #16162 from zsxwing/SPARK-18729.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b2785c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b2785c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b2785c3

Branch: refs/heads/master
Commit: 1b2785c3d0a40da2fca923af78066060dbfbcf0a
Parents: 3ba69b6
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Mon Dec 5 18:15:55 2016 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Dec 5 18:15:55 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/execution/streaming/memory.scala | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1b2785c3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index adf6963..b370845 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -186,16 +186,23 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
     }.mkString("\n")
   }
 
-  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
-    if (latestBatchId.isEmpty || batchId > latestBatchId.get) {
+  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+    val notCommitted = synchronized {
+      latestBatchId.isEmpty || batchId > latestBatchId.get
+    }
+    if (notCommitted) {
       logDebug(s"Committing batch $batchId to $this")
       outputMode match {
         case InternalOutputModes.Append | InternalOutputModes.Update =>
-          batches.append(AddedData(batchId, data.collect()))
+          val rows = AddedData(batchId, data.collect())
+          synchronized { batches += rows }
 
         case InternalOutputModes.Complete =>
-          batches.clear()
-          batches += AddedData(batchId, data.collect())
+          val rows = AddedData(batchId, data.collect())
+          synchronized {
+            batches.clear()
+            batches += rows
+          }
 
         case _ =>
           throw new IllegalArgumentException(
@@ -206,7 +213,7 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
     }
   }
 
-  def clear(): Unit = {
+  def clear(): Unit = synchronized {
     batches.clear()
   }
 
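The change is the classic "compute outside the lock, mutate inside the lock" refactoring: the expensive `data.collect()` no longer runs while holding the sink's monitor, so concurrent readers of the sink are only blocked for the cheap buffer mutation. Below is a minimal, hypothetical Scala sketch of that pattern. `SimpleSink`, `compute`, and `allData` are illustrative names, not Spark's API; the real MemorySink collects Rows from a DataFrame and also dispatches on OutputMode as shown in the diff above.

import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified sink illustrating the locking pattern from
// this patch; not the actual MemorySink implementation.
class SimpleSink {
  case class AddedData(batchId: Long, rows: Array[String])

  private val batches = new ArrayBuffer[AddedData]()

  private def latestBatchId: Option[Long] = synchronized {
    batches.lastOption.map(_.batchId)
  }

  def addBatch(batchId: Long, compute: () => Array[String]): Unit = {
    // Hold the lock only to check whether this batch is already committed.
    val notCommitted = synchronized {
      latestBatchId.isEmpty || batchId > latestBatchId.get
    }
    if (notCommitted) {
      // The expensive step (data.collect() in the real sink) runs WITHOUT
      // the lock, so readers are not blocked while it is in progress.
      val rows = AddedData(batchId, compute())
      // Re-take the lock only for the cheap mutation of shared state.
      synchronized { batches += rows }
    }
  }

  // Readers take the same lock, but only long enough to copy the buffer.
  def allData: Seq[String] = synchronized { batches.flatMap(_.rows).toSeq }
}

With this split, a thread calling `allData` waits at most for the short append under the lock rather than for the full duration of `compute()`, which is exactly why the MemorySink can now be queried while `DataFrame.collect` is running.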