Hi Fabian,

Thank you for your response, it is very helpful! To make sure I understand correctly, here is my pseudo-code first:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

class OuterJoinCoGroupFunction implements
        CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Double>, Tuple2<Integer, Double>> {

    @Override
    public void coGroup(Iterable<Tuple2<String, Integer>> iVals,
                        Iterable<Tuple2<String, Double>> dVals,
                        Collector<Tuple2<Integer, Double>> out) {
        // Buffer the values of both sides (lists keep duplicate values).
        List<Integer> ints = new ArrayList<Integer>();
        for (Tuple2<String, Integer> val : iVals) {
            ints.add(val.f1);
        }
        List<Double> doubles = new ArrayList<Double>();
        for (Tuple2<String, Double> val : dVals) {
            doubles.add(val.f1);
        }
        // An empty side is padded with a single null value.
        if (ints.isEmpty()) {
            ints.add(null);
        }
        if (doubles.isEmpty()) {
            doubles.add(null);
        }
        // Emit all pairs of the group.
        for (Double d : doubles) {
            for (Integer i : ints) {
                out.collect(new Tuple2<Integer, Double>(i, d));
            }
        }
    }
}

The code above tries to build the matching pairs, and if one of the groups is empty, I append a null value to it. However, I don't really understand how to implement the OuterJoinMapFunction. I am also puzzled about how Reduce/GroupReduce is translated into Map -> Reduce. Where can I find the materials or the source code for this?

Best,
Wilson

> On Dec 15, 2014, at 5:17 PM, Fabian Hueske <fhue...@apache.org> wrote:
>
> That's a good point.
>
> You can implement an outer join using the available runtime. This way you
> do not need to touch the optimizer and runtime but only the API layer.
> This basically means to add syntactic sugar to the available API. The API
> will translate the outer join into a CoGroup which builds all pairs of
> joining elements and a Map which applies the join function to each joined
> pair.
>
> It could look like this:
>
> DataSet<TypeX> in1;
> DataSet<TypeY> in2;
> in1.outerJoin(in2).where(...).equalTo(...).with(new MyJoinFunction)
>
> which would be translated into
>
> in1.coGroup(in2).where(...).equalTo(...).with(new
> OuterJoinCoGroupFunction).map(new OuterJoinMapFunction(MyJoinFunction));
>
> OJCoGroupFunction and OJMapFunction are functions that you need to
> implement.
> OJCoGroupFunction does what Stephan said (it builds pairs of matching
> elements) and returns a Tuple2<TypeX, TypeY>.
> OJMapFunction unpacks the Tuple2<TypeX, TypeY> and calls the user's Join
> function (MyJoinFunction).
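
To make the CoGroup -> Map split described above more concrete, here is a minimal sketch in plain Java. It deliberately does not use the Flink API: Pair is a hypothetical stand-in for Tuple2<TypeX, TypeY>, a BiFunction stands in for the user's join function, and the method names coGroup, map and outerJoinGroup are made up for illustration. It only shows the division of work for a single key group and is not meant as the actual operator implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

public class OuterJoinSketch {

    /** Hypothetical stand-in for Tuple2<TypeX, TypeY>; either field may be null. */
    static final class Pair<X, Y> {
        final X left;
        final Y right;

        Pair(X left, Y right) {
            this.left = left;
            this.right = right;
        }
    }

    /**
     * CoGroup step: builds all pairs of one key group. An empty side is padded
     * with a single null. Assumes at least one side is non-empty, as is the
     * case for any key that actually occurs in the input.
     */
    static <X, Y> List<Pair<X, Y>> coGroup(List<X> xs, List<Y> ys) {
        List<X> lefts = xs.isEmpty() ? Collections.<X>singletonList(null) : xs;
        List<Y> rights = ys.isEmpty() ? Collections.<Y>singletonList(null) : ys;
        List<Pair<X, Y>> pairs = new ArrayList<>();
        for (X x : lefts) {
            for (Y y : rights) {
                pairs.add(new Pair<>(x, y));
            }
        }
        return pairs;
    }

    /** Map step: unpacks one pair and hands both sides to the user's join function. */
    static <X, Y, R> R map(Pair<X, Y> pair, BiFunction<X, Y, R> userJoin) {
        return userJoin.apply(pair.left, pair.right);
    }

    /** Chains the two steps for a single key group: coGroup first, then map. */
    static <X, Y, R> List<R> outerJoinGroup(List<X> xs, List<Y> ys, BiFunction<X, Y, R> userJoin) {
        List<R> results = new ArrayList<>();
        for (Pair<X, Y> pair : coGroup(xs, ys)) {
            results.add(map(pair, userJoin));
        }
        return results;
    }

    public static void main(String[] args) {
        // The user's join function must be prepared to receive null on either side.
        BiFunction<Integer, Double, String> join = (i, d) -> i + "|" + d;
        System.out.println(outerJoinGroup(Arrays.asList(1, 2), Arrays.asList(0.5), join));
        System.out.println(outerJoinGroup(Collections.<Integer>emptyList(), Arrays.asList(0.5), join));
    }
}
```

The point of the split is that the map step knows nothing about grouping: it only unpacks a pair and forwards it, so the user's join function is the only place that has to deal with null inputs.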
>
> There are a few operators implemented this way. For example, have a look at
> the Reduce/GroupReduce with KeySelectors, which are translated into Map ->
> Reduce (or Map -> GroupReduce).
>
> Let us know if you have any questions!
>
> Cheers, Fabian
>
>
> 2014-12-13 16:25 GMT+01:00 Stephan Ewen <se...@apache.org>:
>>
>> Hi Wilson!
>>
>> You can start by mocking an outer join operator using a special CoGroup
>> function. If one of the two sides for a group is empty, you have the case
>> where you need to append null values. Otherwise, you build the Cartesian
>> product within the group.
>>
>> For a proper through-the-stack implementation (not sure if that is needed,
>> but may be nice to have), have a look here:
>>
>> http://flink.incubator.apache.org/docs/0.7-incubating/internal_add_operator.html
>>
>> Greetings,
>> Stephan
>>
>>
>> On Sat, Dec 13, 2014 at 3:19 AM, Zihong Cao <wilsonca...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I am trying to pick up the outer join operator. However, as Fabian
>>> mentioned to me, this task would require touching many different
>>> components of the system, so it would be a challenging job for me.
>>> Therefore I would need some help :-)
>>>
>>> I might need to walk through some features like the Compiler/Optimizer and
>>> the Runtime (as Fabian mentioned to me), so where should I start to get
>>> familiar?
>>>
>>> One more thing: is the outer join operator implementation similar to the
>>> pure join operator?
>>>
>>> Best,
>>> Wilson Cao
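
On the Reduce/GroupReduce with KeySelectors mentioned in Fabian's mail, the general idea of a Map -> Reduce translation can be sketched as follows. This is only an illustration in plain Java, under the assumption that the map step wraps each element as a (key, element) pair and the reduce step groups on that key and unwraps the result. The names extractKeys and reduceByKey are hypothetical and this is not Flink's actual translation code.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class KeySelectorReduceSketch {

    /** Map step: wraps every element as (key, element) using the key selector. */
    static <T, K> List<Map.Entry<K, T>> extractKeys(List<T> input, Function<T, K> keySelector) {
        List<Map.Entry<K, T>> wrapped = new ArrayList<>();
        for (T element : input) {
            wrapped.add(new SimpleEntry<>(keySelector.apply(element), element));
        }
        return wrapped;
    }

    /**
     * Reduce step: groups on the extracted key, applies the user's reducer to
     * the wrapped elements only, and drops the key from the result again.
     */
    static <T, K> List<T> reduceByKey(List<Map.Entry<K, T>> wrapped, BinaryOperator<T> reducer) {
        Map<K, T> perKey = new LinkedHashMap<>();
        for (Map.Entry<K, T> entry : wrapped) {
            // First element of a key is stored as-is; later ones are combined with it.
            perKey.merge(entry.getKey(), entry.getValue(), reducer);
        }
        return new ArrayList<>(perKey.values());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("apple", "avocado", "banana", "blueberry", "cherry");
        // Key selector: first letter of the word.
        List<Map.Entry<Character, String>> wrapped = extractKeys(words, w -> w.charAt(0));
        // User reducer: keep the longer of two words.
        List<String> longest = reduceByKey(wrapped, (a, b) -> a.length() >= b.length() ? a : b);
        System.out.println(longest);
    }
}
```

As with the outer join, the user's reduce function never sees the extracted key. The wrapping and unwrapping happen in the API layer around it, which is the same pattern as CoGroup -> Map.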