Hi Piotr, Thanks for the explanation. Agreed that broadcastEmit(record) is the better choice for broadcasting in iterations. Since broadcasting for iterations is the primary motivation, let's support it first.
Thanks, Zhu Zhu Yun Gao <yungao...@aliyun.com.invalid> wrote on Fri, Aug 23, 2019 at 11:56 PM: > Hi Piotr, > > Many thanks for the suggestions! > > I totally agree that we could first focus on the broadcast > scenarios and expose the broadcastEmit method first, considering the > semantics and performance. > > For the keyed stream, I also agree that broadcasting keyed > records to all the tasks may be confusing, given the semantics of the keyed > partitioner. However, in the iteration case, supporting broadcast over a keyed > partitioner should be required, since users may create any subgraph for the > iteration body, including operators with keys. I think a possible > solution to this issue is to introduce another data type for > 'broadcastEmit'. For example, an operator Operator<T> may broadcast-emit > another type E instead of T, and the transmitted E would bypass the > partitioner and the keyed-context setup. This leads to the design that > introduces customized operator events (option 1 in the document). The cost of > this approach is that we need to introduce a new type of StreamElement and a > new interface for it, but it should be suitable for both keyed and > non-keyed partitioners. > > Best, > Yun > > > > ------------------------------------------------------------------ > From: Piotr Nowojski <pi...@ververica.com> > Send Time: 2019 Aug. 23 (Fri.) 22:29 > To: Zhu Zhu <reed...@gmail.com> > Cc: dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com> > Subject: Re: [DISCUSS] Enhance Support for Multicast Communication Pattern > > Hi, > > If the primary motivation is broadcasting (for the iterations) and we have > no immediate need for multicast (cross join), I would prefer to first > expose broadcast via the DataStream API and only later, once we finally > need it, support multicast. As I wrote, multicast would be more challenging > to implement, with a more complicated runtime and API.
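To make Yun's proposal above more concrete: a minimal sketch of option 1, in which an operator emits records of type T through the partitioner but broadcast-emits a separate event type E to every channel, bypassing the partitioner. All names here (Writer, this form of broadcastEmit, the in-memory channels) are hypothetical illustrations of the idea, not existing Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "option 1": records of type T go through a
// partitioner, while a separate event type E bypasses it and reaches
// every downstream channel.
public class BroadcastEventSketch {

    // Stand-in for a channel selector (e.g. a rebalance or hash partitioner).
    interface ChannelSelector<T> {
        int selectChannel(T record, int numChannels);
    }

    // Hypothetical writer: records go through the selector, events go everywhere.
    static class Writer<T, E> {
        final List<List<Object>> channels = new ArrayList<>();
        final ChannelSelector<T> selector;

        Writer(int numChannels, ChannelSelector<T> selector) {
            for (int i = 0; i < numChannels; i++) channels.add(new ArrayList<>());
            this.selector = selector;
        }

        void emit(T record) {
            channels.get(selector.selectChannel(record, channels.size())).add(record);
        }

        // Bypasses the partitioner entirely: every channel receives the event.
        void broadcastEmit(E event) {
            for (List<Object> ch : channels) ch.add(event);
        }
    }

    public static void main(String[] args) {
        // 4 channels; records partitioned by hash, events broadcast.
        Writer<String, String> w =
            new Writer<>(4, (r, n) -> Math.abs(r.hashCode()) % n);
        w.emit("record-1");
        w.broadcastEmit("epoch-1-done");
        int withEvent = 0;
        for (List<Object> ch : w.channels) if (ch.contains("epoch-1-done")) withEvent++;
        System.out.println(withEvent); // prints 4
    }
}
```

In real Flink terms the event would presumably be a new kind of StreamElement handled by the RecordWriter so that it can skip both the partitioner and the keyed-context setup, which is exactly the cost Yun mentions.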
And re-using multicast > just to support broadcast doesn't make much sense: > > 1. It's a bit obfuscated. It's easier to understand > collectBroadcast(record) or broadcastEmit(record) than some > multicast channel selector that just happens to return all of the channels. > 2. There are performance benefits to explicitly calling > `RecordWriter#broadcastEmit`. > > > On a different note, what would be the semantics of such a broadcast emit on > a KeyedStream? Would it be supported? Or would we limit support to > non-keyed streams only? > > Piotrek > > > On 23 Aug 2019, at 12:48, Zhu Zhu <reed...@gmail.com> wrote: > > > > Thanks Piotr, > > > > Users asked for this feature some time ago when they were migrating batch > jobs to Flink (Blink). > > It's not very urgent, as they have taken some workarounds to solve > it (like partitioning the data set to different job vertices). > > So it's fine not to make it a top priority. > > > > Anyway, as a commonly known scenario, I think users can benefit from > cross join sooner or later. > > > > Thanks, > > Zhu Zhu > > > > Piotr Nowojski <pi...@ververica.com> > wrote on Fri, Aug 23, 2019 at 6:19 PM: > > Hi, > > > > Thanks for the answers :) OK, I understand the full picture now. +1 from > my side on solving this issue somehow. But before we start discussing how > to solve it, one last control question: > > > > I guess this multicast is intended to be used in the Blink planner, right? > Assuming that we implement the multicast support now, when would it be used > by Blink? I would like to avoid a scenario where we implement an > unused feature and keep maintaining it for a long period of time. > > > > Piotrek > > > > PS, try to include motivating examples, including concrete ones, in the > proposals/design docs, for example in the very first paragraph.
Especially > if it's a commonly known feature like cross join :) > > > > > On 23 Aug 2019, at 11:38, Yun Gao <yungao...@aliyun.com.INVALID> > wrote: > > > > > > Hi Piotr, > > > > > > Thanks a lot for sharing the thoughts! > > > > > > For the iteration, I agree that multicasting is not > necessary. Exposing the broadcast interface on the Output of the operators in > some way should also solve this issue, and I think it would be even more > convenient to have the broadcast method for the iteration. > > > > > > Also thanks Zhu Zhu for the cross join case! > > > Best, > > > Yun > > > > > > > > > > > > ------------------------------------------------------------------ > > > From: Zhu Zhu <reed...@gmail.com> > > > Send Time: 2019 Aug. 23 (Fri.) 17:25 > > > To: dev <dev@flink.apache.org> > > > Cc: Yun Gao <yungao...@aliyun.com> > > > Subject: Re: [DISCUSS] Enhance Support for Multicast Communication > Pattern > > > > > > Hi Piotr, > > > > > > Yes, you are right, it's a distributed cross join requirement. > > > A broadcast join can help with cross join cases, but users cannot use it > if the data set to join is too large to fit into one subtask. > > > > > > Sorry for leaving some details out. > > > > > > Thanks, > > > Zhu Zhu > > > Piotr Nowojski <pi...@ververica.com> > wrote on Fri, Aug 23, 2019 at 4:57 PM: > > > Hi Yun and Zhu Zhu, > > > > > > Thanks for the more detailed example, Zhu Zhu. > > > > > > As far as I understand, for the iterations example we do not need > multicasting. Regarding the join example, I don't fully understand it. The > example that Zhu Zhu presented has the drawback of sending both tables to > multiple nodes. What's the benefit of using a broadcast join over a hash join > in such a case?
As far as I know, the biggest benefit of using a broadcast join > instead of a hash join is that we can avoid sending the larger table over the > network, because we can perform the join locally. In this example we are > sending both of the tables to multiple nodes, which should defeat the > purpose. > > > > > > Is it about implementing cross joins or near-cross joins in a > distributed fashion? > > > > > >> if we introduce a new MulticastRecordWriter > > > > > > That's one of the solutions. It might have the drawback of a three-class > virtualisation problem (we have RecordWriter and BroadcastRecordWriter > already). With up to two implementations, the JVM is able to devirtualise the > calls. > > > > > > Previously I was also thinking about just providing two different > ChannelSelector interfaces: one with `int[]` and a `SingleChannelSelector` > with a plain `int`, and based on that, RecordWriter could perform some magic > (worst-case scenario, `instanceof` checks). > > > > > > Another solution might be to change the `ChannelSelector` interface into > an iterator. > > > > > > But let's discuss the details after we agree on implementing this. > > > > > > Piotrek > > > > > >> On 23 Aug 2019, at 10:20, Yun Gao <yungao...@aliyun.com> wrote: > > >> > > >> Hi Piotr, > > >> > > >> Thanks a lot for the suggestions! > > >> > > >> The core motivation of this discussion is to implement a new > iteration library on the DataStream API, and it requires inserting special > records into the stream to notify the progress of the iteration. The > mechanism of such records is very similar to the current Watermark, and we > face the problem of sending normal records according to the partitioner > (Rebalance, etc.) while also being able to broadcast the inserted progress > records to all the connected tasks. I have read the notes in the Google > doc and I totally agree that exposing the broadcast interface in > RecordWriter in some way is able to solve this issue.
> > >> > > >> Regarding `int[] ChannelSelector#selectChannels()`: I'm > wondering, if we introduced a new MulticastRecordWriter and left the current > RecordWriter untouched, could we avoid the performance degradation? With > such a modification, the normal RecordWriter would not need to iterate over > the array returned by the ChannelSelector; the only difference would be > returning an array instead of an integer, and accessing the first element > of the returned array instead of reading the integer directly. > > >> > > >> Best, > > >> Yun > > >> > > >> ------------------------------------------------------------------ > > >> From: Piotr Nowojski <pi...@ververica.com> > > >> Send Time: 2019 Aug. 23 (Fri.) 15:20 > > >> To: dev <dev@flink.apache.org> > > >> Cc: Yun Gao <yungao...@aliyun.com> > > >> Subject: Re: [DISCUSS] Enhance Support for Multicast Communication > Pattern > > >> > > >> Hi, > > >> > > >> Yun: > > >> > > >> Thanks for proposing the idea. I have checked the document and left a > couple of questions there, but it might be better to answer them here. > > >> > > >> What is the exact motivation and what problems do you want to solve? > We dropped multicast support from the network stack [1] for two > reasons: > > >> 1. Performance > > >> 2. Code simplicity > > >> > > >> The proposal to reintroduce `int[] ChannelSelector#selectChannels()` > would revert those changes. At that time we were thinking about ways to > keep multicast support at the network level while preserving the > performance and simplicity of the non-multicast cases, and there are ways to > achieve that. However, they would add extra complexity to Flink, which > would be better to avoid. > > >> > > >> On the other hand, supporting the dual pattern (standard partitioning or > broadcasting) is easy to do, as LatencyMarkers are doing exactly that.
It > would be just a matter of exposing this to the user in some way. So before > we go any further, can you describe your use cases/motivation? Isn't a mix of > standard partitioning and broadcasting enough? Do we need multicasting? > > >> > > >> Zhu: > > >> > > >> Could you rephrase your example? I didn't quite understand it. > > >> > > >> Piotrek > > >> > > >> [1] https://issues.apache.org/jira/browse/FLINK-10662 > > >> > > >> On 23 Aug 2019, at 09:17, Zhu Zhu <reed...@gmail.com> > wrote: > > >> > > >> Thanks Yun for starting this discussion. > > >> I think multicasting can be very helpful in certain cases. > > >> > > >> I have received requirements from users who want to do a broadcast > > >> join while the data set to broadcast is too large to fit in one task. > > >> Thus the requirement turned out to be supporting the cartesian product of 2 > > >> data sets (one of which can be an infinite stream). > > >> For example, A (parallelism=2) broadcast-joins B (parallelism=2) in JobVertex > > >> C. > > >> The idea is to have 4 C subtasks deal with different combinations of A/B > > >> partitions, like C1(A1,B1), C2(A1,B2), C3(A2,B1), C4(A2,B2). > > >> This requires one record to be sent to multiple downstream subtasks, but > > >> not to all subtasks. > > >> > > >> With the current interface this is not supported, as one record can only be > > >> sent to one subtask, or to all subtasks of a JobVertex. > > >> So the user had to split the broadcast data set manually into several > > >> different JobVertices, which is hard to maintain and extend.
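To spell out the routing in Zhu Zhu's example: a record from A's partition i must be multicast to every C subtask that handles (Ai, *), and a record from B's partition j to every subtask handling (*, Bj). A small sketch of that target computation, assuming the C subtasks are numbered row-major over (A partition, B partition) as in C1..C4 above; the layout is an illustrative assumption, not an existing Flink API.

```java
import java.util.Arrays;

// Illustrative multicast routing for a distributed cross join:
// C subtask index = aPartition * bParallelism + bPartition (row-major).
public class CrossJoinRouting {

    // A record from A's partition i goes to all subtasks handling (Ai, *).
    static int[] targetsForA(int aPartition, int bParallelism) {
        int[] targets = new int[bParallelism];
        for (int j = 0; j < bParallelism; j++) {
            targets[j] = aPartition * bParallelism + j;
        }
        return targets;
    }

    // A record from B's partition j goes to all subtasks handling (*, Bj).
    static int[] targetsForB(int bPartition, int aParallelism, int bParallelism) {
        int[] targets = new int[aParallelism];
        for (int i = 0; i < aParallelism; i++) {
            targets[i] = i * bParallelism + bPartition;
        }
        return targets;
    }

    public static void main(String[] args) {
        // A1 (index 0) goes to C1(A1,B1) and C2(A1,B2): subtasks 0 and 1.
        System.out.println(Arrays.toString(targetsForA(0, 2))); // prints [0, 1]
        // B1 (index 0) goes to C1(A1,B1) and C3(A2,B1): subtasks 0 and 2.
        System.out.println(Arrays.toString(targetsForB(0, 2, 2))); // prints [0, 2]
    }
}
```

Note that each record reaches several subtasks but never all of them, which is exactly why neither the single-channel selector nor the broadcast path covers this case.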
> > >> > > >> Thanks, > > >> Zhu Zhu > > >> > > >> Yun Gao <yungao...@aliyun.com.invalid> > wrote on Thu, Aug 22, 2019 at 8:42 PM: > > >> > > >> Hi everyone, > > >> In some scenarios we have met the requirement that some operators want to > > >> send records to their downstream operators with a multicast communication > > >> pattern. In detail, for some records the operators want to send them > > >> according to the partitioner (for example, Rebalance), and for some other > > >> records the operators want to send them to all the connected operators and > > >> tasks. Such a communication pattern could be viewed as a kind of multicast: > > >> it does not broadcast every record, but some records will indeed be sent to > > >> multiple downstream operators. > > >> > > >> However, we found that this kind of communication pattern seemingly cannot > > >> be implemented correctly if the operators have multiple consumers with > > >> different parallelism, using a customized partitioner. To solve the above > > >> problem, we propose to enhance the support for such kinds of irregular > > >> communication patterns. We think there may be two options: > > >> > > >> 1. Support a kind of customized operator event, which shares much > > >> similarity with Watermark, and these events can be broadcast to the > > >> downstream operators separately. > > >> 2. Let the channel selector support multicast, and also add a > > >> separate RecordWriter implementation to avoid impacting the performance of > > >> the channel selectors that do not need multicast.
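A rough sketch of option 2 above: keep the existing single-channel selector untouched, and add a separate multicast selector plus a separate writer, so the common path pays nothing for multicast support. The interface and class names here are hypothetical, loosely modeled on Flink's ChannelSelector/RecordWriter but not actual APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "option 2": a new int[]-returning selector used
// only by a dedicated multicast writer, leaving the single-channel fast
// path (plain int) untouched.
public class MulticastWriterSketch {

    interface ChannelSelector<T> {          // existing fast path: one channel
        int selectChannel(T record, int numChannels);
    }

    interface MulticastChannelSelector<T> { // new path: several channels
        int[] selectChannels(T record, int numChannels);
    }

    // Simplified multicast write: deliver each record to every selected channel.
    static <T> List<List<T>> write(
            List<T> records, MulticastChannelSelector<T> selector, int numChannels) {
        List<List<T>> channels = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) channels.add(new ArrayList<>());
        for (T r : records) {
            for (int c : selector.selectChannels(r, numChannels)) {
                channels.get(c).add(r); // in practice, serialize once, copy per target
            }
        }
        return channels;
    }

    public static void main(String[] args) {
        // Even-length words are multicast to channels {0, 1}, others go to {2}.
        MulticastChannelSelector<String> sel =
            (r, n) -> r.length() % 2 == 0 ? new int[]{0, 1} : new int[]{2};
        List<List<String>> out = write(List.of("ab", "abc"), sel, 3);
        System.out.println(out); // prints [[ab], [ab], [abc]]
    }
}
```

Yun's point in the message above is precisely that with a separate MulticastRecordWriter the existing single-channel path never touches the int[] variant, so its hot path stays devirtualisable.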
> > >> > > >> The problem and options are detailed in > > >> https://docs.google.com/document/d/1npi5c_SeP68KuT2lNdKd8G7toGR_lxQCGOnZm_hVMks/edit?usp=sharing > > >> > > >> We are also wondering if there are other methods to implement this > > >> requirement, with or without changing the runtime. Many thanks for any feedback! > > >> > > >> Best, > > >> Yun