Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21559#discussion_r195268299

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
    @@ -228,19 +229,45 @@ trait MemorySinkBase extends BaseStreamingSink {
       def latestBatchId: Option[Long]
     }
    +/**
    + * Companion object to MemorySinkBase.
    + */
    +object MemorySinkBase {
    +  val MAX_MEMORY_SINK_ROWS = "maxMemorySinkRows"
    +  val MAX_MEMORY_SINK_ROWS_DEFAULT = -1
    +
    +  /**
    +   * Gets the max number of rows a MemorySink should store. This number is based on the memory
    +   * sink row limit if it is set. If not, there is no limit.
    +   * @param options Options for writing from which we get the max rows option
    +   * @return The maximum number of rows a memorySink should store, or None for no limit.
    +   */
    +  def getMemorySinkCapacity(options: DataSourceOptions): Option[Int] = {
    +    val maxRows = options.getInt(MAX_MEMORY_SINK_ROWS, MAX_MEMORY_SINK_ROWS_DEFAULT)
    +    if (maxRows >= 0) Some(maxRows) else None
    +  }
    +}
    +
    +
    --- End diff --

    nit: remove extra line
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org