[ https://issues.apache.org/jira/browse/SPARK-26629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-26629:
---------------------------------
    Fix Version/s:     (was: 2.3.4)

> Error with multiple file stream in a query + restart on a batch that has no 
> data for one file stream
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26629
>                 URL: https://issues.apache.org/jira/browse/SPARK-26629
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.4.0, 2.4.1
>            Reporter: Tathagata Das
>            Assignee: Tathagata Das
>            Priority: Major
>             Fix For: 2.4.1, 3.0.0
>
>
> When a streaming query has multiple file streams and one of the file streams 
> has no data in a batch, then restarting the query from that batch throws the 
> following error.
> {code}
> java.lang.IllegalStateException: batch 1 doesn't exist
>       at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$.verifyBatchIds(HDFSMetadataLog.scala:300)
>       at org.apache.spark.sql.execution.streaming.FileStreamSourceLog.get(FileStreamSourceLog.scala:120)
>       at org.apache.spark.sql.execution.streaming.FileStreamSource.getBatch(FileStreamSource.scala:181)
>       at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets$2.apply(MicroBatchExecution.scala:294)
>       at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets$2.apply(MicroBatchExecution.scala:291)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
>       at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:291)
>       at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:178)
>       at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
>       at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
>       at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:251)
>       at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:61)
>       at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:175)
>       at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>       at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:169)
>       at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
>       at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:205)
> {code}
> **Reason**
> The existing {{HDFSMetadataLog.verifyBatchIds}} throws an error whenever the 
> batchIds list is empty. In the context of {{FileStreamSource.getBatch}} 
> (where verify is called) and FileStreamSourceLog (a subclass of 
> HDFSMetadataLog), this is usually okay because, in a streaming query with 
> one file stream, the batchIds list can never be empty:
> A batch is planned only when the FileStreamSourceLog has seen a new offset 
> (that is, there are new data files).
> So FileStreamSource.getBatch will be called on X to Y where Y will always be 
> greater than X. This internally calls {{HDFSMetadataLog.verifyBatchIds}} on 
> the range (X+1, Y), which contains Y-X ids.
> For example, {{FileStreamSource.getBatch(4, 5)}} will call {{verify(batchIds 
> = Seq(5), start = 5, end = 5)}}. However, the invariant Y > X does not hold 
> when there are two file stream sources, as a batch may be planned even when 
> only one of the file streams has data. So one of the file streams may not 
> have data, which can lead to {{FileStreamSource.getBatch(X, X) -> 
> verify(batchIds = Seq.empty, start = X+1, end = X) -> failure}}.
> Note that {{FileStreamSource.getBatch(X, X)}} gets called only when 
> restarting a query at a batch where a file source did not have data. This is 
> because, in normal planning of batches, MicroBatchExecution avoids calling 
> {{FileStreamSource.getBatch(X, X)}} when offset X has not changed. However, 
> when restarting a stream at such a batch, 
> {{MicroBatchExecution.populateStartOffsets()}} calls 
> {{FileStreamSource.getBatch(X, X)}} (a DataSource V1 hack to initialize the 
> source with the last known offsets), thus hitting this issue.
> **Solution**
> The minimal solution here (one that can be backported) is to skip 
> verification when {{FileStreamSource.getBatch(X, X)}} is called.
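
The failure mode and the proposed guard described above can be illustrated with a minimal Scala sketch. It is a simplified model and not the actual Spark source: GetBatchSketch, idsToRead and skipWhenEmptyRange are hypothetical names, and the verifyBatchIds below is a stand-in that only mirrors the behaviour described in the issue (an empty id list is rejected).

```scala
object GetBatchSketch {
  // Simplified stand-in for the verification step (assumption, not Spark code):
  // it rejects an empty id list and any gap in the requested range.
  def verifyBatchIds(batchIds: Seq[Long], start: Long, end: Long): Unit = {
    if (batchIds.isEmpty) {
      throw new IllegalStateException(s"batch $start doesn't exist")
    }
    val missing = (start to end).filterNot(id => batchIds.contains(id))
    if (missing.nonEmpty) {
      throw new IllegalStateException(
        s"batches (${missing.mkString(", ")}) don't exist")
    }
  }

  // Hypothetical model of getBatch(X, Y): the ids X+1..Y are read from the
  // log and then verified. For X == Y the range X+1..X is empty.
  def idsToRead(
      log: Set[Long],
      startOffset: Long,
      endOffset: Long,
      skipWhenEmptyRange: Boolean): Seq[Long] = {
    val start = startOffset + 1
    val end = endOffset
    val found = (start to end).filter(id => log.contains(id))
    // The guard sketched from the "Solution" paragraph: do not verify when
    // the requested range is empty, i.e. for getBatch(X, X).
    val rangeIsEmpty = start > end
    if (!(skipWhenEmptyRange && rangeIsEmpty)) {
      verifyBatchIds(found, start, end)
    }
    found
  }
}
```

In this model, with a log holding ids 1 to 5, idsToRead(log, 4L, 5L, skipWhenEmptyRange = false) reads only id 5 and verification passes. Calling idsToRead(log, 0L, 0L, skipWhenEmptyRange = false) throws IllegalStateException with the message "batch 1 doesn't exist", matching the first line of the stack trace, while the same call with skipWhenEmptyRange = true returns an empty sequence.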



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
