Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20386#discussion_r164680686 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala --- @@ -135,14 +142,21 @@ class MemoryStreamWriter(val sink: MemorySinkV2, outputMode: OutputMode) override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode) - override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = { + private val messages = new ArrayBuffer[WriterCommitMessage]() + + override def add(message: WriterCommitMessage): Unit = synchronized { + messages += message + } + + override def commit(epochId: Long): Unit = synchronized { val newRows = messages.flatMap { case message: MemoryWriterCommitMessage => message.data - } + }.toArray sink.write(epochId, outputMode, newRows) + messages.clear() } - override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = { + override def abort(epochId: Long): Unit = { --- End diff -- ditto
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org