Repository: spark
Updated Branches:
  refs/heads/branch-1.6 dccc4645d -> ab7da0eae


[SPARK-11462][STREAMING] Add JavaStreamingListener

Currently, StreamingListener is not Java friendly because it exposes some Scala 
types, such as Option and Map, directly to Java users.

This PR added a Java version of StreamingListener and a bunch of Java friendly 
classes for Java users.

Author: zsxwing <zsxw...@gmail.com>
Author: Shixiong Zhu <shixi...@databricks.com>

Closes #9420 from zsxwing/java-streaming-listener.

(cherry picked from commit 1f0f14efe35f986e338ee2cbc1ef2a9ce7395c00)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab7da0ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab7da0ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab7da0ea

Branch: refs/heads/branch-1.6
Commit: ab7da0eae4ed9ae23e5fd6623d1fb4dcc1979976
Parents: dccc464
Author: zsxwing <zsxw...@gmail.com>
Authored: Mon Nov 9 17:38:19 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Nov 9 17:38:37 2015 -0800

----------------------------------------------------------------------
 .../api/java/JavaStreamingListener.scala        | 168 +++++++++++
 .../api/java/JavaStreamingListenerWrapper.scala | 122 ++++++++
 .../JavaStreamingListenerAPISuite.java          |  85 ++++++
 .../JavaStreamingListenerWrapperSuite.scala     | 290 +++++++++++++++++++
 4 files changed, 665 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ab7da0ea/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
new file mode 100644
index 0000000..c86c710
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import org.apache.spark.streaming.Time
+
+/**
+ * A listener interface for receiving information about an ongoing streaming  
computation.
+ */
+private[streaming] class JavaStreamingListener {
+
+  /** Called when a receiver has been started */
+  def onReceiverStarted(receiverStarted: 
JavaStreamingListenerReceiverStarted): Unit = { }
+
+  /** Called when a receiver has reported an error */
+  def onReceiverError(receiverError: JavaStreamingListenerReceiverError): Unit 
= { }
+
+  /** Called when a receiver has been stopped */
+  def onReceiverStopped(receiverStopped: 
JavaStreamingListenerReceiverStopped): Unit = { }
+
+  /** Called when a batch of jobs has been submitted for processing. */
+  def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted): 
Unit = { }
+
+  /** Called when processing of a batch of jobs has started.  */
+  def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted): Unit = 
{ }
+
+  /** Called when processing of a batch of jobs has completed. */
+  def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): 
Unit = { }
+
+  /** Called when processing of a job of a batch has started. */
+  def onOutputOperationStarted(
+      outputOperationStarted: JavaStreamingListenerOutputOperationStarted): 
Unit = { }
+
+  /** Called when processing of a job of a batch has completed. */
+  def onOutputOperationCompleted(
+      outputOperationCompleted: 
JavaStreamingListenerOutputOperationCompleted): Unit = { }
+}
+
+/**
+ * Base trait for events related to JavaStreamingListener
+ */
+private[streaming] sealed trait JavaStreamingListenerEvent
+
+private[streaming] class JavaStreamingListenerBatchSubmitted(val batchInfo: 
JavaBatchInfo)
+  extends JavaStreamingListenerEvent
+
+private[streaming] class JavaStreamingListenerBatchCompleted(val batchInfo: 
JavaBatchInfo)
+  extends JavaStreamingListenerEvent
+
+private[streaming] class JavaStreamingListenerBatchStarted(val batchInfo: 
JavaBatchInfo)
+  extends JavaStreamingListenerEvent
+
+private[streaming] class JavaStreamingListenerOutputOperationStarted(
+    val outputOperationInfo: JavaOutputOperationInfo) extends 
JavaStreamingListenerEvent
+
+private[streaming] class JavaStreamingListenerOutputOperationCompleted(
+    val outputOperationInfo: JavaOutputOperationInfo) extends 
JavaStreamingListenerEvent
+
+private[streaming] class JavaStreamingListenerReceiverStarted(val 
receiverInfo: JavaReceiverInfo)
+  extends JavaStreamingListenerEvent
+
+private[streaming] class JavaStreamingListenerReceiverError(val receiverInfo: 
JavaReceiverInfo)
+  extends JavaStreamingListenerEvent
+
+private[streaming] class JavaStreamingListenerReceiverStopped(val 
receiverInfo: JavaReceiverInfo)
+  extends JavaStreamingListenerEvent
+
+/**
+ * Class having information on batches.
+ *
+ * @param batchTime Time of the batch
+ * @param streamIdToInputInfo A map of input stream id to its input info
+ * @param submissionTime Clock time of when jobs of this batch were submitted 
to the streaming
+ *                       scheduler queue
+ * @param processingStartTime Clock time of when the first job of this batch 
started processing.
+ *                            `-1` means the batch has not yet started
+ * @param processingEndTime Clock time of when the last job of this batch 
finished processing. `-1`
+ *                          means the batch has not yet completed.
+ * @param schedulingDelay Time taken for the first job of this batch to start 
processing from the
+ *                        time this batch was submitted to the streaming 
scheduler. Essentially, it
+ *                        is `processingStartTime` - `submissionTime`. `-1` 
means the batch has not
+ *                        yet started
+ * @param processingDelay Time taken for all the jobs of this batch to finish 
processing from the
+ *                        time they started processing. Essentially, it is
+ *                        `processingEndTime` - `processingStartTime`. `-1` 
means the batch has not
+ *                        yet completed.
+ * @param totalDelay Time taken for all the jobs of this batch to finish 
processing from the time
+ *                   they were submitted.  Essentially, it is 
`processingDelay` + `schedulingDelay`.
+ *                   `-1` means the batch has not yet completed.
+ * @param numRecords The number of records received by the receivers in this 
batch
+ * @param outputOperationInfos The output operations in this batch
+ */
+private[streaming] case class JavaBatchInfo(
+    batchTime: Time,
+    streamIdToInputInfo: java.util.Map[Int, JavaStreamInputInfo],
+    submissionTime: Long,
+    processingStartTime: Long,
+    processingEndTime: Long,
+    schedulingDelay: Long,
+    processingDelay: Long,
+    totalDelay: Long,
+    numRecords: Long,
+    outputOperationInfos: java.util.Map[Int, JavaOutputOperationInfo])
+
+/**
+ * Track the information of input stream at specified batch time.
+ *
+ * @param inputStreamId the input stream id
+ * @param numRecords the number of records in a batch
+ * @param metadata metadata for this batch. It should contain at least one 
standard field named
+ *                 "Description" which maps to the content that will be shown 
in the UI.
+ * @param metadataDescription description of this input stream
+ */
+private[streaming] case class JavaStreamInputInfo(
+    inputStreamId: Int,
+    numRecords: Long,
+    metadata: java.util.Map[String, Any],
+    metadataDescription: String)
+
+/**
+ * Class having information about a receiver
+ */
+private[streaming] case class JavaReceiverInfo(
+    streamId: Int,
+    name: String,
+    active: Boolean,
+    location: String,
+    lastErrorMessage: String,
+    lastError: String,
+    lastErrorTime: Long)
+
+/**
+ * Class having information on output operations.
+ *
+ * @param batchTime Time of the batch
+ * @param id Id of this output operation. Different output operations have 
different ids in a batch.
+ * @param name The name of this output operation.
+ * @param description The description of this output operation.
+ * @param startTime Clock time of when the output operation started 
processing. `-1` means the
+ *                  output operation has not yet started
+ * @param endTime Clock time of when the output operation finished processing. 
`-1` means the output
+ *                operation has not yet completed
+ * @param failureReason Failure reason if this output operation fails. If the 
output operation is
+ *                      successful, this field is `null`.
+ */
+private[streaming] case class JavaOutputOperationInfo(
+    batchTime: Time,
+    id: Int,
+    name: String,
+    description: String,
+    startTime: Long,
+    endTime: Long,
+    failureReason: String)

http://git-wip-us.apache.org/repos/asf/spark/blob/ab7da0ea/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
new file mode 100644
index 0000000..2c60b39
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.streaming.scheduler._
+
+/**
+ * A wrapper to convert a [[JavaStreamingListener]] to a [[StreamingListener]].
+ */
+private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: 
JavaStreamingListener)
+  extends StreamingListener {
+
+  private def toJavaReceiverInfo(receiverInfo: ReceiverInfo): JavaReceiverInfo 
= {
+    JavaReceiverInfo(
+      receiverInfo.streamId,
+      receiverInfo.name,
+      receiverInfo.active,
+      receiverInfo.location,
+      receiverInfo.lastErrorMessage,
+      receiverInfo.lastError,
+      receiverInfo.lastErrorTime
+    )
+  }
+
+  private def toJavaStreamInputInfo(streamInputInfo: StreamInputInfo): 
JavaStreamInputInfo = {
+    JavaStreamInputInfo(
+      streamInputInfo.inputStreamId,
+      streamInputInfo.numRecords: Long,
+      streamInputInfo.metadata.asJava,
+      streamInputInfo.metadataDescription.orNull
+    )
+  }
+
+  private def toJavaOutputOperationInfo(
+      outputOperationInfo: OutputOperationInfo): JavaOutputOperationInfo = {
+    JavaOutputOperationInfo(
+      outputOperationInfo.batchTime,
+      outputOperationInfo.id,
+      outputOperationInfo.name,
+      outputOperationInfo.description: String,
+      outputOperationInfo.startTime.getOrElse(-1),
+      outputOperationInfo.endTime.getOrElse(-1),
+      outputOperationInfo.failureReason.orNull
+    )
+  }
+
+  /**
+   * Converts a Scala [[BatchInfo]] into its Java-friendly [[JavaBatchInfo]].
+   *
+   * Optional times and delays that are not yet defined are reported as `-1`.
+   * Both maps are converted with a strict `map` instead of `mapValues`:
+   * `mapValues` only returns a lazy view, so every lookup through the Java
+   * map would re-run the conversion and allocate a fresh info object.
+   *
+   * @param batchInfo the Scala batch information to convert
+   * @return a [[JavaBatchInfo]] carrying the same data
+   */
+  private def toJavaBatchInfo(batchInfo: BatchInfo): JavaBatchInfo = {
+    // Convert each value exactly once, up front.
+    val javaInputInfos = batchInfo.streamIdToInputInfo.map {
+      case (id, info) => id -> toJavaStreamInputInfo(info)
+    }
+    val javaOutputOpInfos = batchInfo.outputOperationInfos.map {
+      case (id, info) => id -> toJavaOutputOperationInfo(info)
+    }
+    JavaBatchInfo(
+      batchInfo.batchTime,
+      javaInputInfos.asJava,
+      batchInfo.submissionTime,
+      batchInfo.processingStartTime.getOrElse(-1),
+      batchInfo.processingEndTime.getOrElse(-1),
+      batchInfo.schedulingDelay.getOrElse(-1),
+      batchInfo.processingDelay.getOrElse(-1),
+      batchInfo.totalDelay.getOrElse(-1),
+      batchInfo.numRecords,
+      javaOutputOpInfos.asJava
+    )
+  }
+
+  override def onReceiverStarted(receiverStarted: 
StreamingListenerReceiverStarted): Unit = {
+    javaStreamingListener.onReceiverStarted(
+      new 
JavaStreamingListenerReceiverStarted(toJavaReceiverInfo(receiverStarted.receiverInfo)))
+  }
+
+  override def onReceiverError(receiverError: StreamingListenerReceiverError): 
Unit = {
+    javaStreamingListener.onReceiverError(
+      new 
JavaStreamingListenerReceiverError(toJavaReceiverInfo(receiverError.receiverInfo)))
+  }
+
+  override def onReceiverStopped(receiverStopped: 
StreamingListenerReceiverStopped): Unit = {
+    javaStreamingListener.onReceiverStopped(
+      new 
JavaStreamingListenerReceiverStopped(toJavaReceiverInfo(receiverStopped.receiverInfo)))
+  }
+
+  override def onBatchSubmitted(batchSubmitted: 
StreamingListenerBatchSubmitted): Unit = {
+    javaStreamingListener.onBatchSubmitted(
+      new 
JavaStreamingListenerBatchSubmitted(toJavaBatchInfo(batchSubmitted.batchInfo)))
+  }
+
+  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): 
Unit = {
+    javaStreamingListener.onBatchStarted(
+      new 
JavaStreamingListenerBatchStarted(toJavaBatchInfo(batchStarted.batchInfo)))
+  }
+
+  override def onBatchCompleted(batchCompleted: 
StreamingListenerBatchCompleted): Unit = {
+    javaStreamingListener.onBatchCompleted(
+      new 
JavaStreamingListenerBatchCompleted(toJavaBatchInfo(batchCompleted.batchInfo)))
+  }
+
+  override def onOutputOperationStarted(
+      outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = 
{
+    javaStreamingListener.onOutputOperationStarted(new 
JavaStreamingListenerOutputOperationStarted(
+      toJavaOutputOperationInfo(outputOperationStarted.outputOperationInfo)))
+  }
+
+  override def onOutputOperationCompleted(
+      outputOperationCompleted: StreamingListenerOutputOperationCompleted): 
Unit = {
+    javaStreamingListener.onOutputOperationCompleted(
+      new JavaStreamingListenerOutputOperationCompleted(
+        
toJavaOutputOperationInfo(outputOperationCompleted.outputOperationInfo)))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ab7da0ea/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
 
b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
new file mode 100644
index 0000000..8cc285a
--- /dev/null
+++ 
b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.streaming.api.java.*;
+
+public class JavaStreamingListenerAPISuite extends JavaStreamingListener {
+
+  @Override
+  public void onReceiverStarted(JavaStreamingListenerReceiverStarted 
receiverStarted) {
+    JavaReceiverInfo receiverInfo = receiverStarted.receiverInfo();
+    receiverInfo.streamId();
+    receiverInfo.name();
+    receiverInfo.active();
+    receiverInfo.location();
+    receiverInfo.lastErrorMessage();
+    receiverInfo.lastError();
+    receiverInfo.lastErrorTime();
+  }
+
+  @Override
+  public void onReceiverError(JavaStreamingListenerReceiverError 
receiverError) {
+    JavaReceiverInfo receiverInfo = receiverError.receiverInfo();
+    receiverInfo.streamId();
+    receiverInfo.name();
+    receiverInfo.active();
+    receiverInfo.location();
+    receiverInfo.lastErrorMessage();
+    receiverInfo.lastError();
+    receiverInfo.lastErrorTime();
+  }
+
+  @Override
+  public void onReceiverStopped(JavaStreamingListenerReceiverStopped 
receiverStopped) {
+    JavaReceiverInfo receiverInfo = receiverStopped.receiverInfo();
+    receiverInfo.streamId();
+    receiverInfo.name();
+    receiverInfo.active();
+    receiverInfo.location();
+    receiverInfo.lastErrorMessage();
+    receiverInfo.lastError();
+    receiverInfo.lastErrorTime();
+  }
+
+  @Override
+  public void onBatchSubmitted(JavaStreamingListenerBatchSubmitted 
batchSubmitted) {
+    super.onBatchSubmitted(batchSubmitted);
+  }
+
+  @Override
+  public void onBatchStarted(JavaStreamingListenerBatchStarted batchStarted) {
+    super.onBatchStarted(batchStarted);
+  }
+
+  @Override
+  public void onBatchCompleted(JavaStreamingListenerBatchCompleted 
batchCompleted) {
+    super.onBatchCompleted(batchCompleted);
+  }
+
+  @Override
+  public void 
onOutputOperationStarted(JavaStreamingListenerOutputOperationStarted 
outputOperationStarted) {
+    super.onOutputOperationStarted(outputOperationStarted);
+  }
+
+  @Override
+  public void 
onOutputOperationCompleted(JavaStreamingListenerOutputOperationCompleted 
outputOperationCompleted) {
+    super.onOutputOperationCompleted(outputOperationCompleted);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ab7da0ea/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
 
b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
new file mode 100644
index 0000000..6d6d61e
--- /dev/null
+++ 
b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.scheduler._
+
+class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
+
+  test("basic") {
+    val listener = new TestJavaStreamingListener()
+    val listenerWrapper = new JavaStreamingListenerWrapper(listener)
+
+    val receiverStarted = StreamingListenerReceiverStarted(ReceiverInfo(
+      streamId = 2,
+      name = "test",
+      active = true,
+      location = "localhost"
+    ))
+    listenerWrapper.onReceiverStarted(receiverStarted)
+    assertReceiverInfo(listener.receiverStarted.receiverInfo, 
receiverStarted.receiverInfo)
+
+    val receiverStopped = StreamingListenerReceiverStopped(ReceiverInfo(
+      streamId = 2,
+      name = "test",
+      active = false,
+      location = "localhost"
+    ))
+    listenerWrapper.onReceiverStopped(receiverStopped)
+    assertReceiverInfo(listener.receiverStopped.receiverInfo, 
receiverStopped.receiverInfo)
+
+    val receiverError = StreamingListenerReceiverError(ReceiverInfo(
+      streamId = 2,
+      name = "test",
+      active = false,
+      location = "localhost",
+      lastErrorMessage = "failed",
+      lastError = "failed",
+      lastErrorTime = System.currentTimeMillis()
+    ))
+    listenerWrapper.onReceiverError(receiverError)
+    assertReceiverInfo(listener.receiverError.receiverInfo, 
receiverError.receiverInfo)
+
+    val batchSubmitted = StreamingListenerBatchSubmitted(BatchInfo(
+      batchTime = Time(1000L),
+      streamIdToInputInfo = Map(
+        0 -> StreamInputInfo(
+          inputStreamId = 0,
+          numRecords = 1000,
+          metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> 
"receiver1")),
+        1 -> StreamInputInfo(
+          inputStreamId = 1,
+          numRecords = 2000,
+          metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> 
"receiver2"))),
+      submissionTime = 1001L,
+      None,
+      None,
+      outputOperationInfos = Map(
+        0 -> OutputOperationInfo(
+          batchTime = Time(1000L),
+          id = 0,
+          name = "op1",
+          description = "operation1",
+          startTime = None,
+          endTime = None,
+          failureReason = None),
+        1 -> OutputOperationInfo(
+          batchTime = Time(1000L),
+          id = 1,
+          name = "op2",
+          description = "operation2",
+          startTime = None,
+          endTime = None,
+          failureReason = None))
+    ))
+    listenerWrapper.onBatchSubmitted(batchSubmitted)
+    assertBatchInfo(listener.batchSubmitted.batchInfo, 
batchSubmitted.batchInfo)
+
+    val batchStarted = StreamingListenerBatchStarted(BatchInfo(
+      batchTime = Time(1000L),
+      streamIdToInputInfo = Map(
+        0 -> StreamInputInfo(
+          inputStreamId = 0,
+          numRecords = 1000,
+          metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> 
"receiver1")),
+        1 -> StreamInputInfo(
+          inputStreamId = 1,
+          numRecords = 2000,
+          metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> 
"receiver2"))),
+      submissionTime = 1001L,
+      Some(1002L),
+      None,
+      outputOperationInfos = Map(
+        0 -> OutputOperationInfo(
+          batchTime = Time(1000L),
+          id = 0,
+          name = "op1",
+          description = "operation1",
+          startTime = Some(1003L),
+          endTime = None,
+          failureReason = None),
+        1 -> OutputOperationInfo(
+          batchTime = Time(1000L),
+          id = 1,
+          name = "op2",
+          description = "operation2",
+          startTime = Some(1005L),
+          endTime = None,
+          failureReason = None))
+    ))
+    listenerWrapper.onBatchStarted(batchStarted)
+    assertBatchInfo(listener.batchStarted.batchInfo, batchStarted.batchInfo)
+
+    val batchCompleted = StreamingListenerBatchCompleted(BatchInfo(
+      batchTime = Time(1000L),
+      streamIdToInputInfo = Map(
+        0 -> StreamInputInfo(
+          inputStreamId = 0,
+          numRecords = 1000,
+          metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> 
"receiver1")),
+        1 -> StreamInputInfo(
+          inputStreamId = 1,
+          numRecords = 2000,
+          metadata = Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> 
"receiver2"))),
+      submissionTime = 1001L,
+      Some(1002L),
+      Some(1010L),
+      outputOperationInfos = Map(
+        0 -> OutputOperationInfo(
+          batchTime = Time(1000L),
+          id = 0,
+          name = "op1",
+          description = "operation1",
+          startTime = Some(1003L),
+          endTime = Some(1004L),
+          failureReason = None),
+        1 -> OutputOperationInfo(
+          batchTime = Time(1000L),
+          id = 1,
+          name = "op2",
+          description = "operation2",
+          startTime = Some(1005L),
+          endTime = Some(1010L),
+          failureReason = None))
+    ))
+    listenerWrapper.onBatchCompleted(batchCompleted)
+    assertBatchInfo(listener.batchCompleted.batchInfo, 
batchCompleted.batchInfo)
+
+    val outputOperationStarted = 
StreamingListenerOutputOperationStarted(OutputOperationInfo(
+      batchTime = Time(1000L),
+      id = 0,
+      name = "op1",
+      description = "operation1",
+      startTime = Some(1003L),
+      endTime = None,
+      failureReason = None
+    ))
+    listenerWrapper.onOutputOperationStarted(outputOperationStarted)
+    
assertOutputOperationInfo(listener.outputOperationStarted.outputOperationInfo,
+      outputOperationStarted.outputOperationInfo)
+
+    val outputOperationCompleted = 
StreamingListenerOutputOperationCompleted(OutputOperationInfo(
+      batchTime = Time(1000L),
+      id = 0,
+      name = "op1",
+      description = "operation1",
+      startTime = Some(1003L),
+      endTime = Some(1004L),
+      failureReason = None
+    ))
+    listenerWrapper.onOutputOperationCompleted(outputOperationCompleted)
+    
assertOutputOperationInfo(listener.outputOperationCompleted.outputOperationInfo,
+      outputOperationCompleted.outputOperationInfo)
+  }
+
+  private def assertReceiverInfo(
+      javaReceiverInfo: JavaReceiverInfo, receiverInfo: ReceiverInfo): Unit = {
+    assert(javaReceiverInfo.streamId === receiverInfo.streamId)
+    assert(javaReceiverInfo.name === receiverInfo.name)
+    assert(javaReceiverInfo.active === receiverInfo.active)
+    assert(javaReceiverInfo.location === receiverInfo.location)
+    assert(javaReceiverInfo.lastErrorMessage === receiverInfo.lastErrorMessage)
+    assert(javaReceiverInfo.lastError === receiverInfo.lastError)
+    assert(javaReceiverInfo.lastErrorTime === receiverInfo.lastErrorTime)
+  }
+
+  private def assertBatchInfo(javaBatchInfo: JavaBatchInfo, batchInfo: 
BatchInfo): Unit = {
+    assert(javaBatchInfo.batchTime === batchInfo.batchTime)
+    assert(javaBatchInfo.streamIdToInputInfo.size === 
batchInfo.streamIdToInputInfo.size)
+    batchInfo.streamIdToInputInfo.foreach { case (streamId, streamInputInfo) =>
+      assertStreamingInfo(javaBatchInfo.streamIdToInputInfo.get(streamId), 
streamInputInfo)
+    }
+    assert(javaBatchInfo.submissionTime === batchInfo.submissionTime)
+    assert(javaBatchInfo.processingStartTime === 
batchInfo.processingStartTime.getOrElse(-1))
+    assert(javaBatchInfo.processingEndTime === 
batchInfo.processingEndTime.getOrElse(-1))
+    assert(javaBatchInfo.schedulingDelay === 
batchInfo.schedulingDelay.getOrElse(-1))
+    assert(javaBatchInfo.processingDelay === 
batchInfo.processingDelay.getOrElse(-1))
+    assert(javaBatchInfo.totalDelay === batchInfo.totalDelay.getOrElse(-1))
+    assert(javaBatchInfo.numRecords === batchInfo.numRecords)
+    assert(javaBatchInfo.outputOperationInfos.size === 
batchInfo.outputOperationInfos.size)
+    batchInfo.outputOperationInfos.foreach { case (outputOperationId, 
outputOperationInfo) =>
+      assertOutputOperationInfo(
+        javaBatchInfo.outputOperationInfos.get(outputOperationId), 
outputOperationInfo)
+    }
+  }
+
+  private def assertStreamingInfo(
+      javaStreamInputInfo: JavaStreamInputInfo, streamInputInfo: 
StreamInputInfo): Unit = {
+    assert(javaStreamInputInfo.inputStreamId === streamInputInfo.inputStreamId)
+    assert(javaStreamInputInfo.numRecords === streamInputInfo.numRecords)
+    assert(javaStreamInputInfo.metadata === streamInputInfo.metadata.asJava)
+    assert(javaStreamInputInfo.metadataDescription === 
streamInputInfo.metadataDescription.orNull)
+  }
+
+  private def assertOutputOperationInfo(
+      javaOutputOperationInfo: JavaOutputOperationInfo,
+      outputOperationInfo: OutputOperationInfo): Unit = {
+    assert(javaOutputOperationInfo.batchTime === outputOperationInfo.batchTime)
+    assert(javaOutputOperationInfo.id === outputOperationInfo.id)
+    assert(javaOutputOperationInfo.name === outputOperationInfo.name)
+    assert(javaOutputOperationInfo.description === 
outputOperationInfo.description)
+    assert(javaOutputOperationInfo.startTime === 
outputOperationInfo.startTime.getOrElse(-1))
+    assert(javaOutputOperationInfo.endTime === 
outputOperationInfo.endTime.getOrElse(-1))
+    assert(javaOutputOperationInfo.failureReason === 
outputOperationInfo.failureReason.orNull)
+  }
+}
+
+class TestJavaStreamingListener extends JavaStreamingListener {
+
+  var receiverStarted: JavaStreamingListenerReceiverStarted = null
+  var receiverError: JavaStreamingListenerReceiverError = null
+  var receiverStopped: JavaStreamingListenerReceiverStopped = null
+  var batchSubmitted: JavaStreamingListenerBatchSubmitted = null
+  var batchStarted: JavaStreamingListenerBatchStarted = null
+  var batchCompleted: JavaStreamingListenerBatchCompleted = null
+  var outputOperationStarted: JavaStreamingListenerOutputOperationStarted = 
null
+  var outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted 
= null
+
+  override def onReceiverStarted(receiverStarted: 
JavaStreamingListenerReceiverStarted): Unit = {
+    this.receiverStarted = receiverStarted
+  }
+
+  override def onReceiverError(receiverError: 
JavaStreamingListenerReceiverError): Unit = {
+    this.receiverError = receiverError
+  }
+
+  override def onReceiverStopped(receiverStopped: 
JavaStreamingListenerReceiverStopped): Unit = {
+    this.receiverStopped = receiverStopped
+  }
+
+  override def onBatchSubmitted(batchSubmitted: 
JavaStreamingListenerBatchSubmitted): Unit = {
+    this.batchSubmitted = batchSubmitted
+  }
+
+  override def onBatchStarted(batchStarted: 
JavaStreamingListenerBatchStarted): Unit = {
+    this.batchStarted = batchStarted
+  }
+
+  override def onBatchCompleted(batchCompleted: 
JavaStreamingListenerBatchCompleted): Unit = {
+    this.batchCompleted = batchCompleted
+  }
+
+  override def onOutputOperationStarted(
+      outputOperationStarted: JavaStreamingListenerOutputOperationStarted): 
Unit = {
+    this.outputOperationStarted = outputOperationStarted
+  }
+
+  override def onOutputOperationCompleted(
+      outputOperationCompleted: 
JavaStreamingListenerOutputOperationCompleted): Unit = {
+    this.outputOperationCompleted = outputOperationCompleted
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to