Hi Piotr,

Yes, you are right: it's a distributed cross join requirement.
Broadcast join can help with cross join cases, but users cannot use it if
the data set to join is too large to fit into one subtask.

Sorry for leaving out some details.

Thanks,
Zhu Zhu

Piotr Nowojski <pi...@ververica.com> wrote on Fri, 23 Aug 2019 at 16:57:

> Hi Yun and Zhu Zhu,
>
> Thanks for the more detailed example Zhu Zhu.
>
> As far as I understand, for the iterations example we do not need
> multicasting. Regarding the Join example, I don’t fully understand it. The
> example that Zhu Zhu presented has the drawback of sending both tables to
> multiple nodes. What’s the benefit of using a broadcast join over a hash join
> in such a case? As far as I know, the biggest benefit of using a broadcast
> join instead of a hash join is that we can avoid sending the larger table
> over the network, because we can perform the join locally. In this example
> we are sending both of the tables to multiple nodes, which should defeat the
> purpose.
>
> Is it about implementing cross joins or near-cross joins in a distributed
> fashion?
>
> > if we introduce a new MulticastRecordWriter
>
> That’s one of the solutions. It might have the drawback of the three-class
> virtualisation problem (we already have RecordWriter and
> BroadcastRecordWriter). With up to two implementations, the JVM is able to
> devirtualise the calls.
>
> Previously I was also thinking about just providing two different
> ChannelSelector interfaces: one returning `int[]`, and a
> `SingleChannelSelector` returning a plain `int`. Based on that, RecordWriter
> could perform some magic (in the worst case, `instanceof` checks).
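A minimal Java sketch of the two-interface idea, for illustration only. All names here (`SelectorSketch`, `SingleChannelSelector`, `MultiChannelSelector`, `SketchRecordWriter`) are hypothetical and are not Flink's actual `ChannelSelector` or `RecordWriter` API; channels are modelled as plain lists. The writer resolves the selector kind once with `instanceof` at construction time, so the single-channel path never creates or iterates an `int[]`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical common parent so the writer can accept either selector kind.
interface SelectorSketch<T> {}

// Unicast: exactly one target channel per record, returned as a plain int.
interface SingleChannelSelector<T> extends SelectorSketch<T> {
    int selectChannel(T record, int numberOfChannels);
}

// Multicast: any subset of channels per record, returned as an int[].
interface MultiChannelSelector<T> extends SelectorSketch<T> {
    int[] selectChannels(T record, int numberOfChannels);
}

// Toy writer: each "channel" is a list that collects the records sent to it.
final class SketchRecordWriter<T> {
    private final List<List<T>> channels = new ArrayList<>();
    private final SingleChannelSelector<T> single; // set only for unicast selectors
    private final MultiChannelSelector<T> multi;   // set only for multicast selectors

    SketchRecordWriter(SelectorSketch<T> selector, int numberOfChannels) {
        for (int i = 0; i < numberOfChannels; i++) {
            channels.add(new ArrayList<>());
        }
        // The instanceof "magic" happens once here, not once per record.
        if (selector instanceof SingleChannelSelector) {
            single = (SingleChannelSelector<T>) selector;
            multi = null;
        } else if (selector instanceof MultiChannelSelector) {
            single = null;
            multi = (MultiChannelSelector<T>) selector;
        } else {
            throw new IllegalArgumentException("Unsupported selector: " + selector);
        }
    }

    void emit(T record) {
        int numberOfChannels = channels.size();
        if (single != null) {
            // Fast path: no array is created or iterated.
            channels.get(single.selectChannel(record, numberOfChannels)).add(record);
        } else {
            for (int channel : multi.selectChannels(record, numberOfChannels)) {
                channels.get(channel).add(record);
            }
        }
    }

    List<T> channel(int index) {
        return channels.get(index);
    }
}

final class SelectorSketchDemo {
    public static void main(String[] args) {
        SingleChannelSelector<Integer> byValue = (value, n) -> Math.floorMod(value, n);
        MultiChannelSelector<Integer> firstTwo = (value, n) -> new int[] {0, 1};

        SketchRecordWriter<Integer> unicast = new SketchRecordWriter<>(byValue, 3);
        unicast.emit(5); // goes to channel 5 mod 3 = 2 only

        SketchRecordWriter<Integer> multicast = new SketchRecordWriter<>(firstTwo, 3);
        multicast.emit(7); // goes to channels 0 and 1, not 2

        // prints "[5] [7] [7]"
        System.out.println(unicast.channel(2) + " " + multicast.channel(0) + " " + multicast.channel(1));
    }
}
```

Resolving the type once in the constructor keeps the per-record cost of the unicast path to a null check; splitting into two writer classes instead would avoid even that, at the cost of another implementation for the JIT to handle.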
>
> Another solution might be to change the `ChannelSelector` interface into an
> iterator.
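One possible reading of the iterator idea, again as a hypothetical sketch rather than an existing Flink interface: the selector hands out channels one at a time (`first` / `next`, with `-1` meaning "no more channels"), so neither the unicast nor the multicast case has to return an array per record. It assumes a single-threaded writer, because the selector keeps per-record iteration state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical iterator-style selector: channels are handed out one at a time.
interface IteratingChannelSelector<T> {
    /** Starts the selection for a record; returns the first channel, or -1 if none. */
    int first(T record, int numberOfChannels);

    /** Returns the next channel for the same record, or -1 when exhausted. */
    int next();
}

// Unicast case (round robin, similar in spirit to Rebalance): one channel per record.
final class RoundRobinIteratingSelector<T> implements IteratingChannelSelector<T> {
    private int lastChannel = -1;

    @Override
    public int first(T record, int numberOfChannels) {
        lastChannel = (lastChannel + 1) % numberOfChannels;
        return lastChannel;
    }

    @Override
    public int next() {
        return -1; // never more than one target
    }
}

// Broadcast case: every channel, in order.
final class AllChannelsIteratingSelector<T> implements IteratingChannelSelector<T> {
    private int current;
    private int limit;

    @Override
    public int first(T record, int numberOfChannels) {
        current = 0;
        limit = numberOfChannels;
        return limit > 0 ? current : -1;
    }

    @Override
    public int next() {
        current++;
        return current < limit ? current : -1;
    }
}

final class IteratingSelectorDemo {
    // The writer-side loop. A real writer would serialize the record into channel c;
    // this demo only collects the channel indices so they can be inspected.
    static <T> List<Integer> targets(IteratingChannelSelector<T> selector, T record, int numberOfChannels) {
        List<Integer> result = new ArrayList<>();
        for (int c = selector.first(record, numberOfChannels); c >= 0; c = selector.next()) {
            result.add(c);
        }
        return result;
    }

    public static void main(String[] args) {
        RoundRobinIteratingSelector<String> roundRobin = new RoundRobinIteratingSelector<>();
        System.out.println(targets(roundRobin, "a", 3)); // [0]
        System.out.println(targets(roundRobin, "b", 3)); // [1]
        System.out.println(targets(new AllChannelsIteratingSelector<String>(), "c", 3)); // [0, 1, 2]
    }
}
```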
>
> But let's discuss the details after we agree on implementing this.
>
> Piotrek
>
> > On 23 Aug 2019, at 10:20, Yun Gao <yungao...@aliyun.com> wrote:
> >
> >    Hi Piotr,
> >
> >         Thanks a lot for the suggestions!
> >
> >         The core motivation of this discussion is to implement a new
> > iteration library on the DataStream API, which requires inserting special
> > records into the stream to notify the progress of the iteration. The
> > mechanism of such records is very similar to the current Watermark: we
> > need to send normal records according to the partitioner (Rebalance,
> > etc.) while also being able to broadcast the inserted progress records to
> > all the connected tasks. I have read the notes in the Google doc, and I
> > totally agree that exploring the broadcast interface in RecordWriter in
> > some way would be able to solve this issue.
> >
> >        Regarding `int[] ChannelSelector#selectChannels()`, I'm
> > wondering: if we introduce a new MulticastRecordWriter and leave the
> > current RecordWriter untouched, could we avoid the performance
> > degradation? With such a modification the normal RecordWriter does not
> > need to iterate over the array returned by the ChannelSelector; the only
> > difference will be returning an array instead of an integer, and
> > accessing the first element of the returned array instead of reading the
> > integer directly.
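A sketch of this variant, assuming the selector returns `int[]` and may reuse one preallocated array (all class names are hypothetical, not Flink classes): the unicast writer only reads element `[0]`, and only the multicast writer loops over the returned array.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical array-returning selector, as in the proposed int[] selectChannels().
interface ArrayChannelSelector<T> {
    int[] selectChannels(T record, int numberOfChannels);
}

// Rebalance-like selector that reuses one preallocated array instead of allocating per record.
final class RoundRobinArraySelector<T> implements ArrayChannelSelector<T> {
    private final int[] result = {-1};

    @Override
    public int[] selectChannels(T record, int numberOfChannels) {
        result[0] = (result[0] + 1) % numberOfChannels;
        return result;
    }
}

abstract class WriterSketch<T> {
    protected final List<List<T>> channels = new ArrayList<>();
    protected final ArrayChannelSelector<T> selector;

    WriterSketch(ArrayChannelSelector<T> selector, int numberOfChannels) {
        this.selector = selector;
        for (int i = 0; i < numberOfChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    abstract void emit(T record);

    List<T> channel(int index) {
        return channels.get(index);
    }
}

// The existing (unicast) writer: reads only the first element, no loop.
final class UnicastWriterSketch<T> extends WriterSketch<T> {
    UnicastWriterSketch(ArrayChannelSelector<T> selector, int numberOfChannels) {
        super(selector, numberOfChannels);
    }

    @Override
    void emit(T record) {
        int channel = selector.selectChannels(record, channels.size())[0];
        channels.get(channel).add(record);
    }
}

// The new multicast writer: sends the record to every selected channel.
final class MulticastWriterSketch<T> extends WriterSketch<T> {
    MulticastWriterSketch(ArrayChannelSelector<T> selector, int numberOfChannels) {
        super(selector, numberOfChannels);
    }

    @Override
    void emit(T record) {
        for (int channel : selector.selectChannels(record, channels.size())) {
            channels.get(channel).add(record);
        }
    }
}

final class WriterSketchDemo {
    public static void main(String[] args) {
        UnicastWriterSketch<String> unicast =
                new UnicastWriterSketch<>(new RoundRobinArraySelector<String>(), 2);
        unicast.emit("a");
        unicast.emit("b");
        unicast.emit("c");
        System.out.println(unicast.channel(0) + " " + unicast.channel(1)); // [a, c] [b]

        ArrayChannelSelector<String> toBoth = (value, n) -> new int[] {0, 1};
        MulticastWriterSketch<String> multicast = new MulticastWriterSketch<>(toBoth, 2);
        multicast.emit("x");
        System.out.println(multicast.channel(0) + " " + multicast.channel(1)); // [x] [x]
    }
}
```

Reusing the returned array avoids a per-record allocation in the unicast case, but it means callers must not keep a reference to the array across calls.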
> >
> > Best,
> > Yun
> >
> > ------------------------------------------------------------------
> > From:Piotr Nowojski <pi...@ververica.com>
> > Send Time:2019 Aug. 23 (Fri.) 15:20
> > To:dev <dev@flink.apache.org>
> > Cc:Yun Gao <yungao...@aliyun.com>
> > Subject:Re: [DISCUSS] Enhance Support for Multicast Communication Pattern
> >
> > Hi,
> >
> > Yun:
> >
> > Thanks for proposing the idea. I have checked the document and left
> > a couple of questions there, but it might be better to answer them here.
> >
> > What is the exact motivation and what problems do you want to solve? We
> have dropped multicast support from the network stack [1] for two reasons:
> > 1. Performance
> > 2. Code simplicity
> >
> > The proposal to reintroduce `int[] ChannelSelector#selectChannels()`
> > would revert those changes. At that time we were considering ways to
> > keep multicast support at the network level while preserving the
> > performance and simplicity of the non-multicast cases, and there are ways
> > to achieve that. However, they would add extra complexity to Flink, which
> > would be better to avoid.
> >
> > On the other hand, supporting a dual pattern (standard partitioning or
> > broadcasting) is easy to do, as LatencyMarkers are doing exactly that. It
> > would be just a matter of exposing this to the user in some way. So before
> > we go any further, can you describe your use cases/motivation? Isn’t a mix
> > of standard partitioning and broadcasting enough? Do we need multicasting?
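A minimal sketch of the dual pattern, assuming a hypothetical `DualPatternWriter` (not Flink's `RecordWriter`): normal records go to exactly one channel in round-robin order, similar to Rebalance, while special events such as iteration progress markers are appended to every channel. Because each channel is FIFO, a broadcast event arrives after every record previously emitted to that channel, which is what lets it act as a progress notification in the way Watermarks do.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical writer combining two patterns: partitioned records and broadcast events.
final class DualPatternWriter<T> {
    private final List<List<Object>> channels = new ArrayList<>();
    private int nextChannel = 0;

    DualPatternWriter(int numberOfChannels) {
        for (int i = 0; i < numberOfChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    /** Normal record: exactly one channel, chosen round robin (similar to Rebalance). */
    void emit(T record) {
        channels.get(nextChannel).add(record);
        nextChannel = (nextChannel + 1) % channels.size();
    }

    /** Special event, e.g. an iteration progress marker: appended to every channel. */
    void broadcastEvent(Object event) {
        for (List<Object> channel : channels) {
            channel.add(event);
        }
    }

    List<Object> channel(int index) {
        return channels.get(index);
    }

    public static void main(String[] args) {
        DualPatternWriter<String> writer = new DualPatternWriter<>(2);
        writer.emit("r1");
        writer.emit("r2");
        writer.emit("r3");
        writer.broadcastEvent("progress-1");
        System.out.println(writer.channel(0)); // [r1, r3, progress-1]
        System.out.println(writer.channel(1)); // [r2, progress-1]
    }
}
```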
> >
> > Zhu:
> >
> > Could you rephrase your example? I didn’t quite understand it.
> >
> > Piotrek
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-10662
> >
> > On 23 Aug 2019, at 09:17, Zhu Zhu <reed...@gmail.com> wrote:
> >
> > Thanks Yun for starting this discussion.
> > I think multicasting can be very helpful in certain cases.
> >
> > I have received requirements from users who want to do a broadcast
> > join, while the data set to broadcast is too large to fit in one task.
> > Thus the requirement turned out to be supporting the Cartesian product of
> > two data sets (one of which can be an infinite stream).
> > For example, A (parallelism=2) broadcast-joins B (parallelism=2) in
> > JobVertex C.
> > The idea is to have 4 C subtasks deal with different combinations of A/B
> > partitions, like C1(A1,B1), C2(A1,B2), C3(A2,B1), C4(A2,B2).
> > This requires one record to be sent to multiple downstream subtasks, but
> > not to all subtasks.
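The subtask assignment in this example can be sketched as a grid: with parallelism pA for A and pB for B, C gets pA * pB subtasks, and subtask `a * pB + b` (0-based) joins partition `a` of A with partition `b` of B. A record from A therefore goes to one row of the grid, and a record from B to one column, so every (A, B) pair meets in exactly one subtask. `CrossJoinGrid` and its methods are hypothetical helpers for illustration, not Flink API; 0-based indices 0..3 correspond to C1..C4 above.

```java
import java.util.Arrays;

// Hypothetical helper: target subtasks of C for a cross join of A and B laid out as a grid.
final class CrossJoinGrid {
    private CrossJoinGrid() {}

    /** Targets for a record from partition a of A: one row of the grid. */
    static int[] targetsForA(int a, int parallelismA, int parallelismB) {
        checkIndex(a, parallelismA);
        int[] targets = new int[parallelismB];
        for (int b = 0; b < parallelismB; b++) {
            targets[b] = a * parallelismB + b;
        }
        return targets;
    }

    /** Targets for a record from partition b of B: one column of the grid. */
    static int[] targetsForB(int b, int parallelismA, int parallelismB) {
        checkIndex(b, parallelismB);
        int[] targets = new int[parallelismA];
        for (int a = 0; a < parallelismA; a++) {
            targets[a] = a * parallelismB + b;
        }
        return targets;
    }

    private static void checkIndex(int index, int parallelism) {
        if (index < 0 || index >= parallelism) {
            throw new IllegalArgumentException("partition index out of range: " + index);
        }
    }

    public static void main(String[] args) {
        // A and B both have parallelism 2, so C has 4 subtasks (0-based here).
        System.out.println(Arrays.toString(targetsForA(0, 2, 2))); // [0, 1] i.e. C1, C2
        System.out.println(Arrays.toString(targetsForB(1, 2, 2))); // [1, 3] i.e. C2, C4
    }
}
```

Each A record is sent pB times and each B record pA times, which is the network cost Piotr points out; the gain is that no single subtask has to hold the whole of either data set.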
> >
> > With the current interface this is not supported, as one record can only
> > be sent to one subtask, or to all subtasks of a JobVertex.
> > And the user had to split the broadcast data set manually to several
> > different JobVertices, which is hard to maintain and extend.
> >
> > Thanks,
> > Zhu Zhu
> >
> > Yun Gao <yungao...@aliyun.com.invalid> wrote on Thu, 22 Aug 2019 at 20:42:
> >
> > Hi everyone,
> >      In some scenarios we have encountered a requirement that some
> > operators want to send records to their downstream operators with a
> > multicast communication pattern. In detail, some records should be sent
> > according to the partitioner (for example, Rebalance), while other
> > records should be sent to all the connected operators and tasks. Such a
> > communication pattern could be viewed as a kind of multicast: it does not
> > broadcast every record, but some records will indeed be sent to multiple
> > downstream operators.
> >
> > However, we found that this kind of communication pattern does not seem
> > to be correctly implementable with a customized partitioner if the
> > operators have multiple consumers with different parallelism. To solve
> > the above problem, we propose to enhance the support for this kind of
> > irregular communication pattern. We think there may be two options:
> >
> >     1. Support a kind of customized operator event, which shares much
> > similarity with Watermark; these events can be broadcast to the
> > downstream operators separately.
> >     2. Let the channel selector support multicast, and also add a
> > separate RecordWriter implementation to avoid impacting the performance
> > of channel selectors that do not need multicast.
> >
> > The problem and options are detailed in
> > https://docs.google.com/document/d/1npi5c_SeP68KuT2lNdKd8G7toGR_lxQCGOnZm_hVMks/edit?usp=sharing
> >
> > We are also wondering if there are other methods to implement this
> > requirement, with or without changing the runtime. Many thanks for any
> > feedback!
> >
> >
> > Best,
> > Yun
