Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21559#discussion_r195797571
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
    @@ -221,26 +222,72 @@ class MemoryStreamInputPartition(records: Array[UnsafeRow])
     }
     
     /** A common trait for MemorySinks with methods used for testing */
    -trait MemorySinkBase extends BaseStreamingSink {
    +trait MemorySinkBase extends BaseStreamingSink with Logging {
       def allData: Seq[Row]
       def latestBatchData: Seq[Row]
       def dataSinceBatch(sinceBatchId: Long): Seq[Row]
       def latestBatchId: Option[Long]
    +
    +  /**
    +   * Truncates the given rows to return at most maxRows rows.
    +   * @param rows The data that may need to be truncated.
    +   * @param batchLimit Number of rows to keep in this batch; the rest will be truncated
    +   * @param sinkLimit Total number of rows kept in this sink, for logging purposes.
    +   * @param batchId The ID of the batch that sent these rows, for logging purposes.
    +   * @return Truncated rows.
    +   */
    +  protected def truncateRowsIfNeeded(
    +      rows: Array[Row],
    +      batchLimit: Int,
    +      sinkLimit: Int,
    +      batchId: Long): Array[Row] = {
    +    if (rows.length > batchLimit && batchLimit >= 0) {
    +      logWarning(s"Truncating batch $batchId to $batchLimit rows because of sink limit $sinkLimit")
    +      rows.take(batchLimit)
    +    } else {
    +      rows
    +    }
    +  }
    +}
    +
    +/**
    + * Companion object to MemorySinkBase.
    + */
    +object MemorySinkBase {
    +  val MAX_MEMORY_SINK_ROWS = "maxRows"
    +  val MAX_MEMORY_SINK_ROWS_DEFAULT = -1
    +
    +  /**
    +   * Gets the max number of rows a MemorySink should store. This number is based on the memory
    +   * sink row limit if it is set. If not, there is no limit.
    +   * @param options Options for writing from which we get the max rows option
    +   * @return The maximum number of rows a memorySink should store, or None for no limit.
    --- End diff --
    
    Need to update the docs.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to