Hi Jozef,
I responded on JIRA today before I saw your mail here.
The splitting of the UnboundedSource is performed during translation of
the Beam pipeline. It think it would be feasible to use Flink's maximum
parallelism instead of the configured parallelism. That would enable to
increase the parallelism at a later point in time.
Another option would be to split the sources again when scaling up; I'm
not sure whether that would work for all sources. Scaling down should be
easy because the wrapper supports reading from multiple sources.
Cheers,
Max
On 20.11.18 11:38, Jozef Vilcek wrote:
I want to reach out for opinions on what would be the best way to
proceed with https://issues.apache.org/jira/browse/BEAM-6077
The problem is, that when FlinkRunner job is being restored from
checkpoint, it needs to resurrect source and it's readers given the
checkpoint state. State element is represented by
`UnboundedSource.CheckpointMark` which does not tell much information.
Within CheckpointMark there might be already stored state per key, e.g.
in case of Kafka it is list of PartitionMarks having each partition_id
and offset.
UnboundedSource can create a reader per single CheckpointMark and reader
can produce single CheckpointMark from it's state. Now at rescale,
number of CheckpointMarks retrieved from state does not correspond to
actual parallelism. Merge or flatten needs to be invoked over list of
marks read from state. The question is, where such logic and knowledge
should be.
It feels similar to UnboundedSource.split(parallelism, pipelineOptions)
and also maybe related somehow to SplittableDoFn logic. Not sure.
My question is:
1. Is there a way to achieve such splitting / merging of checkpoint mark
with current SDK?
2. If not and it make sense to add it where it would best go? Source?
3. Some other approach Beam rookie as me do not see?
Best,
Jozef