Hi all,

I'm building a source connector that relies on the HybridSource and am running into issues on checkpoint recovery. The connector is built from two underlying sources: one that reads from files and another that consumes from Kafka. The file source provides offsets that allow me to resume consumption from Kafka at a particular offset.
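Here's a stripped-down, Flink-free model of the setup, in case it helps frame the question. All names (FileEnumerator, CheckpointState, KAFKA_START_FACTORY, switchLive, restore) are hypothetical stand-ins, not Flink classes. The file-side enumerator carries the offset, and a factory turns it into the Kafka start position at switch time. Assuming the checkpointed state keeps only the source index, as described next, restoring after the switch hands the factory no enumerator and it fails with an NPE.

```java
import java.util.function.Function;

/**
 * Hypothetical, Flink-free model of the HybridSource pattern described in the post.
 * None of these classes are Flink APIs; they only mimic the shape of a source switch,
 * assuming the checkpointed state keeps the current source index but no enumerator.
 */
public class HybridRecoveryModel {

    /** Stand-in for the file-side enumerator that knows where Kafka should resume. */
    static final class FileEnumerator {
        final long lastOffset;

        FileEnumerator(long lastOffset) {
            this.lastOffset = lastOffset;
        }
    }

    /** Stand-in for the checkpointed hybrid state: only the source index survives. */
    static final class CheckpointState {
        final int sourceIndex;

        CheckpointState(int sourceIndex) {
            this.sourceIndex = sourceIndex;
        }
    }

    /**
     * Stand-in for the Kafka SourceFactory: derives the start offset from the
     * previous enumerator. Dereferencing a null enumerator throws an NPE.
     */
    static final Function<FileEnumerator, Long> KAFKA_START_FACTORY =
            previous -> previous.lastOffset + 1;

    /** Live switch: the file enumerator is still in memory and is handed over. */
    static long switchLive(FileEnumerator previous) {
        return KAFKA_START_FACTORY.apply(previous);
    }

    /** Restore after the switch: the index points at Kafka, but no enumerator was saved. */
    static long restore(CheckpointState state) {
        if (state.sourceIndex == 0) {
            throw new IllegalStateException("still on the file source; nothing to derive");
        }
        FileEnumerator previous = null; // not part of the checkpointed state in this model
        return KAFKA_START_FACTORY.apply(previous);
    }

    public static void main(String[] args) {
        System.out.println("live switch start offset: " + switchLive(new FileEnumerator(41L)));
        try {
            restore(new CheckpointState(1));
        } catch (NullPointerException e) {
            System.out.println("restore after switch: NullPointerException");
        }
    }
}
```

Running main prints a start offset of 42 for the live switch and then reports the NullPointerException for the restore, which is the same shape as what I see in the real connector.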
I use the SourceSwitchContext in the SourceFactory to derive the Kafka source's start position at switch time. However, it seems that neither the current nor the previous enumerator is stored in the HybridSourceEnumeratorState. So, upon recovery from a checkpoint taken after the source index has already been incremented (i.e., after the switch to Kafka), I end up with NPEs when trying to derive my start position from the previous enumerator's state. One can reproduce this with a unit test for the HybridSourceSplitEnumerator [1].

Has anyone worked with the HybridSource in this manner and found a successful workaround? I've filed a bug for this situation with more details [2].

[1] https://github.com/apache/flink/pull/26975
[2] https://issues.apache.org/jira/browse/FLINK-38345

Matt Cuento
