GitHub user mukulmurthy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21559#discussion_r195809395
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
    @@ -221,26 +222,72 @@ class MemoryStreamInputPartition(records: Array[UnsafeRow])
     }
     
     /** A common trait for MemorySinks with methods used for testing */
    -trait MemorySinkBase extends BaseStreamingSink {
    +trait MemorySinkBase extends BaseStreamingSink with Logging {
       def allData: Seq[Row]
       def latestBatchData: Seq[Row]
       def dataSinceBatch(sinceBatchId: Long): Seq[Row]
       def latestBatchId: Option[Long]
    +
    +  /**
    +   * Truncates the given rows to return at most batchLimit rows.
    +   * @param rows The data that may need to be truncated.
    +   * @param batchLimit Number of rows to keep in this batch; the rest will be truncated.
    +   * @param sinkLimit Total number of rows kept in this sink, for logging purposes.
    +   * @param batchId The ID of the batch that sent these rows, for logging purposes.
    +   * @return Truncated rows.
    +   */
    +  protected def truncateRowsIfNeeded(
    +      rows: Array[Row],
    +      batchLimit: Int,
    +      sinkLimit: Int,
    +      batchId: Long): Array[Row] = {
    +    if (rows.length > batchLimit && batchLimit >= 0) {
    +      logWarning(s"Truncating batch $batchId to $batchLimit rows because of sink limit $sinkLimit")
    --- End diff --
    
    This piece is shared by MemorySink and MemorySinkV2, and the MemorySinkV2 (continuous processing) sink still calls them batches.


---
