Repository: spark Updated Branches: refs/heads/master 6e6298154 -> b929537b6
[SPARK-18182] Expose ReplayListenerBus.replay() overload which takes string iterator The `ReplayListenerBus.replay()` method is used when implementing a custom `ApplicationHistoryProvider`. The current interface only exposes a `replay()` method which takes an `InputStream` and performs stream-to-lines conversion itself, but it would also be useful to expose an overloaded method which accepts an iterator of strings, thereby enabling events to be provided from non-`InputStream` sources. Author: Josh Rosen <joshro...@databricks.com> Closes #15698 from JoshRosen/replay-listener-bus-interface. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b929537b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b929537b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b929537b Branch: refs/heads/master Commit: b929537b6eb0f8f34497c3dbceea8045bf5dffdb Parents: 6e62981 Author: Josh Rosen <joshro...@databricks.com> Authored: Tue Nov 1 16:49:41 2016 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Tue Nov 1 16:49:41 2016 -0700 ---------------------------------------------------------------------- .../apache/spark/scheduler/ReplayListenerBus.scala | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b929537b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index 2424586..0bd5a6b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -53,13 +53,24 @@ private[spark] class ReplayListenerBus extends 
SparkListenerBus with Logging { sourceName: String, maybeTruncated: Boolean = false, eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = { + val lines = Source.fromInputStream(logData).getLines() + replay(lines, sourceName, maybeTruncated, eventsFilter) + } + /** + * Overloaded variant of [[replay()]] which accepts an iterator of lines instead of an + * [[InputStream]]. Exposed for use by custom ApplicationHistoryProvider implementations. + */ + def replay( + lines: Iterator[String], + sourceName: String, + maybeTruncated: Boolean, + eventsFilter: ReplayEventsFilter): Unit = { var currentLine: String = null var lineNumber: Int = 0 try { - val lineEntries = Source.fromInputStream(logData) - .getLines() + val lineEntries = lines .zipWithIndex .filter { case (line, _) => eventsFilter(line) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org