Repository: spark
Updated Branches:
  refs/heads/master 6e6298154 -> b929537b6


[SPARK-18182] Expose ReplayListenerBus.read() overload which takes string iterator

The `ReplayListenerBus.read()` method is used when implementing a custom 
`ApplicationHistoryProvider`. The current interface only exposes a `read()` 
method which takes an `InputStream` and performs stream-to-lines conversion 
itself, but it would also be useful to expose an overloaded method which 
accepts an iterator of strings, thereby enabling events to be provided from 
non-`InputStream` sources.
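The overload pattern described above can be sketched as follows. This is a minimal, self-contained sketch, not Spark's implementation: `ReplayListenerBus` is `private[spark]`, so a hypothetical `LineReplayer` class and `EventsFilter` alias stand in for it, and the sketch only collects matching lines instead of parsing them as JSON events.

```scala
import java.io.InputStream
import scala.collection.mutable.ArrayBuffer
import scala.io.Source

// Hypothetical helpers standing in for ReplayEventsFilter / SELECT_ALL_FILTER.
object LineReplayer {
  type EventsFilter = String => Boolean
  val SelectAll: EventsFilter = _ => true
}

// Hypothetical stand-in for ReplayListenerBus: records the lines it would replay.
class LineReplayer {
  import LineReplayer._

  val replayed: ArrayBuffer[String] = ArrayBuffer.empty[String]

  // InputStream variant: performs the stream-to-lines conversion, then delegates.
  def replay(
      logData: InputStream,
      sourceName: String,
      maybeTruncated: Boolean = false,
      eventsFilter: EventsFilter = SelectAll): Unit = {
    val lines = Source.fromInputStream(logData).getLines()
    replay(lines, sourceName, maybeTruncated, eventsFilter)
  }

  // Iterator variant: lines may come from any source, not only an InputStream.
  // It declares no default arguments (see the note below).
  def replay(
      lines: Iterator[String],
      sourceName: String,
      maybeTruncated: Boolean,
      eventsFilter: EventsFilter): Unit = {
    lines.filter(eventsFilter).foreach(line => replayed += line)
  }
}
```

Only the `InputStream` variant declares default arguments because Scala rejects default arguments on more than one overloaded alternative. This is consistent with the new `replay(lines: Iterator[String], ...)` overload in the diff, which takes all four parameters explicitly. A caller holding events in memory could invoke it as `new LineReplayer().replay(Iterator("""{"Event":"A"}"""), "in-memory", false, LineReplayer.SelectAll)`.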

Author: Josh Rosen <joshro...@databricks.com>

Closes #15698 from JoshRosen/replay-listener-bus-interface.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b929537b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b929537b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b929537b

Branch: refs/heads/master
Commit: b929537b6eb0f8f34497c3dbceea8045bf5dffdb
Parents: 6e62981
Author: Josh Rosen <joshro...@databricks.com>
Authored: Tue Nov 1 16:49:41 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Nov 1 16:49:41 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/ReplayListenerBus.scala   | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b929537b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 2424586..0bd5a6b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -53,13 +53,24 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
       sourceName: String,
       maybeTruncated: Boolean = false,
       eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
+    val lines = Source.fromInputStream(logData).getLines()
+    replay(lines, sourceName, maybeTruncated, eventsFilter)
+  }
 
+  /**
+   * Overloaded variant of [[replay()]] which accepts an iterator of lines instead of an
+   * [[InputStream]]. Exposed for use by custom ApplicationHistoryProvider implementations.
+   */
+  def replay(
+      lines: Iterator[String],
+      sourceName: String,
+      maybeTruncated: Boolean,
+      eventsFilter: ReplayEventsFilter): Unit = {
     var currentLine: String = null
     var lineNumber: Int = 0
 
     try {
-      val lineEntries = Source.fromInputStream(logData)
-        .getLines()
+      val lineEntries = lines
         .zipWithIndex
         .filter { case (line, _) => eventsFilter(line) }
 
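In the refactored body, `zipWithIndex` is applied to `lines` before `filter`, so each surviving line keeps its position in the unfiltered input. A minimal sketch of that ordering follows; `LineNumbering` and `numberedMatches` are hypothetical names, and converting the index to a 1-based line number is an assumption made here for readable error messages, not something shown in this diff.

```scala
// Hypothetical helper illustrating zipWithIndex-before-filter ordering.
object LineNumbering {
  // Pairs each line with its position in the *unfiltered* input, drops lines
  // rejected by `keep`, and reports positions as 1-based line numbers
  // (an assumption for this sketch).
  def numberedMatches(lines: Iterator[String], keep: String => Boolean): List[(String, Int)] =
    lines.zipWithIndex
      .filter { case (line, _) => keep(line) }
      .map { case (line, idx) => (line, idx + 1) }
      .toList
}
```

Filtering first and numbering afterwards would instead number only the surviving lines, so a parse error on a kept line would point at the wrong line of the original log.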

