Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20097#discussion_r159552212
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -357,7 +397,16 @@ class MicroBatchExecution(
                 s"DataFrame returned by getBatch from $source did not have isStreaming=true\n" +
                   s"${batch.queryExecution.logical}")
               logDebug(s"Retrieving data from $source: $current -> $available")
    -          Some(source -> batch)
    +          Some(source -> batch.logicalPlan)
    +        case (reader: MicroBatchReader, available)
    +          if committedOffsets.get(reader).map(_ != available).getOrElse(true) =>
    +          val current = committedOffsets.get(reader).map(off => reader.deserializeOffset(off.json))
    +          reader.setOffsetRange(
    +            Optional.ofNullable(current.orNull),
    +            Optional.of(available.asInstanceOf[v2.streaming.reader.Offset]))
    +          logDebug(s"Retrieving data from $reader: $current -> $available")
    +          Some(reader ->
    +            new StreamingDataSourceV2Relation(reader.readSchema().toAttributes, reader))
    --- End diff --
    
    Does ContinuousExecution use it? It seems this class was added in this PR.
    Are you planning to modify ContinuousExecution to use it in the future?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
