Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20097#discussion_r159553087
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 ---
    @@ -317,6 +355,8 @@ class MicroBatchExecution(
               if (prevBatchOff.isDefined) {
                 prevBatchOff.get.toStreamProgress(sources).foreach {
                   case (src: Source, off) => src.commit(off)
    +              case (reader: MicroBatchReader, off) =>
    +                reader.commit(reader.deserializeOffset(off.json))
    --- End diff --
    
    Quick summary:
    
    V1 sources were silently responsible for checking every offset they 
receive, and deserializing it if it's a SerializedOffset.
    
    This is awkward, so SerializedOffset isn't being migrated to the v2 API. 
For v2 sources, all Offset instances passed to a reader will have the right 
type in the first place, and the execution engine will deserialize them from 
JSON form with the deserializeOffset handle. In the long term, the conversion 
to SerializedOffset can be removed entirely.
    
    But as long as the v1 path is around, we can't (without a lot of pointless 
effort) change the offset log to not return Offset instances. So we have to 
pull the JSON out of the SerializedOffset and then deserialize it properly.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to