I want to reach out for opinions on what would be the best way to proceed
with https://issues.apache.org/jira/browse/BEAM-6077

The problem is, that when FlinkRunner job is being restored from
checkpoint, it needs to resurrect source and it's readers given the
checkpoint state. State element is represented by
`UnboundedSource.CheckpointMark` which does not tell much information.
Within CheckpointMark there might be already stored state per key, e.g. in
case of Kafka it is list of PartitionMarks having each partition_id and
offset.

UnboundedSource can create a reader per single CheckpointMark and reader
can produce single CheckpointMark from it's state. Now at rescale, number
of CheckpointMarks retrieved from state does not correspond to actual
parallelism. Merge or flatten needs to be invoked over list of marks read
from state. The question is, where such logic and knowledge should be.

It feels similar to UnboundedSource.split(parallelism, pipelineOptions) and
also maybe related somehow to SplittableDoFn logic. Not sure.

My question is:
1. Is there a way to achieve such splitting / merging of checkpoint mark
with current SDK?
2. If not and it make sense to add it where it would best go? Source?
3. Some other approach Beam rookie as me do not see?

Best,
Jozef

Reply via email to