Hi Biao,

Thanks for the answer!

So, given the multi-threaded readers, we now have the following open questions:

1) How do we let the checkpoints pass through our multi-threaded reader
operator?

2) Do we have separate reader and source operators or not? In the strategy
that has a separate source, the source operator has a parallelism of 1 and
is responsible for split recovery only.
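To make the "separate source" strategy more concrete, here is a minimal sketch. It assumes a hypothetical `SplitDiscoverer` running with parallelism 1, which readers ask for work. None of these names are existing Flink APIs. Its checkpointed state contains only the splits that are not yet fully processed. Restoring that state therefore does not depend on how many readers there are. How a reader resumes in the middle of a split is deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical parallelism-1 split discoverer: the single place that knows
// which splits are pending, which are assigned, and which are finished.
final class SplitDiscoverer {
    private final Deque<String> pending = new ArrayDeque<>();
    private final Map<String, Integer> inFlight = new HashMap<>(); // split id -> reader id

    // Called by the discovery logic (e.g. after listing a directory).
    void discovered(String splitId) {
        pending.addLast(splitId);
    }

    // Called when a reader asks for more work; returns null if nothing is pending.
    String assignNext(int readerId) {
        String split = pending.pollFirst();
        if (split != null) {
            inFlight.put(split, readerId);
        }
        return split;
    }

    // Called when a reader reports that it has fully processed a split.
    void finished(String splitId) {
        inFlight.remove(splitId);
    }

    // Checkpointed state: every split that is not yet fully processed,
    // in-flight ones first, then the still-pending ones.
    List<String> snapshotState() {
        List<String> state = new ArrayList<>(inFlight.keySet());
        state.addAll(pending);
        return state;
    }

    // Recovery puts all unfinished splits back into the pending queue, so they
    // can be handed to any reader, regardless of the (new) reader parallelism.
    static SplitDiscoverer restore(Collection<String> state) {
        SplitDiscoverer discoverer = new SplitDiscoverer();
        for (String splitId : state) {
            discoverer.discovered(splitId);
        }
        return discoverer;
    }
}
```

Because in-flight splits are simply re-queued on restore, rescaling the readers needs no state merging. This is the main argument for keeping discovery in one place.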

For the first one, also given the constraints (blocking, finite queues,
etc.), I do not have an answer yet.

For the 2nd, I think that we should go with separate operators for the
source and the readers, for the following reasons:

1) This is more aligned with a potential future improvement where split
discovery becomes a responsibility of the JobManager and readers pull
more work from the JM.

2) The source is going to be the "single point of truth". It will know what
has been processed and what has not. If the source and the readers are a
single operator with parallelism > 1, or, more generally, if split discovery
is done by each task individually, then:
   i) we need a deterministic scheme by which each reader assigns splits to
itself (e.g. hash(split) mod parallelism == subtaskId). This is not
necessarily trivial for all sources.
   ii) each reader would have to keep a copy of all its processed splits.
   iii) the state has to be a union state with non-trivial merging logic
in order to support rescaling.
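For comparison, here is a minimal sketch of the per-task alternative from points i) to iii). It assumes string split ids and a hypothetical `ModAssignment` helper, not an existing Flink class. Every subtask discovers all splits and keeps only those whose hash modulo the parallelism equals its index. On rescaling, each new subtask must see the processed splits of every old subtask and filter out its own share. That is roughly what restoring from a union list state would require.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical per-task assignment: every subtask discovers all splits and
// keeps only the ones it "owns".
final class ModAssignment {
    private ModAssignment() {}

    // String.hashCode() is fully specified by the Java platform, so every
    // subtask (and every restart) computes the same owner for a given id.
    static boolean owns(String splitId, int subtaskIndex, int parallelism) {
        return Math.floorMod(splitId.hashCode(), parallelism) == subtaskIndex;
    }

    // Ownership changes with the parallelism, so after rescaling a subtask has
    // to look at the processed splits of all old subtasks (the union state)
    // and keep only the ones it owns under the new parallelism.
    static Set<String> restoreOwnShare(
            List<Set<String>> allOldProcessed, int newIndex, int newParallelism) {
        Set<String> mine = new HashSet<>();
        for (Set<String> oldSubtaskState : allOldProcessed) {
            for (String splitId : oldSubtaskState) {
                if (owns(splitId, newIndex, newParallelism)) {
                    mine.add(splitId);
                }
            }
        }
        return mine;
    }
}
```

Note that every subtask keeps its full history of processed splits (point ii), and every new subtask has to scan the full union on restore (point iii). Neither cost appears in the single-discoverer design.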

Two additional points that you raised above:

i) The requirement you raised, that we need to keep all splits (processed and
not processed), is, I think, too strong. It would imply that for infinite
sources the state grows indefinitely. This problem is even more pronounced
if we do not have a single source that assigns splits to readers, as each
reader will have its own copy of the state.
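One way to keep the source's state bounded is sketched below. It rests on an assumption that does not hold for every source: each split carries a monotonically increasing key, such as a file modification time. Instead of remembering every processed split, the source checkpoints a single high-water mark and ignores re-listed splits at or below it. `OrderedSplit` and `HighWaterMarkFilter` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical split carrying an ordering key (e.g. a file modification time).
record OrderedSplit(String id, long key) {}

final class HighWaterMarkFilter {
    // The only state that has to be checkpointed: one long, no matter how
    // many splits have been processed so far.
    private long highWaterMark = Long.MIN_VALUE;

    // Returns only the splits whose key is above the current mark, then
    // advances the mark to the largest key returned.
    List<OrderedSplit> filterNew(List<OrderedSplit> listed) {
        List<OrderedSplit> fresh = new ArrayList<>();
        long max = highWaterMark;
        for (OrderedSplit split : listed) {
            if (split.key() > highWaterMark) {
                fresh.add(split);
                max = Math.max(max, split.key());
            }
        }
        highWaterMark = max;
        return fresh;
    }

    long snapshotState() {
        return highWaterMark;
    }
}
```

The trade-off is that a split appearing late with a key at or below the mark is silently skipped. This approach therefore fits only sources that guarantee such an ordering, and others would still need some other way to bound their state.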

ii) It is true that for finite sources we need to somehow keep the readers
running after the source/split discoverer finishes. The
ContinuousFileReaderOperator has a work-around for that. It is not elegant,
and checkpoints are not emitted after closing the source, but this, I
believe, is a bigger problem which requires more changes than just
refactoring the source interface.

Cheers,
Kostas
