Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164680538
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala ---
    @@ -39,13 +41,20 @@ class ConsoleWriter(schema: StructType, options: DataSourceV2Options)
     
       def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory
     
    -  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    +  private val messages = new ArrayBuffer[WriterCommitMessage]()
    +
    +  override def add(message: WriterCommitMessage): Unit = synchronized {
    +    messages += message
    +  }
    +
    +  override def commit(epochId: Long): Unit = synchronized {
         // We have to print a "Batch" label for the epoch for compatibility with the pre-data source V2
         // behavior.
    -    printRows(messages, schema, s"Batch: $epochId")
    +    printRows(messages.toArray, schema, s"Batch: $epochId")
    +    messages.clear()
       }
     
    -  def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
    +  def abort(epochId: Long): Unit = {}
    --- End diff --
    
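    We should clear the `messages` buffer in `abort` too; otherwise messages added for an aborted epoch stay buffered and get printed with the next committed epoch.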


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to