Hi Piotr,

Thanks for the explanation.
Agreed that broadcastEmit(record) is a better choice for broadcasting in
iterations.
As broadcasting for iterations is the primary motivation, let's support
it first.

Thanks,
Zhu Zhu

On Fri, Aug 23, 2019 at 11:56 PM, Yun Gao <yungao...@aliyun.com.invalid> wrote:

>      Hi Piotr,
>
>      Many thanks for the suggestions!
>
>      I totally agree that we could first focus on the broadcast
> scenarios and expose the broadcastEmit method first, considering the
> semantics and performance.
>
>      For the keyed stream, I also agree that broadcasting keyed
> records to all the tasks may be confusing considering the semantics of the
> keyed partitioner. However, in the iteration case, supporting broadcast over
> a keyed partitioner should be required, since users may create any subgraph
> for the iteration body, including operators with keys. I think a possible
> solution to this issue is to introduce another data type for
> 'broadcastEmit'. For example, an operator Operator<T> may broadcast-emit
> another type E instead of T, and the transmission of E would bypass both the
> partitioner and the setting of the keyed context. This would lead to the
> design that introduces customized operator events (option 1 in the
> document). The cost of this method is that we need to introduce a new type
> of StreamElement and a new interface for this type, but it should be
> suitable for both keyed and non-keyed partitioners.
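
A minimal sketch of the separate-event-type idea described above, offered only as an illustration. Every name in it (`Output<T, E>`, `KeyedOutput`, `IterationProgress`) is hypothetical and is not Flink API; the in-memory channel lists stand in for downstream subtasks. It shows records of type T being routed by key while events of type E reach every channel without going through the partitioner.

```java
import java.util.ArrayList;
import java.util.List;

public class BroadcastEventSketch {

    /** Hypothetical output that carries records of type T and events of type E. */
    interface Output<T, E> {
        void collect(T record);       // routed by the (keyed) partitioner
        void broadcastEmit(E event);  // bypasses the partitioner entirely
    }

    /** In-memory stand-in for the downstream channels behind a keyed partitioner. */
    static final class KeyedOutput<T, E> implements Output<T, E> {
        final List<List<Object>> channels = new ArrayList<>();

        KeyedOutput(int numChannels) {
            for (int i = 0; i < numChannels; i++) {
                channels.add(new ArrayList<>());
            }
        }

        @Override
        public void collect(T record) {
            // Assumption: the record's hash code plays the role of the key.
            int channel = Math.floorMod(record.hashCode(), channels.size());
            channels.get(channel).add(record);
        }

        @Override
        public void broadcastEmit(E event) {
            // No key lookup: the event is appended to every channel.
            for (List<Object> channel : channels) {
                channel.add(event);
            }
        }
    }

    /** Hypothetical event type, deliberately distinct from the record type. */
    static final class IterationProgress {
        final int epoch;

        IterationProgress(int epoch) {
            this.epoch = epoch;
        }
    }

    /** Sends two keyed records and one broadcast event to three channels. */
    static KeyedOutput<String, IterationProgress> demo() {
        KeyedOutput<String, IterationProgress> out = new KeyedOutput<>(3);
        out.collect("user-1");
        out.collect("user-2");
        out.broadcastEmit(new IterationProgress(1));
        return out;
    }

    public static void main(String[] args) {
        KeyedOutput<String, IterationProgress> out = demo();
        for (int i = 0; i < out.channels.size(); i++) {
            System.out.println("channel " + i + " holds "
                    + out.channels.get(i).size() + " element(s)");
        }
    }
}
```

Because E is a different type from T, the receiving side can tell events from records without consulting any key, which is why this approach would work for keyed and non-keyed partitioners alike.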
>
> Best,
> Yun
>
>
>
> ------------------------------------------------------------------
> From:Piotr Nowojski <pi...@ververica.com>
> Send Time:2019 Aug. 23 (Fri.) 22:29
> To:Zhu Zhu <reed...@gmail.com>
> Cc:dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>
> Subject:Re: [DISCUSS] Enhance Support for Multicast Communication Pattern
>
> Hi,
>
> If the primary motivation is broadcasting (for the iterations) and we have
> no immediate need for multicast (cross join), I would prefer to first
> expose broadcast via the DataStream API and only later, once we finally
> need it, support multicast. As I wrote, multicast would be more challenging
> to implement, with a more complicated runtime and API. And re-using multicast
> just to support broadcast doesn’t make much sense:
>
> 1. It’s a bit obfuscated. It’s easier to understand
> collectBroadcast(record) or broadcastEmit(record) compared to some
> multicast channel selector that just happens to return all of the channels.
> 2. There are performance benefits of explicitly calling
> `RecordWriter#broadcastEmit`.
>
>
> On a different note, what would be the semantics of such a broadcast emit on
> KeyedStream? Would it be supported? Or would we limit support only to the
> non-keyed streams?
>
> Piotrek
>
> > On 23 Aug 2019, at 12:48, Zhu Zhu <reed...@gmail.com> wrote:
> >
> > Thanks Piotr,
> >
> > Users asked for this feature some time ago when they were migrating batch
> jobs to Flink (Blink).
> > It's not very urgent, as they have found workarounds for it (like
> partitioning the data set into different job vertices).
> > So it's fine to not make it top priority.
> >
> > Anyway, as a commonly known scenario, I think users can benefit from
> cross join sooner or later.
> >
> > Thanks,
> > Zhu Zhu
> >
> > On Fri, Aug 23, 2019 at 6:19 PM, Piotr Nowojski <pi...@ververica.com> wrote:
> > Hi,
> >
> > Thanks for the answers :) Ok I understand the full picture now. +1 from
> my side on solving this issue somehow. But before we start discussing how
> to solve it one last control question:
> >
> > I guess this multicast is intended to be used in the Blink planner, right?
> Assuming that we implement the multicast support now, when would it be used
> by Blink? I would like to avoid a scenario where we implement an
> unused feature and keep maintaining it for a long period of time.
> >
> > Piotrek
> >
> > PS, try to include motivating examples, including concrete ones in the
> proposals/design docs, for example in the very first paragraph. Especially
> if it’s a commonly known feature like cross join :)
> >
> > > On 23 Aug 2019, at 11:38, Yun Gao <yungao...@aliyun.com.INVALID>
> wrote:
> > >
> > >     Hi Piotr,
> > >
> > >        Thanks a lot for sharing the thoughts!
> > >
> > >        For the iterations, I agree that multicasting is not
> necessary. Exposing the broadcast interface to the Output of the operators in
> some way should also solve this issue, and I think it would be even more
> convenient to have the broadcast method for the iterations.
> > >
> > >        Also thanks Zhu Zhu for the cross join case!
> > >  Best,
> > >   Yun
> > >
> > >
> > >
> > > ------------------------------------------------------------------
> > > From:Zhu Zhu <reed...@gmail.com>
> > > Send Time:2019 Aug. 23 (Fri.) 17:25
> > > To:dev <dev@flink.apache.org>
> > > Cc:Yun Gao <yungao...@aliyun.com>
> > > Subject:Re: [DISCUSS] Enhance Support for Multicast Communication
> Pattern
> > >
> > > Hi Piotr,
> > >
> > > Yes, you are right, it's a distributed cross join requirement.
> > > Broadcast join can help with cross join cases. But users cannot use it
> if the data set to join is too large to fit into one subtask.
> > >
> > > Sorry for leaving out some details.
> > >
> > > Thanks,
> > > Zhu Zhu
> > > On Fri, Aug 23, 2019 at 4:57 PM, Piotr Nowojski <pi...@ververica.com> wrote:
> > > Hi Yun and Zhu Zhu,
> > >
> > > Thanks for the more detailed example Zhu Zhu.
> > >
> > > As far as I understand for the iterations example we do not need
> multicasting. Regarding the Join example, I don’t fully understand it. The
> example that Zhu Zhu presented has a drawback of sending both tables to
> multiple nodes. What’s the benefit of using broadcast join over a hash join
> in such case? As far as I know, the biggest benefit of using broadcast join
> instead of hash join is that we can avoid sending the larger table over the
> network, because we can perform the join locally. In this example we are
> sending both of the tables to multiple nodes, which should defeat the
> purpose.
> > >
> > > Is it about implementing cross join or near cross joins in a
> distributed fashion?
> > >
> > >> if we introduce a new MulticastRecordWriter
> > >
> > > That’s one of the solutions. It might have the drawback of a three-class
> virtualisation problem (we already have RecordWriter and
> BroadcastRecordWriter). With up to two implementations, the JVM is able to
> devirtualise the calls.
> > >
> > > Previously I was also thinking about just providing two different
> ChannelSelector interfaces: one returning `int[]` and a
> `SingleChannelSelector` returning a plain `int`. Based on that, RecordWriter
> could perform some magic (worst case scenario, `instanceof` checks).
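
A rough sketch of what the two-interface idea could look like, under stated assumptions: `SingleChannelSelector`, `MultiChannelSelector` and `Writer` below are hypothetical toy types, not the actual Flink `ChannelSelector` or `RecordWriter`. It only illustrates the dispatch shape (a plain `int` path and an `int[]` path chosen by an `instanceof` check) and says nothing about real JIT behaviour or performance.

```java
import java.util.ArrayList;
import java.util.List;

public class SelectorDispatchSketch {

    /** Hypothetical single-target selector: returns one channel index. */
    interface SingleChannelSelector<T> {
        int selectChannel(T record, int numChannels);
    }

    /** Hypothetical multi-target selector: returns several channel indices. */
    interface MultiChannelSelector<T> {
        int[] selectChannels(T record, int numChannels);
    }

    /** Toy writer that picks the single or multi path with an instanceof check. */
    static final class Writer<T> {
        final List<List<T>> channels = new ArrayList<>();
        private final Object selector;

        private Writer(Object selector, int numChannels) {
            this.selector = selector;
            for (int i = 0; i < numChannels; i++) {
                channels.add(new ArrayList<>());
            }
        }

        static <T> Writer<T> single(SingleChannelSelector<T> selector, int numChannels) {
            return new Writer<>(selector, numChannels);
        }

        static <T> Writer<T> multi(MultiChannelSelector<T> selector, int numChannels) {
            return new Writer<>(selector, numChannels);
        }

        @SuppressWarnings("unchecked")
        void emit(T record) {
            if (selector instanceof SingleChannelSelector) {
                // Common path: one int, no array allocation or iteration.
                int target = ((SingleChannelSelector<T>) selector)
                        .selectChannel(record, channels.size());
                channels.get(target).add(record);
            } else {
                // Multicast path: iterate over the returned channel indices.
                int[] targets = ((MultiChannelSelector<T>) selector)
                        .selectChannels(record, channels.size());
                for (int target : targets) {
                    channels.get(target).add(record);
                }
            }
        }
    }

    public static void main(String[] args) {
        Writer<String> single =
                Writer.<String>single((r, n) -> Math.floorMod(r.hashCode(), n), 4);
        single.emit("a");

        Writer<String> multi = Writer.<String>multi((r, n) -> new int[] {0, 2}, 4);
        multi.emit("b");

        System.out.println(single.channels);
        System.out.println(multi.channels);
    }
}
```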
> > >
> > > Another solution might be to change `ChannelSelector` interface into
> an iterator.
> > >
> > > But let's discuss the details after we agree on implementing this.
> > >
> > > Piotrek
> > >
> > >> On 23 Aug 2019, at 10:20, Yun Gao <yungao...@aliyun.com> wrote:
> > >>
> > >>   Hi Piotr,
> > >>
> > >>        Thanks a lot for the suggestions!
> > >>
> > >>        The core motivation of this discussion is to implement a new
> iteration library on the DataStream, which requires inserting special
> records into the stream to signal the progress of the iteration. The
> mechanism of such records is very similar to the current Watermark, and the
> problem we face is to send normal records according to the partitioner
> (Rebalance, etc.) while also being able to broadcast the inserted progress
> records to all the connected tasks. I have read the notes in the google
> doc and I totally agree that exposing the broadcast interface in
> RecordWriter in some way would solve this issue.
> > >>
> > >>       Regarding `int[] ChannelSelector#selectChannels()`, I'm
> wondering: if we introduce a new MulticastRecordWriter and leave the current
> RecordWriter untouched, could we avoid the performance degradation? With
> such a modification the normal RecordWriter does not need to iterate over
> the array returned by ChannelSelector; the only difference would be
> returning an array instead of an integer, and accessing the first element
> of the returned array instead of reading the integer directly.
> > >>
> > >> Best,
> > >> Yun
> > >>
> > >> ------------------------------------------------------------------
> > >> From:Piotr Nowojski <pi...@ververica.com>
> > >> Send Time:2019 Aug. 23 (Fri.) 15:20
> > >> To:dev <dev@flink.apache.org>
> > >> Cc:Yun Gao <yungao...@aliyun.com>
> > >> Subject:Re: [DISCUSS] Enhance Support for Multicast Communication
> Pattern
> > >>
> > >> Hi,
> > >>
> > >> Yun:
> > >>
> > >> Thanks for proposing the idea. I have checked the document and left a
> couple of questions there, but it might be better to answer them here.
> > >>
> > >> What is the exact motivation and what problems do you want to solve?
> We have dropped multicast support from the network stack [1] for two
> reasons:
> > >> 1. Performance
> > >> 2. Code simplicity
> > >>
> > >> The proposal to reintroduce `int[] ChannelSelector#selectChannels()`
> would revert those changes. At that time we were thinking about a way
> to keep the multicast support on the network level while keeping the
> performance and simplicity for non-multicast cases, and there are ways to
> achieve that. However, they would add extra complexity to Flink, which it
> would be better to avoid.
> > >>
> > >> On the other hand, supporting a dual pattern (standard partitioning or
> broadcasting) is easy to do, as LatencyMarkers are doing exactly that. It
> would be just a matter of exposing this to the user in some way. So before
> we go any further, can you describe your use cases/motivation? Isn’t a mix of
> standard partitioning and broadcasting enough? Do we need multicasting?
> > >>
> > >> Zhu:
> > >>
> > >> Could you rephrase your example? I didn’t quite understand it.
> > >>
> > >> Piotrek
> > >>
> > >> [1] https://issues.apache.org/jira/browse/FLINK-10662
> > >>
> > >> On 23 Aug 2019, at 09:17, Zhu Zhu <reed...@gmail.com> wrote:
> > >>
> > >> Thanks Yun for starting this discussion.
> > >> I think the multicasting can be very helpful in certain cases.
> > >>
> > >> I have received requirements from users that they want to do broadcast
> > >> join, while the data set to broadcast is too large to fit in one task.
> > >> Thus the requirement turned out to be to support the Cartesian product
> > >> of 2 data sets (one of which can be an infinite stream).
> > >> For example, A(parallelism=2) broadcast join B(parallelism=2) in
> > >> JobVertex C.
> > >> The idea is to have 4 C subtasks to deal with different combinations
> > >> of A/B partitions, like C1(A1,B1), C2(A1,B2), C3(A2,B1), C4(A2,B2).
> > >> This requires one record to be sent to multiple downstream subtasks,
> but
> > >> not to all subtasks.
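
To make the routing in the example above concrete, here is a small sketch under stated assumptions. The class and method names are hypothetical, indices are zero-based (so C1 is subtask 0 and C4 is subtask 3), and the subtasks of C are assumed to be laid out row by row as (A partition, B partition) pairs. It computes which C subtasks a record from a given A or B partition would have to reach.

```java
import java.util.Arrays;

public class CrossJoinMulticast {

    /** C subtasks that must receive a record from partition aIndex of A. */
    static int[] targetsForA(int aIndex, int parallelismB) {
        int[] targets = new int[parallelismB];
        for (int b = 0; b < parallelismB; b++) {
            targets[b] = aIndex * parallelismB + b;
        }
        return targets;
    }

    /** C subtasks that must receive a record from partition bIndex of B. */
    static int[] targetsForB(int bIndex, int parallelismA, int parallelismB) {
        int[] targets = new int[parallelismA];
        for (int a = 0; a < parallelismA; a++) {
            targets[a] = a * parallelismB + bIndex;
        }
        return targets;
    }

    public static void main(String[] args) {
        // A1 and A2 with parallelism(B) = 2.
        System.out.println("A1 -> " + Arrays.toString(targetsForA(0, 2)));
        System.out.println("A2 -> " + Arrays.toString(targetsForA(1, 2)));
        // B1 and B2 with parallelism(A) = 2, parallelism(B) = 2.
        System.out.println("B1 -> " + Arrays.toString(targetsForB(0, 2, 2)));
        System.out.println("B2 -> " + Arrays.toString(targetsForB(1, 2, 2)));
    }
}
```

With both parallelisms set to 2, each record goes to two of the four C subtasks, which is neither a single-channel send nor a full broadcast.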
> > >>
> > >> With the current interface this is not supported, as one record can
> > >> only be sent to one subtask, or to all subtasks of a JobVertex.
> > >> And the user had to split the broadcast data set manually to several
> > >> different JobVertices, which is hard to maintain and extend.
> > >>
> > >> Thanks,
> > >> Zhu Zhu
> > >>
> > >> On Thu, Aug 22, 2019 at 8:42 PM, Yun Gao <yungao...@aliyun.com.invalid> wrote:
> > >>
> > >> Hi everyone,
> > >>     In some scenarios we encountered a requirement that some operators
> > >> want to send records to their downstream operators with a multicast
> > >> communication pattern. In detail, for some records, the operators want
> > >> to send them according to the partitioner (for example, Rebalance), and
> > >> for some other records, the operators want to send them to all the
> > >> connected operators and tasks. Such a communication pattern could be
> > >> viewed as a kind of multicast: it does not broadcast every record, but
> > >> some records will indeed be sent to multiple downstream operators.
> > >>
> > >> However, we found that this kind of communication pattern apparently
> > >> cannot be implemented correctly with a customized partitioner if the
> > >> operators have multiple consumers with different parallelism. To solve
> > >> the above problem, we propose to enhance the support for this kind of
> > >> irregular communication pattern. We think there may be two options:
> > >>
> > >>    1. Support a kind of customized operator event, which shares much
> > >> similarity with Watermark; these events can be broadcast to the
> > >> downstream operators separately.
> > >>    2. Let the channel selector support multicast, and also add a
> > >> separate RecordWriter implementation to avoid impacting the
> > >> performance of channel selectors that do not need multicast.
> > >>
> > >> The problem and options are detailed in
> > >>
> > >> https://docs.google.com/document/d/1npi5c_SeP68KuT2lNdKd8G7toGR_lxQCGOnZm_hVMks/edit?usp=sharing
> > >>
> > >> We are also wondering if there are other methods to implement this
> > >> requirement with or without changing the runtime. Many thanks for any
> > >> feedback!
> > >>
> > >>
> > >> Best,
> > >> Yun
> > >>
> > >>
> > >>
> > >>
> > >
> > >
> >
>
>
>
