Repository: spark
Updated Branches:
  refs/heads/branch-2.2 38edb9256 -> 6f0d29672


[SPARK-20464][SS] Add a job group and description for streaming queries and fix 
cancellation of running jobs using the job group

## What changes were proposed in this pull request?

Job group: adding a job group is required to properly cancel running jobs 
related to a query.
Description: the new description makes it easier to group the batches of a 
query by sorting by name in the Spark Jobs UI.

## How was this patch tested?

- Unit tests
- UI screenshot

  - Order by job id:
![screen shot 2017-04-27 at 5 10 09 
pm](https://cloud.githubusercontent.com/assets/7865120/25509468/15452274-2b6e-11e7-87ba-d929816688cf.png)

  - Order by description:
![screen shot 2017-04-27 at 5 10 22 
pm](https://cloud.githubusercontent.com/assets/7865120/25509474/1c298512-2b6e-11e7-99b8-fef1ef7665c1.png)

  - Order by job id (no query name):
![screen shot 2017-04-27 at 5 21 33 
pm](https://cloud.githubusercontent.com/assets/7865120/25509482/28c96dc8-2b6e-11e7-8df0-9d3cdbb05e36.png)

  - Order by description (no query name):
![screen shot 2017-04-27 at 5 21 44 
pm](https://cloud.githubusercontent.com/assets/7865120/25509489/37674742-2b6e-11e7-9357-b5c38ec16ac4.png)

Author: Kunal Khamar <kkha...@outlook.com>

Closes #17765 from kunalkhamar/sc-6696.

(cherry picked from commit 6fc6cf88d871f5b05b0ad1a504e0d6213cf9d331)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f0d2967
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f0d2967
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f0d2967

Branch: refs/heads/branch-2.2
Commit: 6f0d29672512bcb720fb82bc92071207dfae5eb1
Parents: 38edb92
Author: Kunal Khamar <kkha...@outlook.com>
Authored: Mon May 1 11:37:30 2017 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Mon May 1 11:37:44 2017 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/ui/UIUtils.scala     |  2 +-
 .../execution/streaming/StreamExecution.scala   | 12 ++++
 .../spark/sql/streaming/StreamSuite.scala       | 66 ++++++++++++++++++++
 3 files changed, 79 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6f0d2967/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index e53d690..79b0d81 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -446,7 +446,7 @@ private[spark] object UIUtils extends Logging {
       val xml = XML.loadString(s"""<span class="description-input">$desc</span>""")
 
       // Verify that this has only anchors and span (we are wrapping in span)
-      val allowedNodeLabels = Set("a", "span")
+      val allowedNodeLabels = Set("a", "span", "br")
       val illegalNodes = xml \\ "_"  filterNot { case node: Node =>
         allowedNodeLabels.contains(node.label)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/6f0d2967/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index bcf0d97..affc201 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -252,6 +252,8 @@ class StreamExecution(
    */
   private def runBatches(): Unit = {
     try {
+      sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString,
+        interruptOnCancel = true)
       if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
         sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
       }
@@ -289,6 +291,7 @@ class StreamExecution(
               if (currentBatchId < 0) {
                 // We'll do this initialization only once
                 populateStartOffsets(sparkSessionToRunBatches)
+                sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
                 logDebug(s"Stream running from $committedOffsets to $availableOffsets")
               } else {
                 constructNextBatch()
@@ -308,6 +311,7 @@ class StreamExecution(
               logDebug(s"batch ${currentBatchId} committed")
               // We'll increase currentBatchId after we complete processing current batch's data
               currentBatchId += 1
+              sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
             } else {
               currentStatus = currentStatus.copy(isDataAvailable = false)
               updateStatusMessage("Waiting for data to arrive")
@@ -684,8 +688,11 @@ class StreamExecution(
     // intentionally
     state.set(TERMINATED)
     if (microBatchThread.isAlive) {
+      sparkSession.sparkContext.cancelJobGroup(runId.toString)
       microBatchThread.interrupt()
       microBatchThread.join()
+      // microBatchThread may spawn new jobs, so we need to cancel again to prevent a leak
+      sparkSession.sparkContext.cancelJobGroup(runId.toString)
     }
     logInfo(s"Query $prettyIdString was stopped")
   }
@@ -825,6 +832,11 @@ class StreamExecution(
     }
   }
 
+  private def getBatchDescriptionString: String = {
+    val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
+    Option(name).map(_ + "<br/>").getOrElse("") +
+      s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6f0d2967/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 13fe51a..01ea62a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -25,6 +25,8 @@ import scala.util.control.ControlThrowable
 
 import org.apache.commons.io.FileUtils
 
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.command.ExplainCommand
@@ -500,6 +502,70 @@ class StreamSuite extends StreamTest {
       }
     }
   }
+
+  test("calling stop() on a query cancels related jobs") {
+    val input = MemoryStream[Int]
+    val query = input
+      .toDS()
+      .map { i =>
+        while (!org.apache.spark.TaskContext.get().isInterrupted()) {
+          // keep looping till interrupted by query.stop()
+          Thread.sleep(100)
+        }
+        i
+      }
+      .writeStream
+      .format("console")
+      .start()
+
+    input.addData(1)
+    // wait for jobs to start
+    eventually(timeout(streamingTimeout)) {
+      assert(sparkContext.statusTracker.getActiveJobIds().nonEmpty)
+    }
+
+    query.stop()
+    // make sure jobs are stopped
+    eventually(timeout(streamingTimeout)) {
+      assert(sparkContext.statusTracker.getActiveJobIds().isEmpty)
+    }
+  }
+
+  test("batch id is updated correctly in the job description") {
+    val queryName = "memStream"
+    @volatile var jobDescription: String = null
+    def assertDescContainsQueryNameAnd(batch: Integer): Unit = {
+      // wait for listener event to be processed
+      spark.sparkContext.listenerBus.waitUntilEmpty(streamingTimeout.toMillis)
+      assert(jobDescription.contains(queryName) && jobDescription.contains(s"batch = $batch"))
+    }
+
+    spark.sparkContext.addSparkListener(new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        jobDescription = jobStart.properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)
+      }
+    })
+
+    val input = MemoryStream[Int]
+    val query = input
+      .toDS()
+      .map(_ + 1)
+      .writeStream
+      .format("memory")
+      .queryName(queryName)
+      .start()
+
+    input.addData(1)
+    query.processAllAvailable()
+    assertDescContainsQueryNameAnd(batch = 0)
+    input.addData(2, 3)
+    query.processAllAvailable()
+    assertDescContainsQueryNameAnd(batch = 1)
+    input.addData(4)
+    query.processAllAvailable()
+    assertDescContainsQueryNameAnd(batch = 2)
+    query.stop()
+  }
 }
 
 abstract class FakeSource extends StreamSourceProvider {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to