Sorry for also derailing this a bit earlier...

I think the two things (shared state and new source interface) are somewhat
orthogonal. The new source interface alone doesn't solve the problem; we
would still need some mechanism for sharing the event-time information between
different subtasks. This could be the state sharing mechanism. I would
therefore say we should not block one on the other and should go ahead
with state sharing.

With recent releases we started to abstract Akka away behind RPC interfaces, so 
we probably shouldn't introduce a hard dependency on Akka (or another system) 
again. Maybe Till (cc'ed) could shed some light on this. It might be that we 
just have to design a generic interface and then use Akka underneath.
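
To make the "generic interface" idea a bit more concrete, here is a rough
sketch of what I have in mind. All names (WatermarkCoordinator,
reportWatermark, isTooFarAhead, ...) are made up for illustration and are
not existing Flink APIs. The in-memory class only stands in for whatever
transport we end up with (Akka behind the RPC layer, or something else).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical interface, NOT an existing Flink API. */
interface WatermarkCoordinator {

    /** Publishes the current local watermark of one source subtask. */
    void reportWatermark(int subtaskIndex, long watermark);

    /** Lowest watermark reported so far, or Long.MIN_VALUE if none was reported. */
    long globalMinWatermark();

    /** True if a subtask is more than maxDriftMillis ahead of the slowest one. */
    default boolean isTooFarAhead(long localWatermark, long maxDriftMillis) {
        long min = globalMinWatermark();
        return min != Long.MIN_VALUE && localWatermark - min > maxDriftMillis;
    }
}

/** In-memory stand-in; a real implementation would talk to a remote endpoint. */
class InMemoryWatermarkCoordinator implements WatermarkCoordinator {

    private final Map<Integer, Long> watermarks = new ConcurrentHashMap<>();

    @Override
    public void reportWatermark(int subtaskIndex, long watermark) {
        // Watermarks never go backwards, so keep the maximum seen per subtask.
        watermarks.merge(subtaskIndex, watermark, Math::max);
    }

    @Override
    public long globalMinWatermark() {
        return watermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```

A source subtask would report its watermark periodically and stop reading
while isTooFarAhead(...) returns true. The sources would only see the
interface, so the transport could be swapped without touching them.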


> On 10. Oct 2018, at 16:18, Jamie Grier <jgr...@lyft.com.INVALID> wrote:
> 
> Also, I'm afraid I derailed this thread just a bit..  So also back to
> Thomas's original question..
> 
> If we decide state-sharing across source subtasks is the way forward for
> now -- does anybody have thoughts to share on what form this should take?
> 
> Thomas mentioned Akka or JGroups.  Other thoughts?
> 
> 
> On Wed, Oct 10, 2018 at 6:58 AM Jamie Grier <jgr...@lyft.com> wrote:
> 
>> Okay, so I think there is a lot of agreement here about (a) This is a real
>> issue for people, and (b) an ideal long-term approach to solving it.
>> 
>> As Aljoscha and Elias said a full solution to this would be to also
>> redesign the source interface such that individual partitions are exposed
>> in the API and not hidden inside sources like now -- then we could be much
>> smarter about the way we read from the individual partitions.  We would
>> also have to modify the stream task code such that it also reads in a
>> time-aligned way throughout the data flow to solve the full problem --
>> either that or use some shared state between sources to keep them
>> time-aligned across sub-tasks just at the source.
>> 
>> With regard to this question of state sharing between source sub-tasks
>> versus modifying Flink to do time-aligned reads throughout the system --
>> does anybody have a strong opinion on this?
>> 
>> We're basically looking for a way forward and our initial approach, though
>> ugly because it requires modification to all of the sources we use, was
>> going to be to share state between source sub-tasks in order to keep them
>> time-aligned with no further modifications required to Flink's core.
>> 
>> However, if it seems reasonable to do and there is consensus on the best
>> way forward maybe we should be looking at introducing the time-alignment
>> properly instead of hacking the sources.
>> 
>> 
>> 
>> 
>> On Tue, Oct 9, 2018 at 12:01 PM Elias Levy <fearsome.lucid...@gmail.com>
>> wrote:
>> 
>>> On Tue, Oct 9, 2018 at 1:25 AM Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>> 
>>>> @Elias Do you know if Kafka Consumers do this alignment across multiple
>>>> consumers or only within one Consumer across the partitions that it
>>>> reads from?
>>>> 
>>> 
>>> The behavior is part of Kafka Streams
>>> <https://github.com/apache/kafka/blob/96132e2dbb69a0c6c11cb183bb05cefef4e30557/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java#L65>,
>>> not the Kafka consumer.  The alignment does not occur across Kafka
>>> consumers, but that is because Kafka Streams, unlike Flink, uses a single
>>> consumer to fetch records from multiple sources / topics.  The alignment
>>> occurs within the stream task.  Stream tasks keep queues per topic-partition
>>> (which may be from different topics), and select the next record to
>>> process by selecting the queue with the lowest timestamp.
>>> 
>>> The equivalent in Flink would be for the Kafka connector source to select
>>> the message among partitions with the lowest timestamp to emit next, and
>>> for multiple input stream operators to select the message among inputs
>>> with the lowest timestamp to process.
>>> 
>> 
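
For reference, the lowest-timestamp selection Elias describes above could
be sketched roughly as follows. This is only an illustration of the idea,
not the actual Kafka Streams or Flink code; TimestampedRecord and
TimeAlignedBuffer are hypothetical names, and the sketch simply skips
empty queues instead of waiting for them to fill.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical record type: a payload plus its event timestamp. */
final class TimestampedRecord {
    final String value;
    final long timestamp;

    TimestampedRecord(String value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

/** Keeps one FIFO queue per partition and emits the record with the lowest head timestamp. */
final class TimeAlignedBuffer {

    private final Map<String, Queue<TimestampedRecord>> queues = new HashMap<>();

    void add(String partition, TimestampedRecord record) {
        queues.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(record);
    }

    /** Returns the next record in timestamp order across queue heads, or null if all are empty. */
    TimestampedRecord poll() {
        Queue<TimestampedRecord> best = null;
        for (Queue<TimestampedRecord> queue : queues.values()) {
            TimestampedRecord head = queue.peek();
            if (head != null && (best == null || head.timestamp < best.peek().timestamp)) {
                best = queue;
            }
        }
        return best == null ? null : best.poll();
    }
}
```

The same selection would apply both inside a source (queues per partition)
and in a multi-input operator (queues per input).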
