Repository: spark
Updated Branches:
  refs/heads/master de934e671 -> 2bc327288


[SPARK-20894][SS] Resolve the checkpoint location in driver and use the resolved path in state store

## What changes were proposed in this pull request?

When a user runs a Structured Streaming query in a cluster and the driver uses 
the local file system for the checkpoint location, the StateStore instances 
running in the executors will throw a file-not-found exception. However, the 
current error message does not make the cause obvious.

This PR makes StreamExecution resolve the checkpoint path in the driver and use 
the fully qualified path, including the scheme (such as `hdfs:/` or `file:/`), 
in StateStore.

If the above error then happens, StateStore will report the full path, which 
starts with `file:/`, making the cause obvious: the checkpoint location is on 
the local file system.
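
For illustration, here is a minimal sketch of the resolution logic using 
Hadoop's `Path.makeQualified`, mirroring the change to `StreamExecution` in the 
diff below (the standalone `resolveCheckpointRoot` helper and its `hadoopConf` 
parameter exist only in this example):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper for illustration; in the patch this logic lives inline
// in StreamExecution as `resolvedCheckpointRoot`.
def resolveCheckpointRoot(checkpointRoot: String, hadoopConf: Configuration): String = {
  val checkpointPath = new Path(checkpointRoot)
  // Look up the file system implied by the path (or the configured default).
  val fs = checkpointPath.getFileSystem(hadoopConf)
  // Qualify the path so it carries an explicit scheme: e.g., "/tmp/cp" on a
  // driver whose default file system is local becomes "file:/tmp/cp".
  checkpointPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toUri.toString
}
```

Because the scheme is fixed once in the driver, every executor's StateStore 
sees the same fully qualified location.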

One potential minor issue is that, after this change, the user cannot use 
different default file system settings in the driver and the executors (e.g., a 
public HDFS address in the driver and a private HDFS address in the executors). 
However, since batch queries already have this issue (see 
https://github.com/apache/spark/blob/4bb6a53ebd06de3de97139a2dbc7c85fc3aa3e66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L402),
this change doesn't make things worse.

## How was this patch tested?

The newly added test.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #18149 from zsxwing/SPARK-20894.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bc32728
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bc32728
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bc32728

Branch: refs/heads/master
Commit: 2bc3272880515649d3e10eba135831a2ed0e3465
Parents: de934e6
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed May 31 17:24:37 2017 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed May 31 17:24:37 2017 -0700

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala    | 16 +++++++++++-----
 .../apache/spark/sql/streaming/StreamSuite.scala | 19 +++++++++++++++++++
 .../sql/streaming/StreamingQuerySuite.scala      |  4 ++--
 .../test/DataStreamReaderWriterSuite.scala       |  8 ++++----
 4 files changed, 36 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2bc32728/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ab86085..74f0f50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -58,7 +58,7 @@ case object TERMINATED extends State
 class StreamExecution(
     override val sparkSession: SparkSession,
     override val name: String,
-    val checkpointRoot: String,
+    private val checkpointRoot: String,
     analyzedPlan: LogicalPlan,
     val sink: Sink,
     val trigger: Trigger,
@@ -84,6 +84,12 @@ class StreamExecution(
   private val startLatch = new CountDownLatch(1)
   private val terminationLatch = new CountDownLatch(1)
 
+  val resolvedCheckpointRoot = {
+    val checkpointPath = new Path(checkpointRoot)
+    val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    checkpointPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toUri.toString
+  }
+
   /**
   * Tracks how much data we have processed and committed to the sink or state store from each
    * input source.
@@ -154,7 +160,7 @@ class StreamExecution(
       case streamingRelation@StreamingRelation(dataSource, _, output) =>
         toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
           // Materialize source to avoid creating it in every batch
-          val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
+          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
           val source = dataSource.createSource(metadataPath)
           nextSourceId += 1
           // We still need to use the previous `output` instead of `source.schema` as attributes in
@@ -233,14 +239,14 @@ class StreamExecution(
 
   /** Returns the path of a file with `name` in the checkpoint directory. */
   private def checkpointFile(name: String): String =
-    new Path(new Path(checkpointRoot), name).toUri.toString
+    new Path(new Path(resolvedCheckpointRoot), name).toUri.toString
 
   /**
   * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
    * has been posted to all the listeners.
    */
   def start(): Unit = {
-    logInfo(s"Starting $prettyIdString. Use $checkpointRoot to store the query 
checkpoint.")
+    logInfo(s"Starting $prettyIdString. Use $resolvedCheckpointRoot to store 
the query checkpoint.")
     microBatchThread.setDaemon(true)
     microBatchThread.start()
     startLatch.await()  // Wait until thread started and QueryStart event has been posted
@@ -374,7 +380,7 @@ class StreamExecution(
 
         // Delete the temp checkpoint only when the query didn't fail
         if (deleteCheckpointOnStop && exception.isEmpty) {
-          val checkpointPath = new Path(checkpointRoot)
+          val checkpointPath = new Path(resolvedCheckpointRoot)
           try {
            val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
             fs.delete(checkpointPath, true)

http://git-wip-us.apache.org/repos/asf/spark/blob/2bc32728/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 280f2dc..4ede4fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -617,6 +617,25 @@ class StreamSuite extends StreamTest {
     query.stop()
   }
 
+  test("should resolve the checkpoint path") {
+    withTempDir { dir =>
+      val checkpointLocation = dir.getCanonicalPath
+      assert(!checkpointLocation.startsWith("file:/"))
+      val query = MemoryStream[Int].toDF
+        .writeStream
+        .option("checkpointLocation", checkpointLocation)
+        .format("console")
+        .start()
+      try {
+        val resolvedCheckpointDir =
+          query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot
+        assert(resolvedCheckpointDir.startsWith("file:/"))
+      } finally {
+        query.stop()
+      }
+    }
+  }
+
   testQuietly("specify custom state store provider") {
     val queryName = "memStream"
     val providerClassName = classOf[TestStateStoreProvider].getCanonicalName

http://git-wip-us.apache.org/repos/asf/spark/blob/2bc32728/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index b69536e..0925646 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -466,7 +466,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
         CheckAnswer(6, 3, 6, 3, 1, 1),
 
         AssertOnQuery("metadata log should contain only two files") { q =>
-          val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
+          val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toUri)
           val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
          val toTest = logFileNames.filter(!_.endsWith(".crc")).sorted // Workaround for SPARK-17475
           assert(toTest.size == 2 && toTest.head == "1")
@@ -492,7 +492,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
         CheckAnswer(1, 2, 1, 2, 3, 4, 5, 6, 7, 8),
 
         AssertOnQuery("metadata log should contain three files") { q =>
-          val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
+          val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toUri)
           val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
          val toTest = logFileNames.filter(!_.endsWith(".crc")).sorted // Workaround for SPARK-17475
           assert(toTest.size == 3 && toTest.head == "2")

http://git-wip-us.apache.org/repos/asf/spark/blob/2bc32728/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index dc2506a..b5f1e28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -378,14 +378,14 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
     verify(LastOptions.mockStreamSourceProvider).createSource(
       any(),
-      meq(s"$checkpointLocationURI/sources/0"),
+      meq(s"${makeQualifiedPath(checkpointLocationURI.toString)}/sources/0"),
       meq(None),
       meq("org.apache.spark.sql.streaming.test"),
       meq(Map.empty))
 
     verify(LastOptions.mockStreamSourceProvider).createSource(
       any(),
-      meq(s"$checkpointLocationURI/sources/1"),
+      meq(s"${makeQualifiedPath(checkpointLocationURI.toString)}/sources/1"),
       meq(None),
       meq("org.apache.spark.sql.streaming.test"),
       meq(Map.empty))
@@ -642,7 +642,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     import testImplicits._
     val query = MemoryStream[Int].toDS.writeStream.format("console").start()
     val checkpointDir = new Path(
-      query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot)
+      query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot)
     val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
     assert(fs.exists(checkpointDir))
     query.stop()
@@ -654,7 +654,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     val input = MemoryStream[Int]
     val query = input.toDS.map(_ / 0).writeStream.format("console").start()
     val checkpointDir = new Path(
-      query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot)
+      query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot)
     val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
     assert(fs.exists(checkpointDir))
     input.addData(1)

