Repository: spark Updated Branches: refs/heads/master de934e671 -> 2bc327288
[SPARK-20894][SS] Resolve the checkpoint location in driver and use the resolved path in state store ## What changes were proposed in this pull request? When a user runs a Structured Streaming query on a cluster and the driver uses the local file system, the StateStore running in the executors throws a file-not-found exception. However, the current error message does not make the cause obvious. This PR makes StreamExecution resolve the path in the driver and use the fully qualified path, including the scheme (such as `hdfs:/` or `file:/`), in StateStore. If the above error then occurs, StateStore reports the full path, which starts with `file:/`, and this makes the cause obvious: the checkpoint location is on the local file system. One potential minor issue is that, after this change, the user can no longer use different default file system settings in the driver and the executors (e.g., a public HDFS address in the driver and a private HDFS address in the executors). However, since batch queries already have this limitation (see https://github.com/apache/spark/blob/4bb6a53ebd06de3de97139a2dbc7c85fc3aa3e66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L402), this change doesn't make things worse. ## How was this patch tested? The newly added test. Author: Shixiong Zhu <shixi...@databricks.com> Closes #18149 from zsxwing/SPARK-20894. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bc32728 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bc32728 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bc32728 Branch: refs/heads/master Commit: 2bc3272880515649d3e10eba135831a2ed0e3465 Parents: de934e6 Author: Shixiong Zhu <shixi...@databricks.com> Authored: Wed May 31 17:24:37 2017 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Wed May 31 17:24:37 2017 -0700 ---------------------------------------------------------------------- .../execution/streaming/StreamExecution.scala | 16 +++++++++++----- .../apache/spark/sql/streaming/StreamSuite.scala | 19 +++++++++++++++++++ .../sql/streaming/StreamingQuerySuite.scala | 4 ++-- .../test/DataStreamReaderWriterSuite.scala | 8 ++++---- 4 files changed, 36 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2bc32728/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index ab86085..74f0f50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -58,7 +58,7 @@ case object TERMINATED extends State class StreamExecution( override val sparkSession: SparkSession, override val name: String, - val checkpointRoot: String, + private val checkpointRoot: String, analyzedPlan: LogicalPlan, val sink: Sink, val trigger: Trigger, @@ -84,6 +84,12 @@ class StreamExecution( private val startLatch = new CountDownLatch(1) private val terminationLatch = new 
CountDownLatch(1) + val resolvedCheckpointRoot = { + val checkpointPath = new Path(checkpointRoot) + val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) + checkpointPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toUri.toString + } + /** * Tracks how much data we have processed and committed to the sink or state store from each * input source. @@ -154,7 +160,7 @@ class StreamExecution( case streamingRelation@StreamingRelation(dataSource, _, output) => toExecutionRelationMap.getOrElseUpdate(streamingRelation, { // Materialize source to avoid creating it in every batch - val metadataPath = s"$checkpointRoot/sources/$nextSourceId" + val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" val source = dataSource.createSource(metadataPath) nextSourceId += 1 // We still need to use the previous `output` instead of `source.schema` as attributes in @@ -233,14 +239,14 @@ class StreamExecution( /** Returns the path of a file with `name` in the checkpoint directory. */ private def checkpointFile(name: String): String = - new Path(new Path(checkpointRoot), name).toUri.toString + new Path(new Path(resolvedCheckpointRoot), name).toUri.toString /** * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]] * has been posted to all the listeners. */ def start(): Unit = { - logInfo(s"Starting $prettyIdString. Use $checkpointRoot to store the query checkpoint.") + logInfo(s"Starting $prettyIdString. 
Use $resolvedCheckpointRoot to store the query checkpoint.") microBatchThread.setDaemon(true) microBatchThread.start() startLatch.await() // Wait until thread started and QueryStart event has been posted @@ -374,7 +380,7 @@ class StreamExecution( // Delete the temp checkpoint only when the query didn't fail if (deleteCheckpointOnStop && exception.isEmpty) { - val checkpointPath = new Path(checkpointRoot) + val checkpointPath = new Path(resolvedCheckpointRoot) try { val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) fs.delete(checkpointPath, true) http://git-wip-us.apache.org/repos/asf/spark/blob/2bc32728/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 280f2dc..4ede4fd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -617,6 +617,25 @@ class StreamSuite extends StreamTest { query.stop() } + test("should resolve the checkpoint path") { + withTempDir { dir => + val checkpointLocation = dir.getCanonicalPath + assert(!checkpointLocation.startsWith("file:/")) + val query = MemoryStream[Int].toDF + .writeStream + .option("checkpointLocation", checkpointLocation) + .format("console") + .start() + try { + val resolvedCheckpointDir = + query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot + assert(resolvedCheckpointDir.startsWith("file:/")) + } finally { + query.stop() + } + } + } + testQuietly("specify custom state store provider") { val queryName = "memStream" val providerClassName = classOf[TestStateStoreProvider].getCanonicalName 
http://git-wip-us.apache.org/repos/asf/spark/blob/2bc32728/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index b69536e..0925646 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -466,7 +466,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi CheckAnswer(6, 3, 6, 3, 1, 1), AssertOnQuery("metadata log should contain only two files") { q => - val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString) + val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toUri) val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName()) val toTest = logFileNames.filter(!_.endsWith(".crc")).sorted // Workaround for SPARK-17475 assert(toTest.size == 2 && toTest.head == "1") @@ -492,7 +492,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi CheckAnswer(1, 2, 1, 2, 3, 4, 5, 6, 7, 8), AssertOnQuery("metadata log should contain three files") { q => - val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString) + val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toUri) val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName()) val toTest = logFileNames.filter(!_.endsWith(".crc")).sorted // Workaround for SPARK-17475 assert(toTest.size == 3 && toTest.head == "2") http://git-wip-us.apache.org/repos/asf/spark/blob/2bc32728/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index dc2506a..b5f1e28 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -378,14 +378,14 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { verify(LastOptions.mockStreamSourceProvider).createSource( any(), - meq(s"$checkpointLocationURI/sources/0"), + meq(s"${makeQualifiedPath(checkpointLocationURI.toString)}/sources/0"), meq(None), meq("org.apache.spark.sql.streaming.test"), meq(Map.empty)) verify(LastOptions.mockStreamSourceProvider).createSource( any(), - meq(s"$checkpointLocationURI/sources/1"), + meq(s"${makeQualifiedPath(checkpointLocationURI.toString)}/sources/1"), meq(None), meq("org.apache.spark.sql.streaming.test"), meq(Map.empty)) @@ -642,7 +642,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { import testImplicits._ val query = MemoryStream[Int].toDS.writeStream.format("console").start() val checkpointDir = new Path( - query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot) + query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot) val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf()) assert(fs.exists(checkpointDir)) query.stop() @@ -654,7 +654,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { val input = MemoryStream[Int] val query = input.toDS.map(_ / 0).writeStream.format("console").start() val checkpointDir = new Path( - query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot) + query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot) val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf()) 
assert(fs.exists(checkpointDir)) input.addData(1) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org