Hey all, I think what we need for this on the state sharing side is pretty simple. I opened a JIRA to track this work and submitted a PR for the state sharing bit.
https://issues.apache.org/jira/browse/FLINK-10886
https://github.com/apache/flink/pull/7099

Please provide feedback :)

-Jamie

On Thu, Nov 1, 2018 at 3:33 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Thomas,
>
> using Akka directly would further manifest our dependency on Scala in
> flink-runtime. This is something we are currently trying to get rid of.
> For that purpose we have added the RpcService abstraction, which
> encapsulates all Akka-specific logic. We hope that we can soon get rid of
> the Scala dependency in flink-runtime by using a special class loader
> only for loading the AkkaRpcService implementation.
>
> I think the easiest way to sync the task information is actually to go
> through the JobMaster, because the subtasks don't know on which other TMs
> the other subtasks run. Otherwise we would need some TM detection
> mechanism between TMs. If you choose this way, you should be able to use
> the RpcService by extending the JobMasterGateway with additional RPCs.
>
> Cheers,
> Till
>
> On Wed, Oct 31, 2018 at 6:52 PM Thomas Weise <t...@apache.org> wrote:
>
> > Hi,
> >
> > We are planning to work on the Kinesis consumer in the following order:
> >
> > 1. Add per-shard watermarking:
> > https://issues.apache.org/jira/browse/FLINK-5697 - this will be code we
> > already use internally; I will open a PR to add it to the Flink Kinesis
> > consumer
> > 2. Exchange of per-subtask watermarks between all subtasks of one or
> > multiple sources
> > 3. Implement the queue approach described in Jamie's document, utilizing
> > 1.) and 2.) to align the shard consumers with respect to event time
> >
> > There was some discussion regarding the mechanism to share the
> > watermarks between subtasks. If there is something that can be re-used,
> > that would be great. Otherwise I'm going to further investigate the
> > Akka or JGroups route.
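Whatever transport is chosen for step 2 above (JobMaster RPC, Akka, or JGroups), the state each subtask needs to share boils down to a small aggregation. The following is a minimal sketch of that logic, not Flink code; the class and method names are illustrative, and the drift threshold is an assumed tuning parameter:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not a Flink API): each source subtask reports its
// local watermark; the tracker computes the global minimum, which step 3
// of the plan above would use to decide whether a subtask has run too far
// ahead of the slowest one and should pause its shard consumers.
public class GlobalWatermarkTracker {
    private final Map<Integer, Long> watermarks = new HashMap<>();

    // Called when a subtask reports progress (e.g. via an RPC payload).
    // Watermarks are monotonic, so we keep the maximum seen per subtask.
    public synchronized void reportWatermark(int subtaskIndex, long watermark) {
        watermarks.merge(subtaskIndex, watermark, Math::max);
    }

    // The global watermark is the minimum across all reporting subtasks.
    public synchronized long globalWatermark() {
        return watermarks.values().stream().mapToLong(Long::longValue)
                .min().orElse(Long.MIN_VALUE);
    }

    // Best-effort event-time alignment: pause a subtask that is more than
    // maxDriftMillis ahead of the slowest subtask.
    public synchronized boolean shouldPause(long localWatermark, long maxDriftMillis) {
        return localWatermark - globalWatermark() > maxDriftMillis;
    }
}
```

The key property is that the decision only needs the minimum over all reported watermarks, so the shared state stays tiny regardless of parallelism.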
> > Regarding Akka, since it is used within Flink already, is there an
> > abstraction that you would recommend considering in order to avoid a
> > direct dependency?
> >
> > Thanks,
> > Thomas
> >
> > On Thu, Oct 18, 2018 at 8:34 PM Zhijiang(wangzhijiang999)
> > <wangzhijiang...@aliyun.com.invalid> wrote:
> >
> > > Not yet. We only have some initial thoughts and have not worked on it
> > > yet. We will update the progress in this discussion if we have any.
> > >
> > > Best,
> > > Zhijiang
> > > ------------------------------------------------------------------
> > > From: Aljoscha Krettek <aljos...@apache.org>
> > > Sent: October 18, 2018 (Thursday), 17:53
> > > To: dev <dev@flink.apache.org>; Zhijiang(wangzhijiang999) <
> > > wangzhijiang...@aliyun.com>
> > > Cc: Till Rohrmann <trohrm...@apache.org>
> > > Subject: Re: Sharing state between subtasks
> > >
> > > Hi Zhijiang,
> > >
> > > do you already have working code or a design doc for the second
> > > approach?
> > >
> > > Best,
> > > Aljoscha
> > >
> > > > On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) <
> > > > wangzhijiang...@aliyun.com.INVALID> wrote:
> > > >
> > > > Just noticed this discussion from @Till Rohrmann's weekly community
> > > > update, and I want to share some thoughts from our experience.
> > > >
> > > > We also encountered the source consumption skew issue before, and
> > > > we have focused on improving it in two possible ways.
> > > >
> > > > 1. Control the read strategy on the downstream side. In detail,
> > > > every input channel in the downstream task corresponds to the
> > > > consumption of one upstream source task, and we tag each input
> > > > channel with a watermark in order to find the lowest channel to
> > > > read with high priority. In essence, we rely on the backpressure
> > > > mechanism: if the channel with the highest timestamp is not read by
> > > > the downstream task for a while, it will block the corresponding
> > > > source task from reading once the buffers are exhausted.
> > > > There is no need to change the source interface in this way, but
> > > > there are two major concerns: first, it affects the barrier
> > > > alignment, resulting in delayed or expired checkpoints. Second, it
> > > > cannot align source consumption very precisely; it is just a
> > > > best-effort approach. So we finally gave up on this way.
> > > >
> > > > 2. Add a new SourceCoordinator component to coordinate the source
> > > > consumption in a distributed fashion. For example, we can start
> > > > this component in the JobManager, similar to the current role of
> > > > the CheckpointCoordinator. Then every source task would communicate
> > > > with the JobManager via the current RPC mechanism; maybe we can
> > > > rely on the heartbeat message to attach the consumption progress as
> > > > the payload. The JobManager will accumulate all the reported
> > > > progress and then respond to the individual source tasks. We can
> > > > define a protocol, for example for telling a fast source task to
> > > > sleep for a specific time. This way the coordinator has the global
> > > > information to make proper decisions for individual tasks, so it
> > > > seems more precise. And it will not affect the barrier alignment,
> > > > because a sleeping fast source can still release the lock to emit
> > > > barriers as normal. The only concern is the change to the source
> > > > interface, which may affect all related source implementations.
> > > >
> > > > Currently we prefer to implement the second way and will refer to
> > > > the other good points above. :)
> > > >
> > > > Best,
> > > > Zhijiang
> > > > ------------------------------------------------------------------
> > > > From: Jamie Grier <jgr...@lyft.com.INVALID>
> > > > Sent: October 17, 2018 (Wednesday), 03:28
> > > > To: dev <dev@flink.apache.org>
> > > > Subject: Re: Sharing state between subtasks
> > > >
> > > > Here's a doc I started describing some changes we would like to
> > > > make, starting with the Kinesis source.
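The SourceCoordinator protocol Zhijiang describes above (progress piggybacked on heartbeats, coordinator answers with a sleep suggestion for fast tasks) could look roughly like the following. This is a sketch under assumptions, not a proposed Flink class: the names, the drift threshold, and the fixed sleep duration are all illustrative:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the coordinator side of the protocol: source
// tasks attach their event-time consumption progress to heartbeats; the
// coordinator compares each task against the slowest known task and
// replies with a suggested sleep duration for tasks that are too far
// ahead. The slowest task is never throttled, so checkpoint barriers
// keep flowing normally.
public class SourceCoordinatorSketch {
    private final Map<Integer, Long> progressByTask = new HashMap<>();
    private final long maxAheadMillis; // allowed event-time drift
    private final long sleepMillis;    // suggested pause for fast tasks

    public SourceCoordinatorSketch(long maxAheadMillis, long sleepMillis) {
        this.maxAheadMillis = maxAheadMillis;
        this.sleepMillis = sleepMillis;
    }

    // Invoked once per heartbeat; returns 0 (keep reading) or a sleep
    // duration in milliseconds for the reporting task.
    public synchronized long reportProgress(int taskIndex, long eventTimeMillis) {
        progressByTask.put(taskIndex, eventTimeMillis);
        long slowest = progressByTask.values().stream()
                .mapToLong(Long::longValue).min().orElse(eventTimeMillis);
        return (eventTimeMillis - slowest > maxAheadMillis) ? sleepMillis : 0L;
    }
}
```

Because the decision is derived from global state in one place, this avoids the imprecision of the backpressure-based first approach, at the cost of the source-interface change Zhijiang mentions.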
> > > > It describes a refactoring of that code specifically, and also,
> > > > hopefully, a pattern and some reusable code we can use in the other
> > > > sources as well. The end goal would be best-effort event-time
> > > > synchronization across all Flink sources, but we are going to start
> > > > with the Kinesis source first.
> > > >
> > > > Please take a look and provide thoughts and opinions about the best
> > > > state sharing mechanism to use -- that section is left blank and
> > > > we're especially looking for input there.
> > > >
> > > > https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
> > > >
> > > > -Jamie
> > > >
> > > > On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann
> > > > <trohrm...@apache.org> wrote:
> > > >
> > > >> But on the Kafka source level it should be perfectly fine to do
> > > >> what Elias proposed. This is of course not the perfect solution,
> > > >> but it could bring us forward quite a bit, and the changes
> > > >> required for it should be minimal. It would become obsolete once
> > > >> we have something like shared state, but until then, I think it
> > > >> would be worth a try.
> > > >>
> > > >> Cheers,
> > > >> Till
> > > >>
> > > >> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek
> > > >> <aljos...@apache.org> wrote:
> > > >>
> > > >>> The reason this selective reading doesn't work well in Flink at
> > > >>> the moment is checkpointing. For checkpointing, checkpoint
> > > >>> barriers travel within the streams. If we selectively read from
> > > >>> inputs based on timestamps, this is akin to blocking an input if
> > > >>> that input is very far ahead in event time, which can happen when
> > > >>> you have a very fast source and a slow source (in event time),
> > > >>> maybe because you're in a catch-up phase.
> > > >>> In those cases it's better to simply not read the data at the
> > > >>> sources, as Thomas said. This is also because with Kafka Streams,
> > > >>> each operator is basically its own job: it reads from Kafka and
> > > >>> writes to Kafka, and there is no complex graph of different
> > > >>> operations with network shuffles in between, as you have with
> > > >>> Flink.
> > > >>>
> > > >>> This different nature of Flink is also why I think that readers
> > > >>> need awareness of other readers to do the event-time alignment,
> > > >>> and this is where shared state comes in.
> > > >>>
> > > >>>> On 10. Oct 2018, at 20:47, Elias Levy
> > > >>>> <fearsome.lucid...@gmail.com> wrote:
> > > >>>>
> > > >>>> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske
> > > >>>> <fhue...@gmail.com> wrote:
> > > >>>>
> > > >>>>> I think the new source interface would be designed to be able
> > > >>>>> to leverage shared state to achieve time alignment. I don't
> > > >>>>> think this would be possible without some kind of shared state.
> > > >>>>>
> > > >>>>> The problem of tasks that are far ahead in time cannot be
> > > >>>>> solved with back-pressure. That's because a task cannot choose
> > > >>>>> from which source task it accepts events and from which it
> > > >>>>> doesn't. If it blocks an input, all downstream tasks connected
> > > >>>>> to the operator are affected, which can easily lead to
> > > >>>>> deadlocks. Therefore, all operators need to be able to handle
> > > >>>>> events when they arrive. If they cannot process them yet
> > > >>>>> because the events are too far ahead in time, they are put in
> > > >>>>> state.
> > > >>>>>
> > > >>>> The idea I was suggesting is not for operators to block an
> > > >>>> input.
> > > >>>> Rather, it is that they selectively choose which input to
> > > >>>> process the next message from, based on the timestamps, so long
> > > >>>> as there are buffered messages waiting to be processed. That is
> > > >>>> a best-effort alignment strategy. It seems to work relatively
> > > >>>> well in practice, at least within Kafka Streams.
> > > >>>>
> > > >>>> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate
> > > >>>> for both its inputs. Instead, it could keep them separate and
> > > >>>> selectively consume from the one that has a buffer available,
> > > >>>> and if both have buffers available, from the buffer with the
> > > >>>> messages with the lower timestamp.
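The selection rule Elias describes for a two-input processor can be sketched as follows. This is illustrative pseudocode in Java, not the actual StreamTwoInputProcessor; real input gates hold network buffers rather than bare timestamps, and the class here is an assumed simplification:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of best-effort event-time input selection: keep the
// two inputs separate (instead of a union), and when both sides have
// buffered records, consume from the side whose next record carries the
// lower timestamp. If only one side has data, read that side rather than
// blocking, so checkpoint barriers are never held back indefinitely.
public class SelectiveInputReader {
    // Each deque holds the event timestamps of buffered records; a real
    // implementation would hold the records/buffers themselves.
    final Deque<Long> input1 = new ArrayDeque<>();
    final Deque<Long> input2 = new ArrayDeque<>();

    // Returns 1 or 2 for the input to read next, or 0 if both are empty.
    public int selectNextInput() {
        if (input1.isEmpty() && input2.isEmpty()) return 0;
        if (input1.isEmpty()) return 2;
        if (input2.isEmpty()) return 1;
        // Both sides have buffered data: prefer the lower head timestamp,
        // which best-effort aligns the two inputs in event time.
        return input1.peekFirst() <= input2.peekFirst() ? 1 : 2;
    }
}
```

Note the alignment only applies when both sides have buffered data; as Aljoscha points out above, actually blocking the ahead input would interact badly with checkpoint barriers, which is why this stays best-effort.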