This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 06d5b17  [SPARK-26629][SS] Fixed error with multiple file stream in a query + restart on a batch that has no data for one file stream
06d5b17 is described below

commit 06d5b173b687c23aa53e293ed6e12ec746393876
Author: Tathagata Das <tathagata.das1...@gmail.com>
AuthorDate: Wed Jan 16 09:42:14 2019 -0800

    [SPARK-26629][SS] Fixed error with multiple file stream in a query + restart on a batch that has no data for one file stream

## What changes were proposed in this pull request?

When a streaming query has multiple file streams and there is a batch in which one of the file streams has no data, then restarting the query from that batch throws the following error:

```
java.lang.IllegalStateException: batch 1 doesn't exist
  at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$.verifyBatchIds(HDFSMetadataLog.scala:300)
  at org.apache.spark.sql.execution.streaming.FileStreamSourceLog.get(FileStreamSourceLog.scala:120)
  at org.apache.spark.sql.execution.streaming.FileStreamSource.getBatch(FileStreamSource.scala:181)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets$2.apply(MicroBatchExecution.scala:294)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets$2.apply(MicroBatchExecution.scala:291)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:291)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:178)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
  at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:251)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:61)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:175)
  at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:169)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:205)
```

The existing `HDFSMetadataLog.verifyBatchIds` threw this error whenever the `batchIds` list was empty.
In the context of `FileStreamSource.getBatch` (where the verification is called) and `FileStreamSourceLog` (a subclass of `HDFSMetadataLog`), this is usually okay because, in a streaming query with one file stream, `batchIds` can never be empty:

- A batch is planned only when the `FileStreamSourceLog` has seen a new offset (that is, there are new data files).
- So `FileStreamSource.getBatch(X, Y)` is always called with Y > X, and it internally calls `HDFSMetadataLog.verifyBatchIds` on the ids X+1 through Y (that is, Y - X ids). For example, `FileStreamSource.getBatch(4, 5)` calls `verifyBatchIds(batchIds = Seq(5), start = 5, end = 5)`.

However, the invariant Y > X does not hold when there are two file stream sources, as a batch may be planned even when only one of the file streams has new data. The source without data can then be asked for `FileStreamSource.getBatch(X, X)`, which leads to `verifyBatchIds(batchIds = Seq.empty, start = X+1, end = X)` and hence the failure above.

Note that `FileStreamSource.getBatch(X, X)` gets called **only when restarting a query at a batch where a file source did not have data**. During normal batch planning, `MicroBatchExecution` avoids calling `FileStreamSource.getBatch(X, X)` when offset X has not changed. When restarting a stream at such a batch, however, `MicroBatchExecution.populateStartOffsets()` does call `FileStreamSource.getBatch(X, X)` (a DataSource V1 hack to initialize the source with the last known offsets [...]).

The minimal solution here is to skip verification when `FileStreamSource.getBatch(X, X)` produces an empty batch-id range, as sketched below.
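To make the off-by-one concrete, here is a minimal, self-contained sketch of the failure mode and of the guard this patch adds. `verifyBatchIdsSketch` is a stand-in for the real `HDFSMetadataLog.verifyBatchIds`, not Spark's implementation; the continuity checks are elided and only the empty-list failure is modeled:

```scala
object EmptyRangeSketch {
  // Stand-in for HDFSMetadataLog.verifyBatchIds: the real method also checks
  // that the ids are continuous; here we only model the empty-list failure.
  def verifyBatchIdsSketch(batchIds: Seq[Long], startId: Long, endId: Long): Unit = {
    if (batchIds.isEmpty) {
      throw new IllegalStateException(s"batch $startId doesn't exist")
    }
  }

  def main(args: Array[String]): Unit = {
    // Single file stream, normal planning: getBatch(4, 5) verifies ids 5..5.
    verifyBatchIdsSketch(Seq(5L), startId = 5L, endId = 5L) // non-empty, passes

    // Restart at a batch where this source had no data: getBatch(X, X) asks
    // to verify ids X+1..X, an empty range, which used to throw (see trace).
    val x = 1L
    val (startId, endId) = (x + 1, x)
    if (startId <= endId) { // the guard this patch adds in FileStreamSourceLog.get
      verifyBatchIdsSketch(Seq.empty, startId, endId)
    }
    println("empty range skipped; restart proceeds")
  }
}
```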
## How was this patch tested?

New unit tests: `HDFSMetadataLogSuite` now covers `verifyBatchIds` with `startId > endId`, and `FileStreamSourceSuite` gains a multi-source restart test (see the diff below).

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #23557 from tdas/SPARK-26629.

Authored-by: Tathagata Das <tathagata.das1...@gmail.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 .../execution/streaming/FileStreamSourceLog.scala  |  4 +-
 .../sql/execution/streaming/HDFSMetadataLog.scala  |  3 +-
 .../execution/streaming/HDFSMetadataLogSuite.scala |  6 ++
 .../sql/streaming/FileStreamSourceSuite.scala      | 75 ++++++++++++++++++++--
 4 files changed, 80 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 8628471..7b2ea96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -117,7 +117,9 @@ class FileStreamSourceLog(
 
     val batches =
       (existedBatches ++ retrievedBatches).map(i => i._1 -> i._2.get).toArray.sortBy(_._1)
-    HDFSMetadataLog.verifyBatchIds(batches.map(_._1), startId, endId)
+    if (startBatchId <= endBatchId) {
+      HDFSMetadataLog.verifyBatchIds(batches.map(_._1), startId, endId)
+    }
     batches
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index bd0a461..62d524f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -262,7 +262,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
 object HDFSMetadataLog {
 
   /**
-   * Verify if batchIds are continuous and between `startId` and `endId`.
+   * Verify if batchIds are continuous and between `startId` and `endId` (both inclusive and
+   * startId assumed to be <= endId).
    *
    * @param batchIds the sorted ids to verify.
    * @param startId the start id. If it's set, batchIds should start with this id.
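To illustrate the contract documented above, here are hedged example calls. The `Seq[Long]`/`Option[Long]` signature is taken from the callers and tests in this diff; the passing call assumes code colocated in the `org.apache.spark.sql.execution.streaming` package (as the suite below is), since `verifyBatchIds` may not be visible elsewhere:

```scala
import org.apache.spark.sql.execution.streaming.HDFSMetadataLog.verifyBatchIds

// Continuous ids spanning [startId, endId], both ends inclusive: passes.
verifyBatchIds(Seq(1L, 2L, 3L), Some(1L), Some(3L))

// startId > endId violates the documented assumption; with an empty id list
// this throws IllegalStateException (see the new tests below), which is why
// FileStreamSourceLog.get above now skips verification for an empty range.
// verifyBatchIds(Seq.empty, Some(2L), Some(1L))
```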
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 9268306..0e36e7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -178,5 +178,11 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     intercept[IllegalStateException](verifyBatchIds(Seq(2, 3, 4), None, Some(5L)))
     intercept[IllegalStateException](verifyBatchIds(Seq(2, 3, 4), Some(1L), Some(5L)))
     intercept[IllegalStateException](verifyBatchIds(Seq(1, 2, 4, 5), Some(1L), Some(5L)))
+
+    // Related to SPARK-26629, this captures the behavior for verifyBatchIds when startId > endId
+    intercept[IllegalStateException](verifyBatchIds(Seq(), Some(2L), Some(1L)))
+    intercept[AssertionError](verifyBatchIds(Seq(2), Some(2L), Some(1L)))
+    intercept[AssertionError](verifyBatchIds(Seq(1), Some(2L), Some(1L)))
+    intercept[AssertionError](verifyBatchIds(Seq(0), Some(2L), Some(1L)))
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index de664ca..9235c6d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -48,21 +48,33 @@ abstract class FileStreamSourceTest
    * `FileStreamSource` actually being used in the execution.
    */
   abstract class AddFileData extends AddData {
+    private val _qualifiedBasePath = PrivateMethod[Path]('qualifiedBasePath)
+
+    private def isSamePath(fileSource: FileStreamSource, srcPath: File): Boolean = {
+      val path = (fileSource invokePrivate _qualifiedBasePath()).toString.stripPrefix("file:")
+      path == srcPath.getCanonicalPath
+    }
+
     override def addData(query: Option[StreamExecution]): (Source, Offset) = {
       require(
         query.nonEmpty,
         "Cannot add data when there is no query for finding the active file stream source")
 
       val sources = getSourcesFromStreamingQuery(query.get)
-      if (sources.isEmpty) {
+      val source = if (sources.isEmpty) {
         throw new Exception(
           "Could not find file source in the StreamExecution logical plan to add data to")
-      } else if (sources.size > 1) {
-        throw new Exception(
-          "Could not select the file source in the StreamExecution logical plan as there" +
-            "are multiple file sources:\n\t" + sources.mkString("\n\t"))
+      } else if (sources.size == 1) {
+        sources.head
+      } else {
+        val matchedSources = sources.filter(isSamePath(_, src))
+        if (matchedSources.size != 1) {
+          throw new Exception(
+            "Could not select the file source in StreamExecution as there are multiple" +
+              s" file sources and none / more than one matches $src:\n" + sources.mkString("\n"))
+        }
+        matchedSources.head
       }
-      val source = sources.head
       val newOffset = source.withBatchingLocked {
         addData(source)
         new FileStreamSourceOffset(source.currentLogOffset + 1)
@@ -71,6 +83,9 @@ abstract class FileStreamSourceTest
       (source, newOffset)
     }
 
+    /** Source directory to add file data to */
+    protected def src: File
+
     protected def addData(source: FileStreamSource): Unit
   }
 
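For context on the `src` member introduced above: concrete actions such as `AddTextFileData` satisfy the abstract `protected def src: File` through a constructor parameter. A simplified sketch of that pattern (illustrative class names, not the suite's full definitions):

```scala
import java.io.File

// Simplified model of the suite's pattern: a case-class val named `src`
// implements the abstract `protected def src: File` member.
abstract class AddFileDataSketch {
  protected def src: File // directory this action writes files into
  def describe: String = s"add file data to $src"
}

case class AddTextFileDataSketch(content: String, src: File, tmp: File)
  extends AddFileDataSketch

object SrcDemo extends App {
  val action = AddTextFileDataSketch("hello", new File("/tmp/source1"), new File("/tmp/scratch"))
  println(action.describe) // add file data to /tmp/source1
}
```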
not have data") { + withTempDirs { case (dir, tmp) => + val sourceDir1 = new File(dir, "source1") + val sourceDir2 = new File(dir, "source2") + sourceDir1.mkdirs() + sourceDir2.mkdirs() + + val source1 = createFileStream("text", s"${sourceDir1.getCanonicalPath}") + val source2 = createFileStream("text", s"${sourceDir2.getCanonicalPath}") + val unioned = source1.union(source2) + + def addMultiTextFileData( + source1Content: String, + source2Content: String): StreamAction = { + val actions = Seq( + AddTextFileData(source1Content, sourceDir1, tmp), + AddTextFileData(source2Content, sourceDir2, tmp) + ).filter(_.content != null) // don't write to a source dir if no content specified + StreamProgressLockedActions(actions, desc = actions.mkString("[ ", " | ", " ]")) + } + + testStream(unioned)( + StartStream(), + addMultiTextFileData(source1Content = "source1_0", source2Content = "source2_0"), + CheckNewAnswer("source1_0", "source2_0"), + StopStream, + + StartStream(), + addMultiTextFileData(source1Content = "source1_1", source2Content = null), + CheckNewAnswer("source1_1"), + StopStream, + + // Restart after a batch with one file source having no new data. + // This restart is needed to hit the issue in SPARK-26629. + + StartStream(), + addMultiTextFileData(source1Content = null, source2Content = "source2_2"), + CheckNewAnswer("source2_2"), + StopStream, + + StartStream(), + addMultiTextFileData(source1Content = "source1_3", source2Content = "source2_3"), + CheckNewAnswer("source1_3", "source2_3"), + StopStream + ) + } + } } class FileStreamSourceStressTestSuite extends FileStreamSourceTest { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org