But at the Kafka source level it should be perfectly fine to do what Elias
proposed. This is of course not the perfect solution, but it could bring us
forward quite a bit. The changes required for this should also be minimal.
This would become obsolete once we have something like shared state. But
until then, I think it would be worth a try.

Cheers,
Till

On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

> The reason this selective reading doesn't work well in Flink at the moment
> is checkpointing. For checkpointing, checkpoint barriers travel
> within the streams. If we selectively read from inputs based on timestamps
> this is akin to blocking an input if that input is very far ahead in event
> time, which can happen when you have a very fast source and a slow source
> (in event time), maybe because you're in a catchup phase. In those cases
> it's better to simply not read the data at the sources, as Thomas said.
> This is also because with Kafka Streams, each operator is basically its own
> job: it's reading from Kafka and writing to Kafka and there is not a
> complex graph of different operations with network shuffles in between, as
> you have with Flink.
>
> This different nature of Flink is also why I think that readers need
> awareness of other readers to do the event-time alignment, and this is
> where shared state comes in.
>
> > On 10. Oct 2018, at 20:47, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> >
> > On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <fhue...@gmail.com> wrote:
> >
> >> I think the new source interface would be designed to be able to leverage
> >> shared state to achieve time alignment.
> >> I don't think this would be possible without some kind of shared state.
> >>
> >> The problem of tasks that are far ahead in time cannot be solved with
> >> back-pressure.
> >> That's because a task cannot choose from which source task it accepts
> >> events and from which it doesn't.
> >> If it blocks an input, all downstream tasks that are connected to the
> >> operator are affected. This can easily lead to deadlocks.
> >> Therefore, all operators need to be able to handle events when they arrive.
> >> If they cannot process them yet because they are too far ahead in time,
> >> they are put in state.
> >>
> >
> > The idea I was suggesting is not for operators to block an input. Rather,
> > it is that they selectively choose which input to process the next
> > message from, based on the messages' timestamps, so long as there are
> > buffered messages waiting to be processed.  That is a best-effort alignment
> > strategy.  Seems to work relatively well in practice, at least within Kafka
> > Streams.
> >
> > E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for both
> > its inputs.  Instead, it could keep them separate and selectively consume
> > from the one that had a buffer available, and if both have buffers
> > available, from the one whose buffered messages have the lower timestamp.
>
>
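
To illustrate the selection rule Elias describes above (consume from
whichever input has a buffer available, and prefer the lower timestamp when
both do), here is a minimal sketch in Python. It is not Flink code: the
names pick_input and drain and the (timestamp, payload) record shape are
hypothetical, chosen only for this example, and do not correspond to
StreamTwoInputProcessor, UnionInputGate, or any other Flink API. It also
ignores checkpoint barriers, which is exactly the complication Aljoscha
raises.

```python
from collections import deque
from typing import Deque, Iterator, List, Optional, Tuple

# Hypothetical record shape for this sketch: (event timestamp, payload).
Record = Tuple[int, str]


def pick_input(buffers: List[Deque[Record]]) -> Optional[int]:
    """Return the index of the non-empty buffer whose head record has the
    lowest timestamp, or None if every buffer is empty.

    An empty input is simply skipped rather than waited on, so this is a
    best-effort alignment: it never blocks an input.
    """
    best: Optional[int] = None
    for i, buf in enumerate(buffers):
        if not buf:
            continue  # nothing buffered on this input right now
        if best is None or buf[0][0] < buffers[best][0][0]:
            best = i
    return best


def drain(buffers: List[Deque[Record]]) -> Iterator[Record]:
    """Consume all currently buffered records, always taking the next one
    from the input selected by pick_input."""
    while True:
        i = pick_input(buffers)
        if i is None:
            return
        yield buffers[i].popleft()


if __name__ == "__main__":
    fast = deque([(100, "f1"), (200, "f2")])
    slow = deque([(10, "s1"), (150, "s2")])
    for record in drain([fast, slow]):
        print(record)
```

Because an empty input is skipped instead of awaited, a fast input can still
run ahead whenever the slow input has nothing buffered, so the alignment is
only as good as what happens to be buffered at the time of each choice.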
