Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20097#discussion_r159553087

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -317,6 +355,8 @@ class MicroBatchExecution(
             if (prevBatchOff.isDefined) {
               prevBatchOff.get.toStreamProgress(sources).foreach {
                 case (src: Source, off) => src.commit(off)
    +            case (reader: MicroBatchReader, off) =>
    +              reader.commit(reader.deserializeOffset(off.json))
    --- End diff --

    Quick summary: V1 sources were silently responsible for checking every offset they receive, and deserializing it if it's a SerializedOffset. This is awkward, so SerializedOffset isn't being migrated to the v2 API. For v2 sources, all Offset instances passed to a reader will have the right type in the first place, and the execution engine will deserialize them from JSON form with the deserializeOffset handle.

    In the long term, the conversion to SerializedOffset can be removed entirely. But as long as the v1 path is around, we can't (without a lot of pointless effort) change the offset log to not return Offset instances. So we have to pull the JSON out of the SerializedOffset and then deserialize it properly.
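To make the round-trip concrete, here is a minimal, self-contained Scala sketch of the pattern the diff implements. The class and method names (`SerializedOffset`, `MicroBatchReader`, `deserializeOffset`) mirror the real Spark API, but the bodies below are simplified stand-ins for illustration, not Spark's actual implementation:

```scala
// The offset log can only hand back opaque Offset instances; on the
// v1 path they arrive wrapped as SerializedOffset, i.e. raw JSON.
sealed trait Offset { def json: String }
case class SerializedOffset(json: String) extends Offset

// A concrete, correctly-typed offset a v2 reader works with.
// (Hypothetical stand-in for a real offset type.)
case class LongOffset(value: Long) extends Offset {
  def json: String = value.toString
}

// Simplified v2-style reader: the execution engine, not the source,
// is responsible for turning JSON back into a typed offset.
class MicroBatchReader {
  var committed: Option[LongOffset] = None
  def deserializeOffset(json: String): LongOffset = LongOffset(json.toLong)
  def commit(off: LongOffset): Unit = committed = Some(off)
}

// What the diff does: pull the JSON out of whatever Offset the log
// returned and deserialize it properly before committing.
val reader = new MicroBatchReader
val fromLog: Offset = SerializedOffset("42")
reader.commit(reader.deserializeOffset(fromLog.json))
// reader.committed is now Some(LongOffset(42))
```

The key design point is that the `off.json` extraction works uniformly whether the log hands back a `SerializedOffset` or an already-typed offset, so the v1 offset log can stay unchanged while v2 readers still receive correctly-typed offsets.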