[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-10-08 Thread r-pogalz
Github user r-pogalz commented on a diff in the pull request:

https://github.com/apache/flink/pull/1138#discussion_r41505034
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
 ---
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import org.apache.commons.collections.ResettableIterator;
+import org.apache.commons.collections.iterators.ListIteratorWrapper;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+public class OuterJoinOperatorBase> extends AbstractJoinOperatorBase {
+
+   public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+   private OuterJoinType outerJoinType;
+
+   public OuterJoinOperatorBase(UserCodeWrapper udf, 
BinaryOperatorInformation operatorInfo,
+   int[] keyPositions1, int[] keyPositions2, String name, 
OuterJoinType outerJoinType) {
+   super(udf, operatorInfo, keyPositions1, keyPositions2, name);
+   this.outerJoinType = outerJoinType;
+   }
+
+   public OuterJoinOperatorBase(FT udf, BinaryOperatorInformation operatorInfo,
+   int[] keyPositions1, int[] keyPositions2, String name, 
OuterJoinType outerJoinType) {
+   super(new UserCodeObjectWrapper(udf), operatorInfo, 
keyPositions1, keyPositions2, name);
+   this.outerJoinType = outerJoinType;
+   }
+
+   public OuterJoinOperatorBase(Class udf, 
BinaryOperatorInformation operatorInfo,
+   int[] keyPositions1, int[] keyPositions2, String name, 
OuterJoinType outerJoinType) {
+   super(new UserCodeClassWrapper(udf), operatorInfo, 
keyPositions1, keyPositions2, name);
+   this.outerJoinType = outerJoinType;
+   }
+
+   public void setOuterJoinType(OuterJoinType outerJoinType) {
+   this.outerJoinType = outerJoinType;
+   }
+
+   public OuterJoinType getOuterJoinType() {
+   return outerJoinType;
+   }
+
+   @Override
+   protected List executeOnCollections(List leftInput, List 
rightInput, RuntimeContext runtimeContext, ExecutionConfig executionConfig) 
throws Exception {
+   TypeInformation leftInformation = 
getOperatorInfo().getFirstInputType();
+   TypeInformation rightInformation = 
getOperatorInfo().getSecondInputType();
+   TypeInformation outInformation = 
getOperatorInfo().getOutputType();
+
+   TypeComparator leftComparator = buildComparatorFor(0, 
executionConfig, leftInformation);
+   TypeComparator rightComparator = buildComparatorFor(1, 
executionConfig, rightInformation);
+
+   TypeSerializer leftSerializer = 
lef

[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-10-09 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1138#issuecomment-146790205
  
Thanks for the update @jkovacs and @r-pogalz. Very good work!
I will go ahead, try this PR, and merge it :-) 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-10-09 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1138#issuecomment-146835992
  
Looks good. I added one commit to restore binary compatibility. The code is 
not super nice, but it allows running programs which have been previously 
compiled without the need to recompile. We can still clean up the code later if 
we decide to do so. Final tests are running; I will merge after they have passed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-10-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1138


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-10-09 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1138#issuecomment-146886307
  
Thank you very much @jkovacs and @r-pogalz for adding outer joins to Flink!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-10-09 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1138#issuecomment-146885685
  
Oh, just realized we did not update the documentation.
I will open a JIRA for that and add it later today.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-17 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1138#issuecomment-14654
  
@jkovacs and @r-pogalz, thank you very much for this PR and the detailed 
description!
It's quite a bit of code so it will take some time to be reviewed. I hope 
to give feedback soon.

Nonetheless, we can start a discussion about the handling of projection for 
outer joins. By changing the type information to `GenericTypeInfo` to 
support tuples with null values, a `DataSet` cannot be used (in a join, 
groupBy, reduce, ...) as before because the runtime will use completely 
different serializers and comparators. Therefore, I am more in favor of not 
supporting projection for outer joins.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-18 Thread jkovacs
Github user jkovacs commented on the pull request:

https://github.com/apache/flink/pull/1138#issuecomment-141497981
  
Thanks @fhueske, that's a good point I haven't considered. 

Another idea that occurred to me was to convert the result tuple types to 
`GenericTypeInfo<T>` (instead of `GenericTypeInfo`), where `T` is the 
original type of the tuple field (e.g. `String` or `Integer`). This would be 
null safe _and_ would allow the user to group by those fields, assuming of 
course they are sure that the fields are non-null (e.g. on a left or right 
outer join).
Although I'm not sure of all the consequences of using, say, 
`GenericTypeInfo` instead of `BasicTypeInfo` for serialization 
and comparison.

I pushed this change as 
https://github.com/jkovacs/flink/commit/f682baa50137e0a54bae091ba60ba85fdb8f4c1b
 to a different branch to test it.

Also rebased branch onto current master and resolved conflicts (Failing 
test is some YARN integration test).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-18 Thread jkovacs
Github user jkovacs commented on the pull request:

https://github.com/apache/flink/pull/1138#issuecomment-141569673
  
To partly answer my own question: One big drawback of downgrading the tuple 
field types to `GenericTypeInfo` is that for (de)serialization and comparison 
the generic Kryo serializers will be used, which are significantly slower than 
the native flink serializers and comparators for basic types, such as Integer 
(according to [this blog 
post](http://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html)).

One obvious way to work around this is to only downgrade the fields that 
are actually nullable, and keep the original types of the definitely non-null 
fields (i.e. the types from the outer side of a left or right outer join). This 
way the user can still group/join/sort efficiently on the non-null fields, 
while preserving null safety for the other fields.

I pushed another commit for this to my temporary branch for review, if this 
makes sense: 
https://github.com/jkovacs/flink/compare/feature/FLINK-2576...jkovacs:feature/FLINK-2576-projection-types

As you can see I was really hoping to make the projection joins work 
properly :-) but if you feel that the effort isn't worth it or I'm missing 
something else entirely, we can for sure simply scrap that and throw an 
`InvalidProgramException` when the user tries to do a project outer join 
instead of defining his own join udf. Opinions on that are welcome.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-23 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1138#issuecomment-142723198
  
Hi @jkovacs, thanks for all your efforts to make the projection work. Going 
for a `GenericTypeInfo` would work in many cases but unfortunately not in all. 
For example `union` operates in Flink on serialization level and requires that 
all data sets which are unioned use the same serializer. By transparently using 
a `GenericTypeInfo` users might be surprised why 
`DataSet.union(DataSet)` does not work. 
If we only support OuterJoins with an explicit JoinFunction, the user has full 
control how to deal with null values and can even use a custom Tuple type or 
Tuple serializer (via `Operator.returns()`) that supports null values. In my 
opinion, the best approach is to only support OuterJoins with JoinFunctions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-23 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1138#issuecomment-142754122
  
Agreed with Fabian. For now, let's require join functions.

Future work would be to use Tuples with Options in Scala. In Java, we 
should probably add an option type as well (and teach the TypeExtractor to use 
them). Java core adds an Option type only in Java 8, unfortunately.

We could add one for Java 7 and deprecate it later.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1138#discussion_r40431134
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
 ---
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import org.apache.commons.collections.ResettableIterator;
+import org.apache.commons.collections.iterators.ListIteratorWrapper;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+public class OuterJoinOperatorBase> extends AbstractJoinOperatorBase {
+
+   public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+   private OuterJoinType outerJoinType;
+
+   public OuterJoinOperatorBase(UserCodeWrapper udf, 
BinaryOperatorInformation operatorInfo,
+   int[] keyPositions1, int[] keyPositions2, String name, 
OuterJoinType outerJoinType) {
+   super(udf, operatorInfo, keyPositions1, keyPositions2, name);
+   this.outerJoinType = outerJoinType;
+   }
+
+   public OuterJoinOperatorBase(FT udf, BinaryOperatorInformation operatorInfo,
+   int[] keyPositions1, int[] keyPositions2, String name, 
OuterJoinType outerJoinType) {
+   super(new UserCodeObjectWrapper(udf), operatorInfo, 
keyPositions1, keyPositions2, name);
+   this.outerJoinType = outerJoinType;
+   }
+
+   public OuterJoinOperatorBase(Class udf, 
BinaryOperatorInformation operatorInfo,
+   int[] keyPositions1, int[] keyPositions2, String name, 
OuterJoinType outerJoinType) {
+   super(new UserCodeClassWrapper(udf), operatorInfo, 
keyPositions1, keyPositions2, name);
+   this.outerJoinType = outerJoinType;
+   }
+
+   public void setOuterJoinType(OuterJoinType outerJoinType) {
+   this.outerJoinType = outerJoinType;
+   }
+
+   public OuterJoinType getOuterJoinType() {
+   return outerJoinType;
+   }
+
+   @Override
+   protected List executeOnCollections(List leftInput, List 
rightInput, RuntimeContext runtimeContext, ExecutionConfig executionConfig) 
throws Exception {
+   TypeInformation leftInformation = 
getOperatorInfo().getFirstInputType();
+   TypeInformation rightInformation = 
getOperatorInfo().getSecondInputType();
+   TypeInformation outInformation = 
getOperatorInfo().getOutputType();
+
+   TypeComparator leftComparator = buildComparatorFor(0, 
executionConfig, leftInformation);
+   TypeComparator rightComparator = buildComparatorFor(1, 
executionConfig, rightInformation);
+
+   TypeSerializer leftSerializer = 
left

[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1138#discussion_r40431242
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
 ---
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import org.apache.commons.collections.ResettableIterator;
+import org.apache.commons.collections.iterators.ListIteratorWrapper;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+public class OuterJoinOperatorBase> extends AbstractJoinOperatorBase {
+
+   public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+   private OuterJoinType outerJoinType;
+
+   public OuterJoinOperatorBase(UserCodeWrapper udf, 
BinaryOperatorInformation operatorInfo,
+   int[] keyPositions1, int[] keyPositions2, String name, 
OuterJoinType outerJoinType) {
+   super(udf, operatorInfo, keyPositions1, keyPositions2, name);
+   this.outerJoinType = outerJoinType;
+   }
+
+   public OuterJoinOperatorBase(FT udf, BinaryOperatorInformation operatorInfo,
+   int[] keyPositions1, int[] keyPositions2, String name, 
OuterJoinType outerJoinType) {
+   super(new UserCodeObjectWrapper(udf), operatorInfo, 
keyPositions1, keyPositions2, name);
+   this.outerJoinType = outerJoinType;
+   }
+
+   public OuterJoinOperatorBase(Class udf, 
BinaryOperatorInformation operatorInfo,
+   int[] keyPositions1, int[] keyPositions2, String name, 
OuterJoinType outerJoinType) {
+   super(new UserCodeClassWrapper(udf), operatorInfo, 
keyPositions1, keyPositions2, name);
+   this.outerJoinType = outerJoinType;
+   }
+
+   public void setOuterJoinType(OuterJoinType outerJoinType) {
+   this.outerJoinType = outerJoinType;
+   }
+
+   public OuterJoinType getOuterJoinType() {
+   return outerJoinType;
+   }
+
+   @Override
+   protected List executeOnCollections(List leftInput, List 
rightInput, RuntimeContext runtimeContext, ExecutionConfig executionConfig) 
throws Exception {
+   TypeInformation leftInformation = 
getOperatorInfo().getFirstInputType();
+   TypeInformation rightInformation = 
getOperatorInfo().getSecondInputType();
+   TypeInformation outInformation = 
getOperatorInfo().getOutputType();
+
+   TypeComparator leftComparator = buildComparatorFor(0, 
executionConfig, leftInformation);
+   TypeComparator rightComparator = buildComparatorFor(1, 
executionConfig, rightInformation);
+
+   TypeSerializer leftSerializer = 
left

[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1138#discussion_r40434251
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java 
---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import 
org.apache.flink.api.common.operators.base.AbstractJoinOperatorBase.JoinHint;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase;
+import 
org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.AbstractJoinDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
+import 
org.apache.flink.optimizer.operators.SortMergeFullOuterJoinDescriptor;
+import 
org.apache.flink.optimizer.operators.SortMergeLeftOuterJoinDescriptor;
+import 
org.apache.flink.optimizer.operators.SortMergeRightOuterJoinDescriptor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class OuterJoinNode extends TwoInputNode {
+
+   private List dataProperties;
+
+   /**
+* Creates a new two input node for the optimizer plan, representing 
the given operator.
+*
+* @param operator The operator that the optimizer DAG node should 
represent.
+*/
+   public OuterJoinNode(OuterJoinOperatorBase operator) {
+   super(operator);
+
+   this.dataProperties = getDataProperties();
+   }
+
+   private List getDataProperties() {
+   OuterJoinOperatorBase operator = getOperator();
+
+   OuterJoinType type = operator.getOuterJoinType();
+
+   JoinHint joinHint = operator.getJoinHint();
+   joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : 
joinHint;
+
+   List list = new ArrayList<>();
+   switch (joinHint) {
+   case OPTIMIZER_CHOOSES:
+   list.add(getSortMergeDescriptor(type, true));
+   list.add(getSortMergeDescriptor(type, false));
+   break;
+   case REPARTITION_SORT_MERGE:
+   list.add(getSortMergeDescriptor(type, false));
+   break;
+   case REPARTITION_HASH_FIRST:
+   case REPARTITION_HASH_SECOND:
+   case BROADCAST_HASH_FIRST:
+   case BROADCAST_HASH_SECOND:
+   default:
+   throw new CompilerException("Invalid join hint: 
" + joinHint + " for outer join type: " + type);
+   }
+
+   Partitioner customPartitioner = 
operator.getCustomPartitioner();
+   if (customPartitioner != null) {
+   for (OperatorDescriptorDual desc : list) {
+   ((AbstractJoinDescriptor) 
desc).setCustomPartitioner(customPartitioner);
+   }
+   }
+   return list;
+   }
+
+   private OperatorDescriptorDual getSortMergeDescriptor(OuterJoinType 
type, boolean broadcastAllowed) {
+   if (type == OuterJoinType.FULL) {
+   return new SortMergeFullOuterJoinDescriptor(this.keys1, 
this.keys2);
+   } else if (type == OuterJoinType.LEFT) {
+   return new SortMergeLeftOuterJoinDescriptor(this.keys1, 
this.keys2, broadcastAllowed);
+   } else {
+   return new 
SortMergeRightOuterJoinDescriptor(this.keys1, this.keys2, broadcastAllowed);
+   }
+   }
+
+   @Override
+   public OuterJoinOperatorBase getOperator() {
+   return (OuterJoinOperatorBase) super.getOperator();
+   }
+
+   @O

[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1138#discussion_r40434718
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java 
---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import 
org.apache.flink.api.common.operators.base.AbstractJoinOperatorBase.JoinHint;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase;
+import 
org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.AbstractJoinDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
+import 
org.apache.flink.optimizer.operators.SortMergeFullOuterJoinDescriptor;
+import 
org.apache.flink.optimizer.operators.SortMergeLeftOuterJoinDescriptor;
+import 
org.apache.flink.optimizer.operators.SortMergeRightOuterJoinDescriptor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class OuterJoinNode extends TwoInputNode {
+
+   private List dataProperties;
+
+   /**
+* Creates a new two input node for the optimizer plan, representing 
the given operator.
+*
+* @param operator The operator that the optimizer DAG node should 
represent.
+*/
+   public OuterJoinNode(OuterJoinOperatorBase operator) {
+   super(operator);
+
+   this.dataProperties = getDataProperties();
+   }
+
+   private List getDataProperties() {
+   OuterJoinOperatorBase operator = getOperator();
+
+   OuterJoinType type = operator.getOuterJoinType();
+
+   JoinHint joinHint = operator.getJoinHint();
+   joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : 
joinHint;
+
+   List list = new ArrayList<>();
+   switch (joinHint) {
+   case OPTIMIZER_CHOOSES:
+   list.add(getSortMergeDescriptor(type, true));
+   list.add(getSortMergeDescriptor(type, false));
--- End diff --

For `OuterJoinType.FULL`, this will add the same descriptor a second time. 
This increases the number of enumerated plans and should be avoided.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1138#discussion_r40434856
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java ---
@@ -22,15 +22,16 @@
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.api.common.operators.base.AbstractJoinOperatorBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import 
org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import 
org.apache.flink.api.common.operators.base.AbstractJoinOperatorBase.JoinHint;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties;
 import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties;
 import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import org.apache.flink.optimizer.operators.SortMergeJoinDescriptor;
--- End diff --

You can completely remove the `MatchNode`. It is no longer referenced in the 
source code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1138#discussion_r40439495
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java 
---
@@ -281,233 +315,225 @@ protected boolean 
udfWithForwardedFieldsSecondAnnotation(Class udfClass) {
}
 
@Override
-   protected JoinOperatorBase 
translateToDataFlow(Operator input1, Operator input2) {
+   protected AbstractJoinOperatorBase 
translateToDataFlow(Operator input1, Operator input2) {
+   String name = getName() != null ? getName() : "Join at 
" + joinLocationName;
+
+   JoinOperatorBaseBuilder builder = new 
JoinOperatorBaseBuilder(name, joinType)
+   .withParallelism(getParallelism())
+   .withPartitioner(getPartitioner())
+   .withJoinHint(getJoinHint())
+   .withResultType(getResultType());
+
+   final boolean requiresTupleUnwrapping = keys1 
instanceof Keys.SelectorFunctionKeys || keys2 instanceof 
Keys.SelectorFunctionKeys;
+   if (requiresTupleUnwrapping) {
+   if (keys1 instanceof Keys.SelectorFunctionKeys 
&& keys2 instanceof Keys.SelectorFunctionKeys) {
+   // Both join sides have a key selector 
function, so we need to do the
+   // tuple wrapping/unwrapping on both 
sides.
+
+   @SuppressWarnings("unchecked")
+   Keys.SelectorFunctionKeys 
selectorKeys1 = (Keys.SelectorFunctionKeys) keys1;
+   @SuppressWarnings("unchecked")
+   Keys.SelectorFunctionKeys 
selectorKeys2 = (Keys.SelectorFunctionKeys) keys2;
+
+   builder = builder
+   .withUdf(new 
TupleUnwrappingJoiner<>(function))
+   
.withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type())
+   
.withUnwrappingRightInput(input2, selectorKeys2, getInput2Type());
+   } else if (keys2 instanceof 
Keys.SelectorFunctionKeys) {
+   // The right side of the join needs the 
tuple wrapping/unwrapping
+
+   @SuppressWarnings("unchecked")
+   Keys.SelectorFunctionKeys 
selectorKeys2 = (Keys.SelectorFunctionKeys) keys2;
+
+   builder = builder
+   .withUdf(new 
TupleRightUnwrappingJoiner<>(function))
+   .withInput1(input1, 
getInput1Type(), keys1)
+   
.withUnwrappingRightInput(input2, selectorKeys2, getInput2Type());
+   } else {
+   // The left side of the join needs the 
tuple wrapping/unwrapping
 
-   String name = getName() != null ? getName() : "Join at 
"+joinLocationName;
+   @SuppressWarnings("unchecked")
+   Keys.SelectorFunctionKeys 
selectorKeys1 = (Keys.SelectorFunctionKeys) keys1;
 
-   final JoinOperatorBase translated;
-   
-   if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 
instanceof Keys.SelectorFunctionKeys) {
-   // Both join sides have a key selector 
function, so we need to do the
-   // tuple wrapping/unwrapping on both sides.
+   builder = builder
+   .withUdf(new 
TupleLeftUnwrappingJoiner<>(function))
+   
.withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type())
+   .withInput2(input2, 
getInput2Type(), keys2);
+   }
+   } else if (keys1 instanceof Keys.ExpressionKeys && 
keys2 instanceof Keys.ExpressionKeys) {
+   // Neither side needs the tuple 
wrapping/unwrapping
 
-   @SuppressWarnings("unchecked")
-   Keys.SelectorFunctionKeys selectorKeys1 
= (Keys.SelectorFunctionKeys) keys1;
-   @SuppressWarnings("unchecked")
-   Keys.SelectorFunctionKeys selectorKeys

[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1138#discussion_r40439571
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java 
---
@@ -281,233 +315,225 @@ protected boolean 
udfWithForwardedFieldsSecondAnnotation(Class udfClass) {
}
 
@Override
-   protected JoinOperatorBase 
translateToDataFlow(Operator input1, Operator input2) {
+   protected AbstractJoinOperatorBase 
translateToDataFlow(Operator input1, Operator input2) {
+   String name = getName() != null ? getName() : "Join at 
" + joinLocationName;
+
+   JoinOperatorBaseBuilder builder = new 
JoinOperatorBaseBuilder(name, joinType)
+   .withParallelism(getParallelism())
+   .withPartitioner(getPartitioner())
+   .withJoinHint(getJoinHint())
+   .withResultType(getResultType());
+
+   final boolean requiresTupleUnwrapping = keys1 
instanceof Keys.SelectorFunctionKeys || keys2 instanceof 
Keys.SelectorFunctionKeys;
+   if (requiresTupleUnwrapping) {
+   if (keys1 instanceof Keys.SelectorFunctionKeys 
&& keys2 instanceof Keys.SelectorFunctionKeys) {
+   // Both join sides have a key selector 
function, so we need to do the
+   // tuple wrapping/unwrapping on both 
sides.
+
+   @SuppressWarnings("unchecked")
+   Keys.SelectorFunctionKeys 
selectorKeys1 = (Keys.SelectorFunctionKeys) keys1;
+   @SuppressWarnings("unchecked")
+   Keys.SelectorFunctionKeys 
selectorKeys2 = (Keys.SelectorFunctionKeys) keys2;
+
+   builder = builder
+   .withUdf(new 
TupleUnwrappingJoiner<>(function))
+   
.withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type())
+   
.withUnwrappingRightInput(input2, selectorKeys2, getInput2Type());
+   } else if (keys2 instanceof 
Keys.SelectorFunctionKeys) {
+   // The right side of the join needs the 
tuple wrapping/unwrapping
+
+   @SuppressWarnings("unchecked")
+   Keys.SelectorFunctionKeys 
selectorKeys2 = (Keys.SelectorFunctionKeys) keys2;
+
+   builder = builder
+   .withUdf(new 
TupleRightUnwrappingJoiner<>(function))
+   .withInput1(input1, 
getInput1Type(), keys1)
+   
.withUnwrappingRightInput(input2, selectorKeys2, getInput2Type());
+   } else {
+   // The left side of the join needs the 
tuple wrapping/unwrapping
 
-   String name = getName() != null ? getName() : "Join at 
"+joinLocationName;
+   @SuppressWarnings("unchecked")
+   Keys.SelectorFunctionKeys 
selectorKeys1 = (Keys.SelectorFunctionKeys) keys1;
 
-   final JoinOperatorBase translated;
-   
-   if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 
instanceof Keys.SelectorFunctionKeys) {
-   // Both join sides have a key selector 
function, so we need to do the
-   // tuple wrapping/unwrapping on both sides.
+   builder = builder
+   .withUdf(new 
TupleLeftUnwrappingJoiner<>(function))
+   
.withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type())
+   .withInput2(input2, 
getInput2Type(), keys2);
+   }
+   } else if (keys1 instanceof Keys.ExpressionKeys && 
keys2 instanceof Keys.ExpressionKeys) {
+   // Neither side needs the tuple 
wrapping/unwrapping
 
-   @SuppressWarnings("unchecked")
-   Keys.SelectorFunctionKeys selectorKeys1 
= (Keys.SelectorFunctionKeys) keys1;
-   @SuppressWarnings("unchecked")
-   Keys.SelectorFunctionKeys selectorKeys

[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1138#discussion_r40439511
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java 
---
@@ -281,233 +315,225 @@ protected boolean 
udfWithForwardedFieldsSecondAnnotation(Class udfClass) {
}
 
@Override
-   protected JoinOperatorBase 
translateToDataFlow(Operator input1, Operator input2) {
+   protected AbstractJoinOperatorBase 
translateToDataFlow(Operator input1, Operator input2) {
+   String name = getName() != null ? getName() : "Join at 
" + joinLocationName;
+
+   JoinOperatorBaseBuilder builder = new 
JoinOperatorBaseBuilder(name, joinType)
+   .withParallelism(getParallelism())
+   .withPartitioner(getPartitioner())
+   .withJoinHint(getJoinHint())
+   .withResultType(getResultType());
+
+   final boolean requiresTupleUnwrapping = keys1 
instanceof Keys.SelectorFunctionKeys || keys2 instanceof 
Keys.SelectorFunctionKeys;
+   if (requiresTupleUnwrapping) {
+   if (keys1 instanceof Keys.SelectorFunctionKeys 
&& keys2 instanceof Keys.SelectorFunctionKeys) {
+   // Both join sides have a key selector 
function, so we need to do the
+   // tuple wrapping/unwrapping on both 
sides.
+
+   @SuppressWarnings("unchecked")
+   Keys.SelectorFunctionKeys 
selectorKeys1 = (Keys.SelectorFunctionKeys) keys1;
+   @SuppressWarnings("unchecked")
+   Keys.SelectorFunctionKeys 
selectorKeys2 = (Keys.SelectorFunctionKeys) keys2;
+
+   builder = builder
+   .withUdf(new 
TupleUnwrappingJoiner<>(function))
+   
.withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type())
+   
.withUnwrappingRightInput(input2, selectorKeys2, getInput2Type());
+   } else if (keys2 instanceof 
Keys.SelectorFunctionKeys) {
+   // The right side of the join needs the 
tuple wrapping/unwrapping
+
+   @SuppressWarnings("unchecked")
+   Keys.SelectorFunctionKeys 
selectorKeys2 = (Keys.SelectorFunctionKeys) keys2;
+
+   builder = builder
+   .withUdf(new 
TupleRightUnwrappingJoiner<>(function))
+   .withInput1(input1, 
getInput1Type(), keys1)
+   
.withUnwrappingRightInput(input2, selectorKeys2, getInput2Type());
+   } else {
+   // The left side of the join needs the 
tuple wrapping/unwrapping
 
-   String name = getName() != null ? getName() : "Join at 
"+joinLocationName;
+   @SuppressWarnings("unchecked")
+   Keys.SelectorFunctionKeys 
selectorKeys1 = (Keys.SelectorFunctionKeys) keys1;
 
-   final JoinOperatorBase translated;
-   
-   if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 
instanceof Keys.SelectorFunctionKeys) {
-   // Both join sides have a key selector 
function, so we need to do the
-   // tuple wrapping/unwrapping on both sides.
+   builder = builder
+   .withUdf(new 
TupleLeftUnwrappingJoiner<>(function))
+   
.withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type())
+   .withInput2(input2, 
getInput2Type(), keys2);
+   }
+   } else if (keys1 instanceof Keys.ExpressionKeys && 
keys2 instanceof Keys.ExpressionKeys) {
+   // Neither side needs the tuple 
wrapping/unwrapping
 
-   @SuppressWarnings("unchecked")
-   Keys.SelectorFunctionKeys selectorKeys1 
= (Keys.SelectorFunctionKeys) keys1;
-   @SuppressWarnings("unchecked")
-   Keys.SelectorFunctionKeys selectorKeys

[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1138#discussion_r40439650
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java 
---
@@ -281,233 +315,225 @@ protected boolean 
udfWithForwardedFieldsSecondAnnotation(Class udfClass) {
}
 
@Override
-   protected JoinOperatorBase 
translateToDataFlow(Operator input1, Operator input2) {
+   protected AbstractJoinOperatorBase 
translateToDataFlow(Operator input1, Operator input2) {
+   String name = getName() != null ? getName() : "Join at 
" + joinLocationName;
+
+   JoinOperatorBaseBuilder builder = new 
JoinOperatorBaseBuilder(name, joinType)
+   .withParallelism(getParallelism())
+   .withPartitioner(getPartitioner())
+   .withJoinHint(getJoinHint())
+   .withResultType(getResultType());
+
+   final boolean requiresTupleUnwrapping = keys1 
instanceof Keys.SelectorFunctionKeys || keys2 instanceof 
Keys.SelectorFunctionKeys;
+   if (requiresTupleUnwrapping) {
+   if (keys1 instanceof Keys.SelectorFunctionKeys 
&& keys2 instanceof Keys.SelectorFunctionKeys) {
+   // Both join sides have a key selector 
function, so we need to do the
+   // tuple wrapping/unwrapping on both 
sides.
+
+   @SuppressWarnings("unchecked")
+   Keys.SelectorFunctionKeys 
selectorKeys1 = (Keys.SelectorFunctionKeys) keys1;
+   @SuppressWarnings("unchecked")
+   Keys.SelectorFunctionKeys 
selectorKeys2 = (Keys.SelectorFunctionKeys) keys2;
+
+   builder = builder
+   .withUdf(new 
TupleUnwrappingJoiner<>(function))
+   
.withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type())
+   
.withUnwrappingRightInput(input2, selectorKeys2, getInput2Type());
+   } else if (keys2 instanceof 
Keys.SelectorFunctionKeys) {
+   // The right side of the join needs the 
tuple wrapping/unwrapping
+
+   @SuppressWarnings("unchecked")
+   Keys.SelectorFunctionKeys 
selectorKeys2 = (Keys.SelectorFunctionKeys) keys2;
+
+   builder = builder
+   .withUdf(new 
TupleRightUnwrappingJoiner<>(function))
+   .withInput1(input1, 
getInput1Type(), keys1)
+   
.withUnwrappingRightInput(input2, selectorKeys2, getInput2Type());
+   } else {
+   // The left side of the join needs the 
tuple wrapping/unwrapping
 
-   String name = getName() != null ? getName() : "Join at 
"+joinLocationName;
+   @SuppressWarnings("unchecked")
+   Keys.SelectorFunctionKeys 
selectorKeys1 = (Keys.SelectorFunctionKeys) keys1;
 
-   final JoinOperatorBase translated;
-   
-   if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 
instanceof Keys.SelectorFunctionKeys) {
-   // Both join sides have a key selector 
function, so we need to do the
-   // tuple wrapping/unwrapping on both sides.
+   builder = builder
+   .withUdf(new 
TupleLeftUnwrappingJoiner<>(function))
+   
.withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type())
+   .withInput2(input2, 
getInput2Type(), keys2);
+   }
+   } else if (keys1 instanceof Keys.ExpressionKeys && 
keys2 instanceof Keys.ExpressionKeys) {
+   // Neither side needs the tuple 
wrapping/unwrapping
 
-   @SuppressWarnings("unchecked")
-   Keys.SelectorFunctionKeys selectorKeys1 
= (Keys.SelectorFunctionKeys) keys1;
-   @SuppressWarnings("unchecked")
-   Keys.SelectorFunctionKeys selectorKeys

[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1138#discussion_r40440502
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
 ---
@@ -47,99 +45,29 @@
 /**
  * @see org.apache.flink.api.common.functions.FlatJoinFunction
  */
-public class JoinOperatorBase> extends DualInputOperator {
-   
-   /**
-* An enumeration of hints, optionally usable to tell the system how 
exactly execute the join.
-*/
-   public static enum JoinHint {
--- End diff --

Moving the `JoinHint` enum breaks the public API. Join hints are quite 
often used and I would prefer to keep them at the same location.
What do you think about renaming *your* `AbstractJoinOperatorBase` class to 
`JoinOperatorBase` and *your* `JoinOperatorBase` class to 
`InnerJoinOperatorBase`?
Alternatively, we can also move the `JoinHint` enum from 
`AbstractJoinOperatorBase` to `JoinOperatorBase`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-25 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1138#issuecomment-143250678
  
We have a couple of unit tests to check the correctness of the API, i.e., 
to check that valid use works and invalid use throws an early exception. See, 
for example, `org.apache.flink.api.java.operator.JoinOperatorTest`. It would be 
good to have such unit tests for outer joins as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1138#discussion_r40441316
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java
 ---
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.RichFlatJoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+import java.util.List;
+
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class OuterJoinITCase extends MultipleProgramsTestBase {
+
+   public OuterJoinITCase(TestExecutionMode mode) {
+   super(mode);
+   }
+
+   @Test
+   public void testUDFLeftOuterJoinOnTuplesWithKeyFieldPositions() throws 
Exception {
+   /*
+* UDF Join on tuples with key field positions
+*/
+
+   final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+   DataSet> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
+   DataSet> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+   DataSet> joinDs =
+   ds1.leftOuterJoin(ds2)
+   .where(0)
+   .equalTo(0)
+   .with(new T3T5FlatJoin());
+
+   List> result = joinDs.collect();
+
+   String expected = "Hi,Hallo\n" +
+   "Hello,Hallo Welt\n" +
+   "Hello,Hallo Welt wie\n" +
+   "Hello world,null\n";
+
+   compareResultAsTuples(result, expected);
+   }
+
+   @Test
+   public void testUDFRightOuterJoinOnTuplesWithKeyFieldPositions() throws 
Exception {
+   /*
+* UDF Join on tuples with key field positions
+*/
+
+   final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+   DataSet> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
+   DataSet> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+   DataSet> joinDs =
+   ds1.rightOuterJoin(ds2)
+   .where(1)
+   .equalTo(1)
+   .with(new T3T5FlatJoin());
+
+   List> result = joinDs.collect();
+
+   String expected = "Hi,Hallo\n" +
+   "Hello,Hallo Welt\n" +
+   "null,Hallo Welt wie\n" +
+   "Hello world,Hallo Welt\n";
+
+   compareResultAsTuples(result, expected);
+   }
+
+ 

[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-25 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1138#issuecomment-143252740
  
As I said in my previous comments, I would prefer not to support projection 
joins initially.
Instead, it would be good if we could ensure by API design that an outer 
join is always completed with a `with(JoinFunction)` call. One way to do this 
would be to return a special `OuterJoinOperatorSetsPredicate` object when 
`.where()` is called. This `OuterJoinOperatorSetsPredicate` does not return a 
`JoinOperator` when `equalTo()` is called but an unfinished outer join that 
only allows calling `with()`. That way, the regular join API would remain 
stable. Or do you have a better idea to model the API in the right way?

If we don't allow DefaultJoin and ProjectJoin for outer joins, we can also 
revert the corresponding changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-09-25 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1138#issuecomment-143253277
  
Hi @jkovacs and @r-pogalz,

really good work! I left a few comments inline, but overall the PR is in 
pretty good shape. Please let me know if you have questions or would like to 
discuss some of the comments I made.

Have a good weekend, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-10-02 Thread jkovacs
Github user jkovacs commented on the pull request:

https://github.com/apache/flink/pull/1138#issuecomment-144997953
  
Thanks @fhueske and @StephanEwen for the comprehensive review and 
additional details on Flink internals! I agree that we should rather wait to 
implement the projection join correctly at a later point.
I'll append a few commits addressing the review comments and squash them 
later into the appropriate commits when you feel it's ready to merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...

2015-10-02 Thread jkovacs
Github user jkovacs commented on a diff in the pull request:

https://github.com/apache/flink/pull/1138#discussion_r41014523
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
 ---
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import org.apache.commons.collections.ResettableIterator;
+import org.apache.commons.collections.iterators.ListIteratorWrapper;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+public class OuterJoinOperatorBase> extends AbstractJoinOperatorBase {
+
+   public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+   private OuterJoinType outerJoinType;
+
+   public OuterJoinOperatorBase(UserCodeWrapper udf, 
BinaryOperatorInformation operatorInfo,
+   int[] keyPositions1, int[] keyPositions2, String name, 
OuterJoinType outerJoinType) {
+   super(udf, operatorInfo, keyPositions1, keyPositions2, name);
+   this.outerJoinType = outerJoinType;
+   }
+
+   public OuterJoinOperatorBase(FT udf, BinaryOperatorInformation operatorInfo,
+   int[] keyPositions1, int[] keyPositions2, String name, 
OuterJoinType outerJoinType) {
+   super(new UserCodeObjectWrapper(udf), operatorInfo, 
keyPositions1, keyPositions2, name);
+   this.outerJoinType = outerJoinType;
+   }
+
+   public OuterJoinOperatorBase(Class udf, 
BinaryOperatorInformation operatorInfo,
+   int[] keyPositions1, int[] keyPositions2, String name, 
OuterJoinType outerJoinType) {
+   super(new UserCodeClassWrapper(udf), operatorInfo, 
keyPositions1, keyPositions2, name);
+   this.outerJoinType = outerJoinType;
+   }
+
+   public void setOuterJoinType(OuterJoinType outerJoinType) {
+   this.outerJoinType = outerJoinType;
+   }
+
+   public OuterJoinType getOuterJoinType() {
+   return outerJoinType;
+   }
+
+   @Override
+   protected List executeOnCollections(List leftInput, List 
rightInput, RuntimeContext runtimeContext, ExecutionConfig executionConfig) 
throws Exception {
+   TypeInformation leftInformation = 
getOperatorInfo().getFirstInputType();
+   TypeInformation rightInformation = 
getOperatorInfo().getSecondInputType();
+   TypeInformation outInformation = 
getOperatorInfo().getOutputType();
+
+   TypeComparator leftComparator = buildComparatorFor(0, 
executionConfig, leftInformation);
+   TypeComparator rightComparator = buildComparatorFor(1, 
executionConfig, rightInformation);
+
+   TypeSerializer leftSerializer = 
left