Re: Sharing state between subtasks

2019-03-07 Thread Thomas Weise
The state sharing support will be part of the upcoming 1.8 release.

We have also completed most of the synchronization work for the Kinesis
consumer and will contribute those changes to Flink soon.

Most of that code will be reusable for the Kafka consumer.

We will need the same support in the Kafka consumer but have not yet started
the integration work.

Thomas


Re: Sharing state between subtasks

2019-03-07 Thread Gerard Garcia
Has there been any progress on synchronizing ingestion by event/ingestion
time between Kafka partitions?


Re: Sharing state between subtasks

2018-11-14 Thread Jamie Grier
Hey all,

I think all we need for this on the state sharing side is pretty simple.  I
opened a JIRA to track this work and submitted a PR for the state sharing
bit.

https://issues.apache.org/jira/browse/FLINK-10886
https://github.com/apache/flink/pull/7099

Please provide feedback :)

-Jamie



Re: Sharing state between subtasks

2018-11-01 Thread Till Rohrmann
Hi Thomas,

Using Akka directly would further entrench our dependency on Scala in
flink-runtime, which is something we are currently trying to get rid of. For
that purpose we have added the RpcService abstraction, which encapsulates
all Akka-specific logic. We hope that we can soon get rid of the Scala
dependency in flink-runtime by using a special class loader only for
loading the AkkaRpcService implementation.

I think the easiest way to sync the task information is actually to go
through the JobMaster, because the subtasks don't know on which other TMs
the other subtasks run. Otherwise, we would need some TM discovery
mechanism between TMs. If you choose this route, you should be able to
use the RpcService by extending the JobMasterGateway with additional RPCs.

Cheers,
Till
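The JobMaster-side bookkeeping this suggests can be sketched roughly as follows. The class and method names here are illustrative assumptions, not Flink's actual RpcService or JobMasterGateway API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical JobMaster-side aggregator: each source subtask reports its
// current watermark over an RPC, and the global minimum across subtasks is
// returned so that fast subtasks can throttle themselves.
public class WatermarkAggregator {

    private final Map<Integer, Long> watermarks = new HashMap<>();

    // Record one subtask's watermark and return the global minimum.
    public synchronized long reportWatermark(int subtaskIndex, long watermark) {
        watermarks.put(subtaskIndex, watermark);
        long min = Long.MAX_VALUE;
        for (long w : watermarks.values()) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

A gateway extension along these lines would expose `reportWatermark` as an RPC that each source subtask calls periodically; the returned minimum tells a fast subtask how far ahead of the slowest one it is.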


Re: Sharing state between subtasks

2018-10-31 Thread Thomas Weise
Hi,

We are planning to work on the Kinesis consumer in the following order:

1. Add per-shard watermarking:
https://issues.apache.org/jira/browse/FLINK-5697 - this will be code we
already use internally; I will open a PR to add it to the Flink Kinesis
consumer
2. Exchange of per-subtask watermarks between all subtasks of one or
multiple sources
3. Implement the queue approach described in Jamie's document, utilizing 1.)
and 2.) to align the shard consumers WRT event time
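A minimal sketch of the alignment in step 3, under assumed names (not the actual connector code): each shard consumer feeds a queue, and the emitter always drains the shard whose head record carries the lowest event timestamp.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical per-shard queues; event timestamps stand in for records.
public class AlignedEmitter {

    private final List<Deque<Long>> shardQueues = new ArrayList<>();

    // Register a new shard and return its index.
    public int addShard() {
        shardQueues.add(new ArrayDeque<>());
        return shardQueues.size() - 1;
    }

    // A shard consumer hands over the event timestamp of its next record.
    public void enqueue(int shard, long eventTimestamp) {
        shardQueues.get(shard).addLast(eventTimestamp);
    }

    // Emit the lowest head timestamp across all non-empty shard queues,
    // or null when every queue is drained.
    public Long emitNext() {
        Deque<Long> lowest = null;
        for (Deque<Long> q : shardQueues) {
            if (!q.isEmpty() && (lowest == null || q.peekFirst() < lowest.peekFirst())) {
                lowest = q;
            }
        }
        return lowest == null ? null : lowest.pollFirst();
    }
}
```

Combined with per-shard watermarks from step 1 and the global exchange from step 2, this is what lets a subtask hold back shards that have run ahead in event time.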

There was some discussion regarding the mechanism to share the watermarks
between subtasks. If there is something that can be reused, that would be
great. Otherwise I'm going to further investigate the Akka or JGroups
route. Regarding Akka, since it is used within Flink already, is there an
abstraction you would recommend in order to avoid a direct dependency?

Thanks,
Thomas




Re: Sharing state between subtasks

2018-10-18 Thread Zhijiang(wangzhijiang999)
Not yet. We only have some initial thoughts and have not worked on it yet.
We will post progress updates in this discussion when we have them.

Best,
Zhijiang
--
发件人:Aljoscha Krettek 
发送时间:2018年10月18日(星期四) 17:53
收件人:dev ; Zhijiang(wangzhijiang999) 

抄 送:Till Rohrmann 
主 题:Re: Sharing state between subtasks

Hi Zhijiang,

do you already have working code or a design doc for the second approach?

Best,
Aljoscha

> On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) 
>  wrote:
> 
> Just noticed this discussion from @Till Rohrmann's weekly community update 
> and I want to share some thoughts from our experiences.
> 
> We also encountered the source consuption skew issue before, and we are 
> focused on improving this by two possible ways.
> 
> 1. Control the read strategy by the downstream side. In detail, every input 
> channel in downstream task corresponds to the consumption of one upstream 
> source task, and we will tag each input channel with watermark to find the 
> lowest channel to read in high priority. In essence, we actually rely on the 
> mechanism of backpressure. If the channel with highest timestamp is not read 
> by downstream task for a while, it will block the corresponding source task 
> to read when the buffers are exhausted. It is no need to change the source 
> interface in this way, but there are two major concerns: first it will affect 
> the barier alignment resulting in checkpoint delayed or expired. Second it 
> can not confirm source consumption alignment very precisely, and it is just a 
> best effort way. So we gave up this way finally.
> 
> 2. Add the new component of SourceCoordinator to coordinate the source 
> consumption distributedly. For example we can start this componnet in the 
> JobManager like the current role of CheckpointCoordinator. Then every source 
> task would commnicate with JobManager via current RPC mechanism, maybe we can 
> rely on the heartbeat message to attach the consumption progress as the 
> payloads. The JobManagerwill accumulator or state all the reported progress 
> and then give responses for different source tasks. We can define a protocol 
> for indicating the fast soruce task to sleep for specific time for example. 
> To do so, the coordinator has the global informations to give the proper 
> decision for individuals, so it seems more precise. And it will not affect 
> the barrier alignment, because the sleeping fast source can release the lock 
> to emit barrier as normal. The only concern is the changes for source 
> interface and may affect all related source implementations.
> 
> Currently we prefer to the second way to implement and will refer to other 
> good points above. :)
> 
> Best,
> Zhijiang
> --------------
> 发件人:Jamie Grier 
> 发送时间:2018年10月17日(星期三) 03:28
> 收件人:dev 
> 主 题:Re: Sharing state between subtasks
> 
> Here's a doc I started describing some changes we would like to make
> starting with the Kinesis Source.. It describes a refactoring of that code
> specifically and also hopefully a pattern and some reusable code we can use
> in the other sources as well.  The end goal would be best-effort event-time
> synchronization across all Flink sources but we are going to start with the
> Kinesis Source first.
> 
> Please take a look and please provide thoughts and opinions about the best
> state sharing mechanism to use -- that section is left blank and we're
> especially looking for input there.
> 
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
> 
> -Jamie
> 
> 
> On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann  wrote:
> 
>> But on the Kafka source level it should be perfectly fine to do what Elias
>> proposed. This is of course is not the perfect solution but could bring us
>> forward quite a bit. The changes required for this should also be minimal.
>> This would become obsolete once we have something like shared state. But
>> until then, I think it would worth a try.
>> 
>> Cheers,
>> Till
>> 
>> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek 
>> wrote:
>> 
>>> The reason this selective reading doesn't work well in Flink in the
>> moment
>>> is because of checkpointing. For checkpointing, checkpoint barriers
>> travel
>>> within the streams. If we selectively read from inputs based on
>> timestamps
>>> this is akin to blocking an input if that input is very far ahead in
>> event
>>> time, which can happen when you have a very fast source and a slow source
>>> (in event time), maybe because you're in a catchup phase. In those c

Re: Sharing state between subtasks

2018-10-18 Thread Aljoscha Krettek
Hi Zhijiang,

do you already have working code or a design doc for the second approach?

Best,
Aljoscha

> 
> On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann  wrote:
> 
>> But on the Kafka source level it should be perfectly fine to do what Elias
>> proposed. This is of course not the perfect solution but could bring us
>> forward quite a bit. The changes required for this should also be minimal.
>> This would become obsolete once we have something like shared state, but
>> until then I think it would be worth a try.
>> 
>> Cheers,
>> Till
>> 
>> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek 
>> wrote:
>> 
>>> The reason this selective reading doesn't work well in Flink at the
>>> moment is because of checkpointing. For checkpointing, checkpoint barriers
>>> travel within the streams. If we selectively read from inputs based on
>>> timestamps, this is akin to blocking an input if that input is very far
>>> ahead in event time, which can happen when you have a very fast source and
>>> a slow source (in event time), maybe because you're in a catch-up phase.
>>> In those cases it's better to simply not read the data at the sources, as
>>> Thomas said. This is also because with Kafka Streams each operator is
>>> basically its own job: it's reading from Kafka and writing to Kafka and
>>> there is not a complex graph of different operations with network shuffles
>>> in between, as

Re: Sharing state between subtasks

2018-10-18 Thread Zhijiang(wangzhijiang999)
Just noticed this discussion from @Till Rohrmann's weekly community update and 
I want to share some thoughts from our experiences.

We also encountered the source consuption skew issue before, and we are focused 
on improving this by two possible ways.

1. Control the read strategy by the downstream side. In detail, every input 
channel in downstream task corresponds to the consumption of one upstream 
source task, and we will tag each input channel with watermark to find the 
lowest channel to read in high priority. In essence, we actually rely on the 
mechanism of backpressure. If the channel with highest timestamp is not read by 
downstream task for a while, it will block the corresponding source task to 
read when the buffers are exhausted. It is no need to change the source 
interface in this way, but there are two major concerns: first it will affect 
the barier alignment resulting in checkpoint delayed or expired. Second it can 
not confirm source consumption alignment very precisely, and it is just a best 
effort way. So we gave up this way finally.

2. Add a new SourceCoordinator component to coordinate the source consumption in 
a distributed way. For example, we could start this component in the JobManager, 
similar to the current role of the CheckpointCoordinator. Every source task would 
then communicate with the JobManager via the existing RPC mechanism; perhaps the 
heartbeat messages could carry the consumption progress as payload. The 
JobManager would accumulate all the reported progress and respond to the 
individual source tasks. We could define a protocol, for example instructing a 
fast source task to sleep for a specific time. With this approach, the 
coordinator has the global information needed to make proper decisions for 
individual tasks, so it seems more precise. And it does not affect barrier 
alignment, because a sleeping fast source can still release the lock to emit 
barriers as normal. The only concern is that it changes the source interface and 
may affect all related source implementations.
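A rough sketch of what this protocol could look like, assuming a hypothetical `SourceCoordinator` class with an illustrative `maxSkewMillis` threshold (neither exists in Flink): subtasks report their watermark, e.g. as a heartbeat payload, and receive a sleep hint when they outrun the slowest subtask.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the proposed SourceCoordinator (not existing Flink
 * code): source subtasks report their current watermark, e.g. as a heartbeat
 * payload, and get back a sleep hint when they are too far ahead of the
 * slowest subtask. The class name and maxSkewMillis are assumptions.
 */
class SourceCoordinator {
    private final Map<Integer, Long> watermarks = new HashMap<>();
    private final long maxSkewMillis;

    SourceCoordinator(long maxSkewMillis) {
        this.maxSkewMillis = maxSkewMillis;
    }

    /** Called on each progress report; returns how long the subtask should sleep (ms). */
    synchronized long reportProgress(int subtaskIndex, long watermark) {
        watermarks.put(subtaskIndex, watermark);
        long min = watermarks.values().stream().min(Long::compare).get();
        long ahead = watermark - min;
        // Sleep just long enough to fall back inside the allowed skew.
        return ahead > maxSkewMillis ? ahead - maxSkewMillis : 0L;
    }
}
```

A paused source would only stop fetching records; as described above, it could still emit barriers, so checkpoint alignment would be unaffected.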

Currently we prefer to implement the second way and will take the other good 
points above into account. :)

Best,
Zhijiang
--
From: Jamie Grier 
Date: Wednesday, October 17, 2018, 03:28
To: dev 
Subject: Re: Sharing state between subtasks

Here's a doc I started describing some changes we would like to make
starting with the Kinesis Source.. It describes a refactoring of that code
specifically and also hopefully a pattern and some reusable code we can use
in the other sources as well.  The end goal would be best-effort event-time
synchronization across all Flink sources but we are going to start with the
Kinesis Source first.

Please take a look and please provide thoughts and opinions about the best
state sharing mechanism to use -- that section is left blank and we're
especially looking for input there.

https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing

-Jamie


On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann  wrote:

> But on the Kafka source level it should be perfectly fine to do what Elias
> proposed. This is of course not the perfect solution but could bring us
> forward quite a bit. The changes required for this should also be minimal.
> This would become obsolete once we have something like shared state. But
> until then, I think it would be worth a try.
>
> Cheers,
> Till
>
> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek 
> wrote:
>
> > The reason this selective reading doesn't work well in Flink at the
> moment
> > is because of checkpointing. For checkpointing, checkpoint barriers
> travel
> > within the streams. If we selectively read from inputs based on
> timestamps
> > this is akin to blocking an input if that input is very far ahead in
> event
> > time, which can happen when you have a very fast source and a slow source
> > (in event time), maybe because you're in a catchup phase. In those cases
> > it's better to simply not read the data at the sources, as Thomas said.
> > This is also because with Kafka Streams, each operator is basically its
> own
> > job: it's reading from Kafka and writing to Kafka and there is not a
> > complex graph of different operations with network shuffles in between,
> as
> > you have with Flink.
> >
> > This different nature of Flink is also why I think that readers need
> > awareness of other readers to do the event-time alignment, and this is
> > where shared state comes in.
> >
> > > On 10. Oct 2018, at 20:47, Elias Levy 
> > wrote:
> > >
> > > On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske 
> wrote:
> > >
> > >> I think the new source interface would be designed to be able to
> > leverage
> > >> shared state to achieve time alignment.
> >

Re: Sharing state between subtasks

2018-10-16 Thread Jamie Grier
Here's a doc I started describing some changes we would like to make
starting with the Kinesis Source.. It describes a refactoring of that code
specifically and also hopefully a pattern and some reusable code we can use
in the other sources as well.  The end goal would be best-effort event-time
synchronization across all Flink sources but we are going to start with the
Kinesis Source first.

Please take a look and please provide thoughts and opinions about the best
state sharing mechanism to use -- that section is left blank and we're
especially looking for input there.

https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing

-Jamie


On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann  wrote:

> But on the Kafka source level it should be perfectly fine to do what Elias
> proposed. This is of course not the perfect solution but could bring us
> forward quite a bit. The changes required for this should also be minimal.
> This would become obsolete once we have something like shared state. But
> until then, I think it would be worth a try.
>
> Cheers,
> Till
>
> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek 
> wrote:
>
> > The reason this selective reading doesn't work well in Flink at the
> moment
> > is because of checkpointing. For checkpointing, checkpoint barriers
> travel
> > within the streams. If we selectively read from inputs based on
> timestamps
> > this is akin to blocking an input if that input is very far ahead in
> event
> > time, which can happen when you have a very fast source and a slow source
> > (in event time), maybe because you're in a catchup phase. In those cases
> > it's better to simply not read the data at the sources, as Thomas said.
> > This is also because with Kafka Streams, each operator is basically its
> own
> > job: it's reading from Kafka and writing to Kafka and there is not a
> > complex graph of different operations with network shuffles in between,
> as
> > you have with Flink.
> >
> > This different nature of Flink is also why I think that readers need
> > awareness of other readers to do the event-time alignment, and this is
> > where shared state comes in.
> >
> > > On 10. Oct 2018, at 20:47, Elias Levy 
> > wrote:
> > >
> > > On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske 
> wrote:
> > >
> > >> I think the new source interface would be designed to be able to
> > leverage
> > >> shared state to achieve time alignment.
> > >> I don't think this would be possible without some kind of shared
> state.
> > >>
> > >> The problem of tasks that are far ahead in time cannot be solved with
> > >> back-pressure.
> > >> That's because a task cannot choose from which source task it accepts
> > >> events and from which doesn't.
> > >> If it blocks an input, all downstream tasks that are connected to the
> > >> operator are affected. This can easily lead to deadlocks.
> > >> Therefore, all operators need to be able to handle events when they
> > arrive.
> > >> If they cannot process them yet because they are too far ahead in
> time,
> > >> they are put in state.
> > >>
> > >
> > > The idea I was suggesting is not for operators to block an input.
> > Rather,
> > > it is that they selectively choose from which input to process the next
> > > message from based on their timestamp, so long as there are buffered
> > > messages waiting to be processed.  That is a best-effort alignment
> > > strategy.  Seems to work relatively well in practice, at least within
> > Kafka
> > > Streams.
> > >
> > > E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for
> both
> > > its inputs.  Instead, it could keep them separate and selectively
> consume
> > > from the one that had a buffer available, and if both have buffers
> > > available, from the buffer with the messages with a lower timestamp.
> >
> >
>


Re: Sharing state between subtasks

2018-10-11 Thread Till Rohrmann
But on the Kafka source level it should be perfectly fine to do what Elias
proposed. This is of course not the perfect solution but could bring us
forward quite a bit. The changes required for this should also be minimal.
This would become obsolete once we have something like shared state. But
until then, I think it would be worth a try.

Cheers,
Till

On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek 
wrote:

> The reason this selective reading doesn't work well in Flink at the moment
> is because of checkpointing. For checkpointing, checkpoint barriers travel
> within the streams. If we selectively read from inputs based on timestamps
> this is akin to blocking an input if that input is very far ahead in event
> time, which can happen when you have a very fast source and a slow source
> (in event time), maybe because you're in a catchup phase. In those cases
> it's better to simply not read the data at the sources, as Thomas said.
> This is also because with Kafka Streams, each operator is basically its own
> job: it's reading from Kafka and writing to Kafka and there is not a
> complex graph of different operations with network shuffles in between, as
> you have with Flink.
>
> This different nature of Flink is also why I think that readers need
> awareness of other readers to do the event-time alignment, and this is
> where shared state comes in.
>
> > On 10. Oct 2018, at 20:47, Elias Levy 
> wrote:
> >
> > On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske  wrote:
> >
> >> I think the new source interface would be designed to be able to
> leverage
> >> shared state to achieve time alignment.
> >> I don't think this would be possible without some kind of shared state.
> >>
> >> The problem of tasks that are far ahead in time cannot be solved with
> >> back-pressure.
> >> That's because a task cannot choose from which source task it accepts
> >> events and from which doesn't.
> >> If it blocks an input, all downstream tasks that are connected to the
> >> operator are affected. This can easily lead to deadlocks.
> >> Therefore, all operators need to be able to handle events when they
> arrive.
> >> If they cannot process them yet because they are too far ahead in time,
> >> they are put in state.
> >>
> >
> > The idea I was suggesting is not for operators to block an input.
> Rather,
> > it is that they selectively choose from which input to process the next
> > message from based on their timestamp, so long as there are buffered
> > messages waiting to be processed.  That is a best-effort alignment
> > strategy.  Seems to work relatively well in practice, at least within
> Kafka
> > Streams.
> >
> > E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for both
> > its inputs.  Instead, it could keep them separate and selectively consume
> > from the one that had a buffer available, and if both have buffers
> > available, from the buffer with the messages with a lower timestamp.
>
>


Re: Sharing state between subtasks

2018-10-11 Thread Aljoscha Krettek
The reason this selective reading doesn't work well in Flink at the moment is 
because of checkpointing. For checkpointing, checkpoint barriers travel within 
the streams. If we selectively read from inputs based on timestamps this is 
akin to blocking an input if that input is very far ahead in event time, which 
can happen when you have a very fast source and a slow source (in event time), 
maybe because you're in a catchup phase. In those cases it's better to simply 
not read the data at the sources, as Thomas said. This is also because with 
Kafka Streams, each operator is basically its own job: it's reading from Kafka 
and writing to Kafka and there is not a complex graph of different operations 
with network shuffles in between, as you have with Flink.

This different nature of Flink is also why I think that readers need awareness 
of other readers to do the event-time alignment, and this is where shared state 
comes in.

> On 10. Oct 2018, at 20:47, Elias Levy  wrote:
> 
> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske  wrote:
> 
>> I think the new source interface would be designed to be able to leverage
>> shared state to achieve time alignment.
>> I don't think this would be possible without some kind of shared state.
>> 
>> The problem of tasks that are far ahead in time cannot be solved with
>> back-pressure.
>> That's because a task cannot choose from which source task it accepts
>> events and from which doesn't.
>> If it blocks an input, all downstream tasks that are connected to the
>> operator are affected. This can easily lead to deadlocks.
>> Therefore, all operators need to be able to handle events when they arrive.
>> If they cannot process them yet because they are too far ahead in time,
>> they are put in state.
>> 
> 
> The idea I was suggesting is not for operators to block an input.  Rather,
> it is that they selectively choose from which input to process the next
> message from based on their timestamp, so long as there are buffered
> messages waiting to be processed.  That is a best-effort alignment
> strategy.  Seems to work relatively well in practice, at least within Kafka
> Streams.
> 
> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for both
> its inputs.  Instead, it could keep them separate and selectively consume
> from the one that had a buffer available, and if both have buffers
> available, from the buffer with the messages with a lower timestamp.



Re: Sharing state between subtasks

2018-10-10 Thread Elias Levy
On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske  wrote:

> I think the new source interface would be designed to be able to leverage
> shared state to achieve time alignment.
> I don't think this would be possible without some kind of shared state.
>
> The problem of tasks that are far ahead in time cannot be solved with
> back-pressure.
> That's because a task cannot choose from which source task it accepts
> events and from which doesn't.
> If it blocks an input, all downstream tasks that are connected to the
> operator are affected. This can easily lead to deadlocks.
> Therefore, all operators need to be able to handle events when they arrive.
> If they cannot process them yet because they are too far ahead in time,
> they are put in state.
>

The idea I was suggesting is not for operators to block an input.  Rather,
it is that they selectively choose from which input to process the next
message based on its timestamp, so long as there are buffered
messages waiting to be processed.  That is a best-effort alignment
strategy.  It seems to work relatively well in practice, at least within Kafka
Streams.

E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for both
its inputs.  Instead, it could keep them separate and selectively consume
from the one that had a buffer available, and if both have buffers
available, from the buffer with the messages with a lower timestamp.
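A minimal sketch of this selection policy (not the actual StreamTwoInputProcessor; the `Record` type and all names are made up for illustration): keep the two inputs as separate queues and, when both have buffered records, consume from the one whose head record carries the lower timestamp.

```java
import java.util.ArrayDeque;

/**
 * Minimal sketch of the idea (NOT the actual StreamTwoInputProcessor): keep
 * the two inputs as separate queues and, when both have buffered records,
 * consume from the one whose head record carries the lower timestamp.
 */
class TwoInputSelector {
    static final class Record {
        final long timestamp;
        final String value;
        Record(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    final ArrayDeque<Record> input1 = new ArrayDeque<>();
    final ArrayDeque<Record> input2 = new ArrayDeque<>();

    /** Best-effort alignment: never blocks an input, only prefers the older record. */
    Record nextRecord() {
        if (input1.isEmpty()) return input2.poll();
        if (input2.isEmpty()) return input1.poll();
        return input1.peek().timestamp <= input2.peek().timestamp
                ? input1.poll()
                : input2.poll();
    }
}
```

Because an empty queue is simply skipped rather than waited on, this remains best-effort and cannot block checkpoint barriers, which matches the trade-off discussed in this thread.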


Re: Sharing state between subtasks

2018-10-10 Thread Thomas Weise
Thanks for the feedback and comments so far.

I want to elaborate more on the need for the shared state and awareness of
watermark alignment in the source implementation. Sources like Kafka and
Kinesis pull from the external system and then emit the records. For
Kinesis, we have multiple consumer threads (one per shard), that fetch from
Kinesis and push the records downstream. Each of those threads logically
has a different watermark. (Note that the Kinesis source in Flink
currently does not have any source watermarking support; we have
implemented that in our own extension at Lyft.)

The source needs logic to decide which threads should pause because they
are ahead. That logic needs to take into account other threads and other
subtasks (for which we need the shared state). A watermark aware back
pressure mechanism would be great, but we cannot block the entire source.
We need to only stop reading those shards (or Kafka partitions) that have
gotten too far ahead of others.
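A minimal sketch of such per-shard throttling, assuming a hypothetical `ShardThrottle` helper fed with the global minimum watermark obtained from the shared state (`maxAheadMillis` and all names are assumptions, not Lyft's actual extension):

```java
/**
 * Hypothetical sketch of the per-shard pausing described above: each consumer
 * thread compares its shard watermark against the global minimum watermark
 * (which would come from the shared state across subtasks) and stops fetching
 * while it is too far ahead. ShardThrottle and maxAheadMillis are assumptions.
 */
class ShardThrottle {
    private final long maxAheadMillis;
    private volatile long globalMinWatermark = Long.MIN_VALUE;

    ShardThrottle(long maxAheadMillis) {
        this.maxAheadMillis = maxAheadMillis;
    }

    /** Updated from the shared-state view of all shards/subtasks. */
    void updateGlobalMinWatermark(long watermark) {
        globalMinWatermark = watermark;
    }

    /** True when this shard's consumer thread should pause fetching. */
    boolean shouldPause(long shardWatermark) {
        if (globalMinWatermark == Long.MIN_VALUE) {
            return false; // no global information yet: never pause
        }
        return shardWatermark - globalMinWatermark > maxAheadMillis;
    }
}
```

The key point from the message above is that the check is per shard thread, so only the shards (or partitions) that are ahead stop reading; the source as a whole is never blocked.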

Thanks,
Thomas


On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske  wrote:

> I think the new source interface would be designed to be able to leverage
> shared state to achieve time alignment.
> I don't think this would be possible without some kind of shared state.
>
> The problem of tasks that are far ahead in time cannot be solved with
> back-pressure.
> That's because a task cannot choose from which source task it accepts
> events and from which doesn't.
> If it blocks an input, all downstream tasks that are connected to the
> operator are affected. This can easily lead to deadlocks.
> Therefore, all operators need to be able to handle events when they arrive.
> If they cannot process them yet because they are too far ahead in time,
> they are put in state.
>
>
>
> On Wed, Oct 10, 2018 at 18:15 Elias Levy <
> fearsome.lucid...@gmail.com> wrote:
>
> > On Wed, Oct 10, 2018 at 8:15 AM Aljoscha Krettek 
> > wrote:
> >
> > > I think the two things (shared state and new source interface) are
> > > somewhat orthogonal. The new source interface itself alone doesn't
> solve
> > > the problem, we would still need some mechanism for sharing the
> > event-time
> > > information between different subtasks. This could be the state sharing
> > > mechanism. Therefore I would say we should not block one on the other
> and
> > > therefore should go ahead with state sharing.
> > >
> >
> > Is that really the case?  The reason Thomas gave for the request to share
> > state among subtasks was to implement stream alignment.  If streams can
> be
> > aligned, then the given reason for state sharing disappears.  Not to say
> > there aren't other situations where state sharing could be useful.  It
> > would have been handy in a number of our jobs.
> >
> > Also, it's not clear to me that if sources (and multiple streams
> operators)
> > were performing time alignment, you'd need some mechanism for sharing
> > event-time information between subtasks.  Each source and multiple input
> > operator can perform its own local alignment and back-pressure can take
> > care of squelching sources that are advancing too fast.
> >
>


Re: Sharing state between subtasks

2018-10-10 Thread Fabian Hueske
I think the new source interface would be designed to be able to leverage
shared state to achieve time alignment.
I don't think this would be possible without some kind of shared state.

The problem of tasks that are far ahead in time cannot be solved with
back-pressure.
That's because a task cannot choose from which source task it accepts
events and from which it doesn't.
If it blocks an input, all downstream tasks that are connected to the
operator are affected. This can easily lead to deadlocks.
Therefore, all operators need to be able to handle events when they arrive.
If they cannot process them yet because they are too far ahead in time,
they are put in state.
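The "put them in state" handling could look roughly like this in-memory sketch (a real operator would keep the buffer in keyed state and use timers; the `Event` type and all names here are illustrative): events ahead of the current watermark are buffered and released in timestamp order once the watermark catches up.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/**
 * In-memory sketch of "put them in state": events ahead of the current
 * watermark are buffered and released in timestamp order once the watermark
 * catches up. A real Flink operator would keep this buffer in (keyed) state
 * and use timers; all names here are illustrative.
 */
class EarlyEventBuffer {
    static final class Event {
        final long timestamp;
        final String value;
        Event(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final PriorityQueue<Event> buffered =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));
    private long currentWatermark = Long.MIN_VALUE;

    /** Buffers the event and returns whatever is already ready for processing. */
    List<Event> onEvent(Event event) {
        buffered.add(event);
        return drain();
    }

    /** Advances the watermark and releases all events that are no longer early. */
    List<Event> onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        return drain();
    }

    private List<Event> drain() {
        List<Event> ready = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.peek().timestamp <= currentWatermark) {
            ready.add(buffered.poll());
        }
        return ready;
    }
}
```

This illustrates the cost Fabian points out: a task far ahead in event time forces every downstream operator to hold its early events in state until the watermark advances.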



On Wed, Oct 10, 2018 at 18:15 Elias Levy <
fearsome.lucid...@gmail.com> wrote:

> On Wed, Oct 10, 2018 at 8:15 AM Aljoscha Krettek 
> wrote:
>
> > I think the two things (shared state and new source interface) are
> > somewhat orthogonal. The new source interface itself alone doesn't solve
> > the problem, we would still need some mechanism for sharing the
> event-time
> > information between different subtasks. This could be the state sharing
> > mechanism. Therefore I would say we should not block one on the other and
> > therefore should go ahead with state sharing.
> >
>
> Is that really the case?  The reason Thomas gave for the request to share
> state among subtasks was to implement stream alignment.  If streams can be
> aligned, then the given reason for state sharing disappears.  Not to say
> there aren't other situations where state sharing could be useful.  It
> would have been handy in a number of our jobs.
>
> Also, it's not clear to me that if sources (and multiple streams operators)
> were performing time alignment, you'd need some mechanism for sharing
> event-time information between subtasks.  Each source and multiple input
> operator can perform its own local alignment and back-pressure can take
> care of squelching sources that are advancing too fast.
>


Re: Sharing state between subtasks

2018-10-10 Thread Elias Levy
On Wed, Oct 10, 2018 at 8:15 AM Aljoscha Krettek 
wrote:

> I think the two things (shared state and new source interface) are
> somewhat orthogonal. The new source interface itself alone doesn't solve
> the problem, we would still need some mechanism for sharing the event-time
> information between different subtasks. This could be the state sharing
> mechanism. Therefore I would say we should not block one on the other and
> therefore should go ahead with state sharing.
>

Is that really the case?  The reason Thomas gave for the request to share
state among subtasks was to implement stream alignment.  If streams can be
aligned, then the given reason for state sharing disappears.  Not to say
there aren't other situations where state sharing could be useful.  It
would have been handy in a number of our jobs.

Also, it's not clear to me that if sources (and multi-input operators)
were performing time alignment, you'd need some mechanism for sharing
event-time information between subtasks.  Each source and multi-input
operator can perform its own local alignment, and back-pressure can take
care of squelching sources that are advancing too fast.


Re: Sharing state between subtasks

2018-10-10 Thread Aljoscha Krettek
Sorry for also derailing this a bit earlier...

I think the two things (shared state and new source interface) are somewhat 
orthogonal. The new source interface itself alone doesn't solve the problem, we 
would still need some mechanism for sharing the event-time information between 
different subtasks. This could be the state sharing mechanism. Therefore I 
would say we should not block one on the other and therefore should go ahead 
with state sharing.

With recent releases we started to abstract Akka away behind RPC interfaces, so 
we probably shouldn't introduce a hard dependency on Akka (or another system) 
again. Maybe Till (cc'ed) could shed some light on this. It might be that we 
just have to design a generic interface and then use Akka underneath.


> On 10. Oct 2018, at 16:18, Jamie Grier  wrote:
> 
> Also, I'm afraid I derailed this thread just a bit..  So also back to
> Thomas's original question..
> 
> If we decide state-sharing across source subtasks is the way forward for
> now -- does anybody have thoughts to share on what form this should take?
> 
> Thomas mentioned Akka or JGroups.  Other thoughts?
> 
> 
> On Wed, Oct 10, 2018 at 6:58 AM Jamie Grier  wrote:
> 
>> Okay, so I think there is a lot of agreement here about (a) This is a real
>> issue for people, and (b) an ideal long-term approach to solving it.
>> 
>> As Aljoscha and Elias said a full solution to this would be to also
>> redesign the source interface such that individual partitions are exposed
>> in the API and not hidden inside sources like now -- then we could be much
>> smarter about the way we read from the individual partitions.  We would
>> also have to modify the stream task code such that it also reads in a
>> time-aligned way throughout the data flow to solve the full problem --
>> either that or use some shared state between sources to keep them
>> time-aligned across sub-tasks just at the source.
>> 
>> With regard to this question of state sharing between source sub-tasks
>> versus modifying Flink to do time-aligned reads throughout the system --
>> does anybody have a strong opinion on this?
>> 
>> We're basically looking for a way forward and our initial approach, though
>> ugly because it requires modification to all of the sources we use, was
>> going to be to share state between source sub-tasks in order to keep them
>> time-aligned with no further modifications required to Flink's core.
>> 
>> However, if it seems reasonable to do and there is consensus on the best
>> way forward maybe we should be looking at introducing the time-alignment
>> properly instead of hacking the sources.
>> 
>> 
>> 
>> 
>> On Tue, Oct 9, 2018 at 12:01 PM Elias Levy 
>> wrote:
>> 
>>> On Tue, Oct 9, 2018 at 1:25 AM Aljoscha Krettek 
>>> wrote:
>>> 
 @Elias Do you know if Kafka Consumers do this alignment across multiple
 consumers or only within one Consumer across the partitions that it
>>> reads
 from.
 
>>> 
>>> The behavior is part of Kafka Streams
>>> <
>>> https://github.com/apache/kafka/blob/96132e2dbb69a0c6c11cb183bb05cefef4e30557/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java#L65
 ,
>>> not the Kafka consumer.  The alignment does not occur across Kafka
>>> consumers, but that is because Kafka Streams, unlike Flink, uses a
>>> single
>>> consumer to fetch records from multiple sources / topics.  The alignment
>>> occurs within the stream task.  Stream tasks keep queues per topic-partition
>>> (which may be from different topics), and select the next record to
>>> process by selecting the queue with the lowest timestamp.
>>> 
>>> The equivalent in Flink would be for the Kafka connector source to select
>>> the message among partitions with the lowest timestamp to emit next, and
>>> for multiple input stream operators to select the message among inputs
>>> with
>>> the lowest timestamp to process.
>>> 
>> 



Re: Sharing state between subtasks

2018-10-10 Thread Jamie Grier
Also, I'm afraid I derailed this thread just a bit..  So also back to
Thomas's original question..

If we decide state-sharing across source subtasks is the way forward for
now -- does anybody have thoughts to share on what form this should take?

Thomas mentioned Akka or JGroups.  Other thoughts?


On Wed, Oct 10, 2018 at 6:58 AM Jamie Grier  wrote:

> Okay, so I think there is a lot of agreement here about (a) This is a real
> issue for people, and (b) an ideal long-term approach to solving it.
>
> As Aljoscha and Elias said a full solution to this would be to also
> redesign the source interface such that individual partitions are exposed
> in the API and not hidden inside sources like now -- then we could be much
> smarter about the way we read from the individual partitions.  We would
> also have to modify the stream task code such that it also reads in a
> time-aligned way throughout the data flow to solve the full problem --
> either that or use some shared state between sources to keep them
> time-aligned across sub-tasks just at the source.
>
> With regard to this question of state sharing between source sub-tasks
> versus modifying Flink to do time-aligned reads throughout the system --
> does anybody have a strong opinion on this?
>
> We're basically looking for a way forward and our initial approach, though
> ugly because it requires modification to all of the sources we use, was
> going to be to share state between source sub-tasks in order to keep them
> time-aligned with no further modifications required to Flink's core.
>
> However, if it seems reasonable to do and there is consensus on the best
> way forward maybe we should be looking at introducing the time-alignment
> properly instead of hacking the sources.
>
>
>
>
> On Tue, Oct 9, 2018 at 12:01 PM Elias Levy 
> wrote:
>
>> On Tue, Oct 9, 2018 at 1:25 AM Aljoscha Krettek 
>> wrote:
>>
>> > @Elias Do you know if Kafka Consumers do this alignment across multiple
>> > consumers or only within one Consumer across the partitions that it
>> reads
>> > from.
>> >
>>
>> The behavior is part of Kafka Streams
>> <
>> https://github.com/apache/kafka/blob/96132e2dbb69a0c6c11cb183bb05cefef4e30557/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java#L65
>> >,
>> not the Kafka consumer.  The alignment does not occur across Kafka
>> consumers, but that is because Kafka Streams, unlike Flink, uses a
>> single
>> consumer to fetch records from multiple sources / topics.  The alignment
>> occurs within the stream task.  Stream tasks keep queues per topic-partition
>> (which may be from different topics), and select the next record to
>> process by selecting the queue with the lowest timestamp.
>>
>> The equivalent in Flink would be for the Kafka connector source to select
>> the message among partitions with the lowest timestamp to emit next, and
>> for multiple input stream operators to select the message among inputs
>> with
>> the lowest timestamp to process.
>>
>


Re: Sharing state between subtasks

2018-10-10 Thread Jamie Grier
Okay, so I think there is a lot of agreement here about (a) This is a real
issue for people, and (b) an ideal long-term approach to solving it.

As Aljoscha and Elias said a full solution to this would be to also
redesign the source interface such that individual partitions are exposed
in the API and not hidden inside sources like now -- then we could be much
smarter about the way we read from the individual partitions.  We would
also have to modify the stream task code such that it also reads in a
time-aligned way throughout the data flow to solve the full problem --
either that or use some shared state between sources to keep them
time-aligned across sub-tasks just at the source.

With regard to this question of state sharing between source sub-tasks
versus modifying Flink to do time-aligned reads throughout the system --
does anybody have a strong opinion on this?

We're basically looking for a way forward and our initial approach, though
ugly because it requires modification to all of the sources we use, was
going to be to share state between source sub-tasks in order to keep them
time-aligned with no further modifications required to Flink's core.

However, if it seems reasonable to do and there is consensus on the best
way forward maybe we should be looking at introducing the time-alignment
properly instead of hacking the sources.




On Tue, Oct 9, 2018 at 12:01 PM Elias Levy 
wrote:

> On Tue, Oct 9, 2018 at 1:25 AM Aljoscha Krettek 
> wrote:
>
> > @Elias Do you know if Kafka Consumers do this alignment across multiple
> > consumers or only within one Consumer across the partitions that it reads
> > from.
> >
>
> The behavior is part of Kafka Streams
> <
> https://github.com/apache/kafka/blob/96132e2dbb69a0c6c11cb183bb05cefef4e30557/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java#L65
> >,
> not the Kafka consumer.  The alignment does not occur across Kafka
> consumers, but that is because Kafka Streams, unlike Flink, uses a single
> consumer to fetch records from multiple sources / topics.  The alignment
> occurs within the stream task.  Stream tasks keep queues per topic-partition
> (which may be from different topics), and select the next record to
> process by selecting the queue with the lowest timestamp.
>
> The equivalent in Flink would be for the Kafka connector source to select
> the message among partitions with the lowest timestamp to emit next, and
> for multiple input stream operators to select the message among inputs with
> the lowest timestamp to process.
>


Re: Sharing state between subtasks

2018-10-09 Thread Elias Levy
On Tue, Oct 9, 2018 at 1:25 AM Aljoscha Krettek  wrote:

> @Elias Do you know if Kafka Consumers do this alignment across multiple
> consumers or only within one Consumer across the partitions that it reads
> from.
>

The behavior is part of Kafka Streams
<https://github.com/apache/kafka/blob/96132e2dbb69a0c6c11cb183bb05cefef4e30557/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java#L65>,
not the Kafka consumer.  The alignment does not occur across Kafka
consumers, but that is because Kafka Streams, unlike Flink, uses a single
consumer to fetch records from multiple sources / topics.  The alignment
occurs within the stream task.  Stream tasks keep queues per topic-partition
(which may be from different topics), and select the next record to
process by selecting the queue with the lowest timestamp.

The equivalent in Flink would be for the Kafka connector source to select
the message among partitions with the lowest timestamp to emit next, and
for multiple input stream operators to select the message among inputs with
the lowest timestamp to process.
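A sketch of that PartitionGroup-style selection generalized to N partition queues (the `TsRecord` type and class names are illustrative, not the actual Kafka Streams classes): the next record to process comes from the non-empty queue whose head has the lowest timestamp.

```java
import java.util.ArrayDeque;
import java.util.List;

/**
 * Sketch of the PartitionGroup-style selection described above: one buffer per
 * topic-partition, and the next record to process comes from the non-empty
 * queue whose head record has the lowest timestamp. The types are
 * illustrative, not the actual Kafka Streams classes.
 */
class PartitionSelector {
    static final class TsRecord {
        final long timestamp;
        final String value;
        TsRecord(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final List<ArrayDeque<TsRecord>> partitionQueues;

    PartitionSelector(List<ArrayDeque<TsRecord>> partitionQueues) {
        this.partitionQueues = partitionQueues;
    }

    /** Returns the buffered record with the lowest timestamp, or null if all queues are empty. */
    TsRecord next() {
        ArrayDeque<TsRecord> best = null;
        for (ArrayDeque<TsRecord> q : partitionQueues) {
            if (!q.isEmpty()
                    && (best == null || q.peek().timestamp < best.peek().timestamp)) {
                best = q;
            }
        }
        return best == null ? null : best.poll();
    }
}
```

As the message notes, the Flink equivalent would be the Kafka connector source choosing among its partitions this way, and multi-input operators choosing among their inputs the same way.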


Re: Sharing state between subtasks

2018-10-09 Thread Fabian Hueske
Hi,

I think watermark / event-time skew is a problem that many users are
struggling with.
A built-in primitive to align event-time would be a great feature!

However, there are also some cases when it would be useful for different
streams to have diverging event-time, such as an interval join [1]
(DataStream API) or time-windowed join (SQL) that joins one stream with
events from another stream that happened between two hours and one hour ago.
Granted, this is a very specific case and not the norm, but it might make
sense to have it in the back of our heads when designing this feature.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#interval-join
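The interval-join case above can be made concrete with a small plain-Java sketch (not the DataStream API): for each left-stream event at time t, match right-stream events whose timestamps fall in [t - 2h, t - 1h]. The `Event` type and bounds are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of interval-join semantics where the right stream legitimately
 *  lags the left by one to two hours, so strict event-time alignment of
 *  the two streams would be counterproductive. */
public class IntervalJoinSketch {
    record Event(String key, long timestampMs) {}

    static final long HOUR_MS = 3_600_000L;

    /** Right events matching a left event: same key, 1 to 2 hours older. */
    static List<Event> matches(Event left, List<Event> right) {
        List<Event> out = new ArrayList<>();
        for (Event r : right) {
            if (r.key().equals(left.key())
                    && r.timestampMs() >= left.timestampMs() - 2 * HOUR_MS
                    && r.timestampMs() <= left.timestampMs() - HOUR_MS) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Event left = new Event("k", 10 * HOUR_MS);
        List<Event> right = List.of(
                new Event("k", 8 * HOUR_MS + 30 * 60_000L),  // 1.5h older: matches
                new Event("k", 9 * HOUR_MS + 30 * 60_000L)); // 0.5h older: too new
        System.out.println(matches(left, right).size()); // 1
    }
}
```

The point for alignment design: here the right stream is *supposed* to run two hours behind the left, so any built-in alignment primitive would need a configurable allowed skew rather than forcing lockstep progress.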

Am Di., 9. Okt. 2018 um 10:25 Uhr schrieb Aljoscha Krettek <
aljos...@apache.org>:

> Yes, I think this is the way to go.
>
> This would also go well with a redesign of the source interface that has
> been floated for a while now. I also created a prototype a while back:
> https://github.com/aljoscha/flink/tree/refactor-source-interface. Just
> as a refresher, the redesign aims at several things:
>
> 1. Make partitions/splits explicit in the interface. Currently, the fact
> that there are file splits or Kafka partitions or Kinesis shards is hidden
> in the source implementation while it would be beneficial for the system to
> know of these and to be able to track watermarks for them. Currently, there
> is a custom implementation for per-partition watermark tracking in the
> Kafka Consumer that this redesign would obviate.
>
> 2. Split split/partition/shard discovery from the reading part. This would
> allow rebalancing work and again makes the nature of sources more explicit
> in the interfaces.
>
> 3. Go away from the push model to a pull model. The problem with the
> current source interface is that the source controls the read-loop and has
> to get the checkpoint lock for emitting elements/updating state. If we get
> the loop out of the source this leaves more potential for Flink to be
> clever about reading from sources.
>
> The prototype posted above defines three new interfaces: Source,
> SplitEnumerator, and SplitReader, along with a naive example and a working
> Kafka Consumer (with checkpointing, actually).
>
> If we had this source interface, along with a service for propagating
> watermark information, the code that reads from the splits could
> de-prioritise certain splits and we would get the event-time alignment
> behaviour for all sources that are implemented using the new interface
> without requiring special code in each source implementation.
>
> @Elias Do you know if Kafka Consumers do this alignment across multiple
> consumers or only within one Consumer across the partitions that it reads
> from?
>
> > On 9. Oct 2018, at 00:55, Elias Levy 
> wrote:
> >
> > Kafka Streams handles this problem, time alignment, by processing records
> > from the partitions with the lowest timestamp on a best-effort basis.
> > See KIP-353 for the details.  The same could be done within the Kafka
> > source and multiple input stream operators.  I opened FLINK-4558
> > a while ago regarding this topic.
> >
> > On Mon, Oct 8, 2018 at 3:41 PM Jamie Grier 
> wrote:
> >
> >> I'd be very curious to hear others' thoughts on this..  I would expect
> many
> >> people to have run into similar issues.  I also wonder if anybody has
> >> already been working on similar issues.  It seems there is room for some
> >> core Flink changes to address this as well and I'm guessing people have
> >> already thought about it.
> >>
>
>


Re: Sharing state between subtasks

2018-10-09 Thread Aljoscha Krettek
Yes, I think this is the way to go.

This would also go well with a redesign of the source interface that has been 
floated for a while now. I also created a prototype a while back: 
https://github.com/aljoscha/flink/tree/refactor-source-interface. Just as a 
refresher, the redesign aims at several things:

1. Make partitions/splits explicit in the interface. Currently, the fact that 
there are file splits or Kafka partitions or Kinesis shards is hidden in the 
source implementation while it would be beneficial for the system to know of 
these and to be able to track watermarks for them. Currently, there is a custom 
implementation for per-partition watermark tracking in the Kafka Consumer that 
this redesign would obviate.

2. Split split/partition/shard discovery from the reading part. This would 
allow rebalancing work and again makes the nature of sources more explicit in 
the interfaces.

3. Go away from the push model to a pull model. The problem with the current 
source interface is that the source controls the read-loop and has to get the 
checkpoint lock for emitting elements/updating state. If we get the loop out of 
the source this leaves more potential for Flink to be clever about reading from 
sources.

The prototype posted above defines three new interfaces: Source, 
SplitEnumerator, and SplitReader, along with a naive example and a working 
Kafka Consumer (with checkpointing, actually).

If we had this source interface, along with a service for propagating watermark 
information, the code that reads from the splits could de-prioritise certain 
splits and we would get the event-time alignment behaviour for all sources that 
are implemented using the new interface without requiring special code in each 
source implementation.
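The shape of the three interfaces named above can be sketched in plain Java. `Source`, `SplitEnumerator`, and `SplitReader` are the names from the prototype, but the method names and signatures here are illustrative assumptions, not the prototype's actual API. The key point is the pull model: the runtime, not the source, drives the read loop.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

/** Hedged sketch of a pull-based source interface with explicit splits. */
public class SourceSketch {

    interface SplitEnumerator<SplitT> {
        List<SplitT> discoverSplits();       // discovery separated from reading
    }

    interface SplitReader<SplitT, T> {
        void open(SplitT split);
        Optional<T> next();                  // pull one record; empty = exhausted
        long currentWatermark();             // per-split event-time tracking
    }

    /** Toy reader over an in-memory split of {timestamp, value} pairs. */
    static class ListReader implements SplitReader<List<long[]>, long[]> {
        private Iterator<long[]> it;
        private long watermark = Long.MIN_VALUE;

        public void open(List<long[]> split) { it = split.iterator(); }

        public Optional<long[]> next() {
            if (!it.hasNext()) return Optional.empty();
            long[] rec = it.next();
            watermark = Math.max(watermark, rec[0]); // advance split watermark
            return Optional.of(rec);
        }

        public long currentWatermark() { return watermark; }
    }

    public static void main(String[] args) {
        ListReader reader = new ListReader();
        reader.open(List.of(new long[]{5, 1}, new long[]{9, 2}));
        while (reader.next().isPresent()) { } // runtime-driven pull loop
        System.out.println(reader.currentWatermark()); // 9
    }
}
```

Because `next()` is called by the runtime, a scheduler that sees every split's `currentWatermark()` can simply stop polling readers that have run ahead, which is exactly the de-prioritisation described above.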

@Elias Do you know if Kafka Consumers do this alignment across multiple 
consumers or only within one Consumer across the partitions that it reads from?

> On 9. Oct 2018, at 00:55, Elias Levy  wrote:
> 
> Kafka Streams handles this problem, time alignment, by processing records
> from the partitions with the lowest timestamp on a best-effort basis.
> See KIP-353 for the details.  The same could be done within the Kafka
> source and multiple input stream operators.  I opened FLINK-4558
> a while ago regarding this topic.
> 
> On Mon, Oct 8, 2018 at 3:41 PM Jamie Grier  wrote:
> 
>> I'd be very curious to hear others' thoughts on this..  I would expect many
>> people to have run into similar issues.  I also wonder if anybody has
>> already been working on similar issues.  It seems there is room for some
>> core Flink changes to address this as well and I'm guessing people have
>> already thought about it.
>> 



Re: Sharing state between subtasks

2018-10-08 Thread Elias Levy
Kafka Streams handles this problem, time alignment, by processing records
from the partitions with the lowest timestamp on a best-effort basis.
See KIP-353 for the details.  The same could be done within the Kafka
source and multiple input stream operators.  I opened FLINK-4558
a while ago regarding this topic.

On Mon, Oct 8, 2018 at 3:41 PM Jamie Grier  wrote:

> I'd be very curious to hear others' thoughts on this..  I would expect many
> people to have run into similar issues.  I also wonder if anybody has
> already been working on similar issues.  It seems there is room for some
> core Flink changes to address this as well and I'm guessing people have
> already thought about it.
>


Re: Sharing state between subtasks

2018-10-08 Thread Jamie Grier
I'll add to what Thomas already said..  The larger issue driving this is
that when reading from a source with many parallel partitions, especially
when reading lots of historical data (or recovering from downtime and there
is a backlog to read), it's quite common for there to develop an event-time
skew across those partitions.

When doing event-time windowing -- or in fact any event-time driven
processing -- the event time skew across partitions results directly in
increased buffering in Flink and of course the corresponding
state/checkpoint size growth.

As the event-time skew and state size grows larger this can have a major
effect on application performance and in some cases result in a "death
spiral" where the application performance gets worse and worse as the
state size grows and grows.

So, one solution to this problem, outside of core changes in Flink itself,
seems to be to try to coordinate sources across partitions so that they
make progress through event time at roughly the same rate.  In fact if
there is large skew the idea would be to slow or even stop reading from
some partitions with newer data while first reading the partitions with
older data.  Anyway, to do this we need to share state somehow amongst
sub-tasks.

The common sense view of this is the following:  Why would we want to pull
data from a perfectly good buffer (like a filesystem, Kinesis, or Kafka)
into Flink state just to manage and checkpoint it while waiting to be able
to complete event time computations.  The completion of computations is
held up by the partitions with the oldest data so it's of no value to read
the newer data until you've read the old.  It seems much better to leave
the newer data buffered upstream.
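The coordination idea above reduces to a simple predicate per partition: pause reading when the local event-time watermark has run more than some allowed skew ahead of the slowest partition. A minimal sketch, assuming each subtask can see all per-partition watermarks (the names and threshold are illustrative):

```java
import java.util.Map;

/** Sketch of source throttling by event-time skew: a partition pauses
 *  when its watermark is more than maxSkewMs ahead of the global minimum,
 *  leaving newer data buffered upstream instead of in Flink state. */
public class WatermarkAlignment {

    /** True if this partition should pause and let slow partitions catch up. */
    static boolean shouldPause(long localWatermark,
                               Map<String, Long> allWatermarks,
                               long maxSkewMs) {
        long globalMin = allWatermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
        return localWatermark - globalMin > maxSkewMs;
    }

    public static void main(String[] args) {
        Map<String, Long> watermarks = Map.of("p0", 1_000L, "p1", 50_000L);
        // p1 is 49s ahead of p0; with 30s allowed skew it should pause.
        System.out.println(shouldPause(50_000L, watermarks, 30_000L)); // true
        // p0 is the slowest partition, so it keeps reading.
        System.out.println(shouldPause(1_000L, watermarks, 30_000L));  // false
    }
}
```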

I'd be very curious to hear others' thoughts on this..  I would expect many
people to have run into similar issues.  I also wonder if anybody has
already been working on similar issues.  It seems there is room for some
core Flink changes to address this as well and I'm guessing people have
already thought about it.

-Jamie



On Sun, Oct 7, 2018 at 12:58 PM Thomas Weise  wrote:

> I'm looking to implement a state sharing mechanism between subtasks (of one
> or multiple tasks). Our use case is to align watermarks between subtasks of
> one or multiple sources to prevent some data fetchers from racing ahead of
> others and causing massive state buffering in Flink.
>
> Each subtask would share a small state (probably just a key and a couple of
> longs). The state would be updated periodically (perhaps every 30s). Other
> subtasks should see these changes with similar latency. It is essentially a
> hash table to which every node contributes a distinct key.
>
> An initial idea was to implement this using ZooKeeper ephemeral nodes. But
> since there is no way to read all child nodes in one sweep, state access
> becomes very chatty. With let's say 512 subtasks we would end up with 512
> * 512 reads per interval (1 to list children, N-1 to fetch data, per
> subtask).
>
> My next stop will be a group communication mechanism like JGroups or Akka
> (the following looks like a potentially good fit:
> https://doc.akka.io/docs/akka/2.5/distributed-data.html?language=java).
> But
> before that I wanted to check if others already had a similar need and
> possibly experience/implementation to share?
>
> There are probably more use cases related to discovery etc. Perhaps Flink
> could provide a state primitive, if there is broader interest in the
> community?
>
> Thanks,
> Thomas
>


Sharing state between subtasks

2018-10-07 Thread Thomas Weise
I'm looking to implement a state sharing mechanism between subtasks (of one
or multiple tasks). Our use case is to align watermarks between subtasks of
one or multiple sources to prevent some data fetchers from racing ahead of
others and causing massive state buffering in Flink.

Each subtask would share a small state (probably just a key and a couple of
longs). The state would be updated periodically (perhaps every 30s). Other
subtasks should see these changes with similar latency. It is essentially a
hash table to which every node contributes a distinct key.

An initial idea was to implement this using ZooKeeper ephemeral nodes. But
since there is no way to read all child nodes in one sweep, state access
becomes very chatty. With let's say 512 subtasks we would end up with 512
* 512 reads per interval (1 to list children, N-1 to fetch data, per
subtask).
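The read amplification can be checked with back-of-the-envelope arithmetic: each of N subtasks issues one list-children call plus N-1 data reads per refresh interval, so the ensemble serves N * N reads per interval.

```java
/** Back-of-the-envelope check of the ZooKeeper read amplification:
 *  N subtasks, each doing 1 getChildren + (N-1) data reads per interval. */
public class ZkReadAmplification {
    public static void main(String[] args) {
        int subtasks = 512;
        int refreshSeconds = 30;
        long readsPerInterval = (long) subtasks * subtasks; // 512 * 512
        long readsPerSecond = readsPerInterval / refreshSeconds;
        System.out.println(readsPerInterval); // 262144 reads per 30s interval
        System.out.println(readsPerSecond);   // ~8738 reads/s against ZooKeeper
    }
}
```

Sustained load in that range against a ZooKeeper ensemble for what is essentially a tiny shared table is what makes gossip/replication approaches like Akka Distributed Data attractive here.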

My next stop will be a group communication mechanism like JGroups or Akka
(the following looks like a potentially good fit:
https://doc.akka.io/docs/akka/2.5/distributed-data.html?language=java). But
before that I wanted to check if others already had a similar need and
possibly experience/implementation to share?

There are probably more use cases related to discovery etc. Perhaps Flink
could provide a state primitive, if there is broader interest in the
community?

Thanks,
Thomas