Repository: spark
Updated Branches:
  refs/heads/branch-2.2 0e97c8eef -> ede0e1a98


[SPARK-22403][SS] Add optional checkpointLocation argument to StructuredKafkaWordCount example

## What changes were proposed in this pull request?

When run in YARN cluster mode, the StructuredKafkaWordCount example fails
because Spark tries to create a temporary checkpoint location in a subdirectory
of the path given by java.io.tmpdir. YARN sets java.io.tmpdir to a path in the
local filesystem, and that path usually does not exist in the distributed
filesystem.

This change adds an optional checkpointLocation argument to the
StructuredKafkaWordCount example so that users can specify the checkpoint
location and avoid this issue.
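
The fallback behavior described above can be sketched in isolation, without a Spark dependency. This is only an illustration of the argument handling, not the patched example itself: the object name `CheckpointArgSketch`, the helper `resolveCheckpointLocation`, and the sample broker, topic, and HDFS path are all hypothetical.

```scala
import java.util.UUID

// Hypothetical standalone sketch of the optional-argument handling.
object CheckpointArgSketch {

  // Returns the fourth argument when it is present; otherwise falls back
  // to a randomized directory under /tmp.
  def resolveCheckpointLocation(args: Array[String]): String =
    if (args.length > 3) args(3) else "/tmp/temporary-" + UUID.randomUUID.toString

  def main(args: Array[String]): Unit = {
    // Hypothetical argument lists; the broker, topic, and path are placeholders.
    val withLocation =
      Array("host1:9092", "subscribe", "topic1", "hdfs:///user/example/checkpoints")
    val withoutLocation = Array("host1:9092", "subscribe", "topic1")

    // Prints the explicit location that was passed as the fourth argument.
    println(resolveCheckpointLocation(withLocation))
    // Prints a randomized /tmp/temporary-<uuid> path.
    println(resolveCheckpointLocation(withoutLocation))
  }
}
```

In YARN cluster mode, passing an explicit location that exists on the distributed filesystem is what avoids the failure; the /tmp fallback only preserves a default for callers that omit the argument.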

## How was this patch tested?

Built and ran the example manually in YARN client and cluster modes.

Author: Wing Yew Poon <wyp...@cloudera.com>

Closes #19703 from wypoon/SPARK-22403.

(cherry picked from commit 11c4021044f3a302449a2ea76811e73f5c99a26a)
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ede0e1a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ede0e1a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ede0e1a9

Branch: refs/heads/branch-2.2
Commit: ede0e1a982f146da54fdd187a1c217bee8d0e1b4
Parents: 0e97c8e
Author: Wing Yew Poon <wyp...@cloudera.com>
Authored: Thu Nov 9 16:20:55 2017 -0800
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Thu Nov 9 16:21:06 2017 -0800

----------------------------------------------------------------------
 .../sql/streaming/StructuredKafkaWordCount.scala        | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ede0e1a9/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala
index c26f73e..2aab49c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala
@@ -18,11 +18,14 @@
 // scalastyle:off println
 package org.apache.spark.examples.sql.streaming
 
+import java.util.UUID
+
 import org.apache.spark.sql.SparkSession
 
 /**
  * Consumes messages from one or more topics in Kafka and does wordcount.
  * Usage: StructuredKafkaWordCount <bootstrap-servers> <subscribe-type> <topics>
+ *     [<checkpoint-location>]
  *   <bootstrap-servers> The Kafka "bootstrap.servers" configuration. A
  *   comma-separated list of host:port.
  *   <subscribe-type> There are three kinds of type, i.e. 'assign', 'subscribe',
@@ -36,6 +39,8 @@ import org.apache.spark.sql.SparkSession
  *   |- Only one of "assign, "subscribe" or "subscribePattern" options can be
  *   |  specified for Kafka source.
  *   <topics> Different value format depends on the value of 'subscribe-type'.
+ *   <checkpoint-location> Directory in which to create checkpoints. If not
+ *   provided, defaults to a randomized directory in /tmp.
  *
  * Example:
  *    `$ bin/run-example \
@@ -46,11 +51,13 @@ object StructuredKafkaWordCount {
   def main(args: Array[String]): Unit = {
     if (args.length < 3) {
       System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
-        "<subscribe-type> <topics>")
+        "<subscribe-type> <topics> [<checkpoint-location>]")
       System.exit(1)
     }
 
-    val Array(bootstrapServers, subscribeType, topics) = args
+    val Array(bootstrapServers, subscribeType, topics, _*) = args
+    val checkpointLocation =
+      if (args.length > 3) args(3) else "/tmp/temporary-" + UUID.randomUUID.toString
 
     val spark = SparkSession
       .builder
@@ -76,6 +83,7 @@ object StructuredKafkaWordCount {
     val query = wordCounts.writeStream
       .outputMode("complete")
       .format("console")
+      .option("checkpointLocation", checkpointLocation)
       .start()
 
     query.awaitTermination()

