Hi all,

I'm building a source connector that relies on the HybridSource and am
running into issues on checkpoint recovery. The connector relies on two
underlying sources: one that reads from files and another that consumes
from Kafka. The file source provides offsets that allow me to resume
consumption from Kafka at a particular offset.

I use the SourceSwitchContext in the SourceFactory to derive a start
position for the Kafka source at source-switch time. However, it seems that
neither the current nor the previous enumerator is stored in the
HybridSourceEnumeratorState.

So, upon recovery from a checkpoint taken after the source index has
already been incremented (i.e. after the switch to Kafka), the previous
enumerator is not available, and I end up with NPEs when trying to derive
my start position from its state.
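To make the failure mode concrete, here is a minimal, self-contained Java
model of the pattern above. It does not use Flink's classes:
FileEnumeratorModel, HybridStateModel and SwitchContextModel are
hypothetical stand-ins for the file enumerator, HybridSourceEnumeratorState
and SourceSwitchContext, and getEndOffset() is an assumed accessor for the
offset the file source hands over. On a live switch the previous enumerator
is passed along; on restore only the source index survives, so the factory
dereferences null.

```java
public class HybridSwitchModel {

    // Hypothetical stand-in for the file-side enumerator: it knows the Kafka
    // offset at which consumption should resume once the files are exhausted.
    public static final class FileEnumeratorModel {
        private final long endOffset;

        public FileEnumeratorModel(long endOffset) {
            this.endOffset = endOffset;
        }

        public long getEndOffset() {
            return endOffset;
        }
    }

    // Hypothetical stand-in for the checkpointed hybrid state: it records which
    // source is active, but not the previous (file) enumerator.
    public record HybridStateModel(int currentSourceIndex) {}

    // Hypothetical stand-in for SourceSwitchContext.
    public record SwitchContextModel(FileEnumeratorModel previousEnumerator) {}

    // The "source factory" logic: derive the Kafka start offset from the
    // previous enumerator. Throws NullPointerException if it is missing.
    public static long kafkaStartOffset(SwitchContextModel ctx) {
        return ctx.previousEnumerator().getEndOffset();
    }

    // Live switch: the file enumerator is still in memory and is handed over.
    public static long switchAtRuntime(FileEnumeratorModel fileEnumerator) {
        return kafkaStartOffset(new SwitchContextModel(fileEnumerator));
    }

    // Restore after the switch: the state only says "source 1 (Kafka) is
    // active", so there is no file enumerator to hand over.
    public static long switchOnRestore(HybridStateModel restored) {
        if (restored.currentSourceIndex() != 1) {
            throw new IllegalStateException("expected the Kafka source to be active");
        }
        return kafkaStartOffset(new SwitchContextModel(null));
    }

    public static void main(String[] args) {
        System.out.println("runtime switch start offset: "
                + switchAtRuntime(new FileEnumeratorModel(42L)));
        try {
            switchOnRestore(new HybridStateModel(1));
        } catch (NullPointerException e) {
            System.out.println("restore: NullPointerException");
        }
    }
}
```

Running main prints the start offset for the live switch (42) and then
reports the NullPointerException on the restore path.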

One can reproduce this with a unit test for the HybridSourceSplitEnumerator
[1].

Has anyone worked with the HybridSource in this manner with a successful
workaround? I've filed a bug for this situation with more details [2].

[1] - https://github.com/apache/flink/pull/26975
[2] - https://issues.apache.org/jira/browse/FLINK-38345

Matt Cuento