Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20386#discussion_r164680538 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala --- @@ -39,13 +41,20 @@ class ConsoleWriter(schema: StructType, options: DataSourceV2Options) def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory - override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = { + private val messages = new ArrayBuffer[WriterCommitMessage]() + + override def add(message: WriterCommitMessage): Unit = synchronized { + messages += message + } + + override def commit(epochId: Long): Unit = synchronized { // We have to print a "Batch" label for the epoch for compatibility with the pre-data source V2 // behavior. - printRows(messages, schema, s"Batch: $epochId") + printRows(messages.toArray, schema, s"Batch: $epochId") + messages.clear() } - def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + def abort(epochId: Long): Unit = {} --- End diff -- we should clear the message array in abort too.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org