[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user r-pogalz commented on a diff in the pull request: https://github.com/apache/flink/pull/1138#discussion_r41505034 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java --- @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.operators.base; + +import org.apache.commons.collections.ResettableIterator; +import org.apache.commons.collections.iterators.ListIteratorWrapper; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.CopyingListCollector; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.operators.BinaryOperatorInformation; +import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator; +import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; +import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; +import org.apache.flink.api.common.operators.util.UserCodeWrapper; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.GenericPairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.util.Collector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +public class OuterJoinOperatorBase> extends AbstractJoinOperatorBase { + + public static enum OuterJoinType {LEFT, RIGHT, FULL} + + private OuterJoinType outerJoinType; + + public OuterJoinOperatorBase(UserCodeWrapper udf, BinaryOperatorInformation operatorInfo, + int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) { + super(udf, operatorInfo, keyPositions1, keyPositions2, name); + this.outerJoinType = outerJoinType; + } + + public OuterJoinOperatorBase(FT udf, BinaryOperatorInformation operatorInfo, + 
int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) { + super(new UserCodeObjectWrapper(udf), operatorInfo, keyPositions1, keyPositions2, name); + this.outerJoinType = outerJoinType; + } + + public OuterJoinOperatorBase(Class udf, BinaryOperatorInformation operatorInfo, + int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) { + super(new UserCodeClassWrapper(udf), operatorInfo, keyPositions1, keyPositions2, name); + this.outerJoinType = outerJoinType; + } + + public void setOuterJoinType(OuterJoinType outerJoinType) { + this.outerJoinType = outerJoinType; + } + + public OuterJoinType getOuterJoinType() { + return outerJoinType; + } + + @Override + protected List executeOnCollections(List leftInput, List rightInput, RuntimeContext runtimeContext, ExecutionConfig executionConfig) throws Exception { + TypeInformation leftInformation = getOperatorInfo().getFirstInputType(); + TypeInformation rightInformation = getOperatorInfo().getSecondInputType(); + TypeInformation outInformation = getOperatorInfo().getOutputType(); + + TypeComparator leftComparator = buildComparatorFor(0, executionConfig, leftInformation); + TypeComparator rightComparator = buildComparatorFor(1, executionConfig, rightInformation); + + TypeSerializer leftSerializer = lef
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1138#issuecomment-146790205 Thanks for the update @jkovacs and @r-pogalz. Very good work! I will go ahead, try this PR, and merge it :-) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1138#issuecomment-146835992 Looks good. I added one commit to restore binary compatibility. The code is not super nice, but it allows previously compiled programs to run without having to be recompiled. We can still clean up the code later if we decide to do so. Final tests are running; I will merge once they have passed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1138 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1138#issuecomment-146886307 Thank you very much @jkovacs and @r-pogalz for adding outer joins to Flink! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1138#issuecomment-146885685 Oh, just realized we did not update the documentation. I will open a JIRA for that and add it later today. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1138#issuecomment-14654 @jkovacs and @r-pogalz, thank you very much for this PR and the detailed description! It's quite a bit of code, so it will take some time to review. I hope to give feedback soon. In the meantime, we can start a discussion about the handling of projection for outer joins. If the type information is changed to `GenericTypeInfo` to support tuples with null values, the resulting `DataSet` can no longer be used (in a join, groupBy, reduce, ...) as before, because the runtime will use completely different serializers and comparators. Therefore, I am more in favor of not supporting projection for outer joins. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user jkovacs commented on the pull request: https://github.com/apache/flink/pull/1138#issuecomment-141497981 Thanks @fhueske, that's a good point I hadn't considered. Another idea that occurred to me was to convert the result tuple's field types to `GenericTypeInfo<T>` (instead of `GenericTypeInfo`), where `T` is the original type of the tuple field (e.g. `String` or `Integer`). This would be null-safe _and_ would allow the user to group by those fields, assuming of course they are sure that the fields are non-null (e.g. on a left or right outer join). However, I'm not sure of all the consequences of using, say, `GenericTypeInfo` instead of `BasicTypeInfo` for serialization and comparison. I pushed this change as https://github.com/jkovacs/flink/commit/f682baa50137e0a54bae091ba60ba85fdb8f4c1b to a different branch to test it. I also rebased the branch onto the current master and resolved conflicts (the failing test is a YARN integration test). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user jkovacs commented on the pull request: https://github.com/apache/flink/pull/1138#issuecomment-141569673 To partly answer my own question: one big drawback of downgrading the tuple field types to `GenericTypeInfo` is that the generic Kryo serializers will be used for (de)serialization and comparison. These are significantly slower than Flink's native serializers and comparators for basic types such as Integer (according to [this blog post](http://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html)). One obvious way to work around this is to downgrade only the fields that are actually nullable, and keep the original types of the definitely non-null fields (i.e. the types from the outer side of a left or right outer join). This way the user can still group/join/sort efficiently on the non-null fields, while null safety is preserved for the other fields. I pushed another commit for this to my temporary branch for review, if this makes sense: https://github.com/jkovacs/flink/compare/feature/FLINK-2576...jkovacs:feature/FLINK-2576-projection-types As you can see, I was really hoping to make the projection joins work properly :-) But if you feel that the effort isn't worth it, or that I'm missing something else entirely, we can simply scrap that and throw an `InvalidProgramException` when the user tries to apply a projection to an outer join instead of defining their own join UDF. Opinions on that are welcome. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1138#issuecomment-142723198 Hi @jkovacs, thanks for all your efforts to make the projection work. Going for a `GenericTypeInfo` would work in many cases, but unfortunately not in all. For example, `union` in Flink operates at the serialization level and requires that all data sets being unioned use the same serializer. If a `GenericTypeInfo` is used transparently, users might be surprised that `DataSet.union(DataSet)` does not work. If we only support OuterJoins with an explicit JoinFunction, the user has full control over how to deal with null values, and can even use a custom Tuple type or Tuple serializer (via `Operator.returns()`) that supports null values. In my opinion, the best approach is to only support OuterJoins with JoinFunctions. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1138#issuecomment-142754122 Agreed with Fabian. For now, let's require join functions. Future work would be to use Tuples with Options in Scala. In Java, we should probably add an option type as well (and teach the TypeExtractor to use it). Unfortunately, the Java standard library only added an option type (`Optional`) in Java 8. We could add our own for Java 7 and deprecate it later. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1138#discussion_r40431134 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java --- @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.operators.base; + +import org.apache.commons.collections.ResettableIterator; +import org.apache.commons.collections.iterators.ListIteratorWrapper; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.CopyingListCollector; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.operators.BinaryOperatorInformation; +import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator; +import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; +import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; +import org.apache.flink.api.common.operators.util.UserCodeWrapper; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.GenericPairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.util.Collector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +public class OuterJoinOperatorBase> extends AbstractJoinOperatorBase { + + public static enum OuterJoinType {LEFT, RIGHT, FULL} + + private OuterJoinType outerJoinType; + + public OuterJoinOperatorBase(UserCodeWrapper udf, BinaryOperatorInformation operatorInfo, + int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) { + super(udf, operatorInfo, keyPositions1, keyPositions2, name); + this.outerJoinType = outerJoinType; + } + + public OuterJoinOperatorBase(FT udf, BinaryOperatorInformation operatorInfo, + 
int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) { + super(new UserCodeObjectWrapper(udf), operatorInfo, keyPositions1, keyPositions2, name); + this.outerJoinType = outerJoinType; + } + + public OuterJoinOperatorBase(Class udf, BinaryOperatorInformation operatorInfo, + int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) { + super(new UserCodeClassWrapper(udf), operatorInfo, keyPositions1, keyPositions2, name); + this.outerJoinType = outerJoinType; + } + + public void setOuterJoinType(OuterJoinType outerJoinType) { + this.outerJoinType = outerJoinType; + } + + public OuterJoinType getOuterJoinType() { + return outerJoinType; + } + + @Override + protected List executeOnCollections(List leftInput, List rightInput, RuntimeContext runtimeContext, ExecutionConfig executionConfig) throws Exception { + TypeInformation leftInformation = getOperatorInfo().getFirstInputType(); + TypeInformation rightInformation = getOperatorInfo().getSecondInputType(); + TypeInformation outInformation = getOperatorInfo().getOutputType(); + + TypeComparator leftComparator = buildComparatorFor(0, executionConfig, leftInformation); + TypeComparator rightComparator = buildComparatorFor(1, executionConfig, rightInformation); + + TypeSerializer leftSerializer = left
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1138#discussion_r40431242 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java --- @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.operators.base; + +import org.apache.commons.collections.ResettableIterator; +import org.apache.commons.collections.iterators.ListIteratorWrapper; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.CopyingListCollector; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.operators.BinaryOperatorInformation; +import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator; +import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; +import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; +import org.apache.flink.api.common.operators.util.UserCodeWrapper; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.GenericPairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.util.Collector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +public class OuterJoinOperatorBase> extends AbstractJoinOperatorBase { + + public static enum OuterJoinType {LEFT, RIGHT, FULL} + + private OuterJoinType outerJoinType; + + public OuterJoinOperatorBase(UserCodeWrapper udf, BinaryOperatorInformation operatorInfo, + int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) { + super(udf, operatorInfo, keyPositions1, keyPositions2, name); + this.outerJoinType = outerJoinType; + } + + public OuterJoinOperatorBase(FT udf, BinaryOperatorInformation operatorInfo, + 
int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) { + super(new UserCodeObjectWrapper(udf), operatorInfo, keyPositions1, keyPositions2, name); + this.outerJoinType = outerJoinType; + } + + public OuterJoinOperatorBase(Class udf, BinaryOperatorInformation operatorInfo, + int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) { + super(new UserCodeClassWrapper(udf), operatorInfo, keyPositions1, keyPositions2, name); + this.outerJoinType = outerJoinType; + } + + public void setOuterJoinType(OuterJoinType outerJoinType) { + this.outerJoinType = outerJoinType; + } + + public OuterJoinType getOuterJoinType() { + return outerJoinType; + } + + @Override + protected List executeOnCollections(List leftInput, List rightInput, RuntimeContext runtimeContext, ExecutionConfig executionConfig) throws Exception { + TypeInformation leftInformation = getOperatorInfo().getFirstInputType(); + TypeInformation rightInformation = getOperatorInfo().getSecondInputType(); + TypeInformation outInformation = getOperatorInfo().getOutputType(); + + TypeComparator leftComparator = buildComparatorFor(0, executionConfig, leftInformation); + TypeComparator rightComparator = buildComparatorFor(1, executionConfig, rightInformation); + + TypeSerializer leftSerializer = left
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1138#discussion_r40434251 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java --- @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.optimizer.dag; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.base.AbstractJoinOperatorBase.JoinHint; +import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase; +import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.operators.AbstractJoinDescriptor; +import org.apache.flink.optimizer.operators.OperatorDescriptorDual; +import org.apache.flink.optimizer.operators.SortMergeFullOuterJoinDescriptor; +import org.apache.flink.optimizer.operators.SortMergeLeftOuterJoinDescriptor; +import org.apache.flink.optimizer.operators.SortMergeRightOuterJoinDescriptor; + +import java.util.ArrayList; +import java.util.List; + +public class OuterJoinNode extends TwoInputNode { + + private List dataProperties; + + /** +* Creates a new two input node for the optimizer plan, representing the given operator. +* +* @param operator The operator that the optimizer DAG node should represent. +*/ + public OuterJoinNode(OuterJoinOperatorBase operator) { + super(operator); + + this.dataProperties = getDataProperties(); + } + + private List getDataProperties() { + OuterJoinOperatorBase operator = getOperator(); + + OuterJoinType type = operator.getOuterJoinType(); + + JoinHint joinHint = operator.getJoinHint(); + joinHint = joinHint == null ? 
JoinHint.OPTIMIZER_CHOOSES : joinHint; + + List list = new ArrayList<>(); + switch (joinHint) { + case OPTIMIZER_CHOOSES: + list.add(getSortMergeDescriptor(type, true)); + list.add(getSortMergeDescriptor(type, false)); + break; + case REPARTITION_SORT_MERGE: + list.add(getSortMergeDescriptor(type, false)); + break; + case REPARTITION_HASH_FIRST: + case REPARTITION_HASH_SECOND: + case BROADCAST_HASH_FIRST: + case BROADCAST_HASH_SECOND: + default: + throw new CompilerException("Invalid join hint: " + joinHint + " for outer join type: " + type); + } + + Partitioner customPartitioner = operator.getCustomPartitioner(); + if (customPartitioner != null) { + for (OperatorDescriptorDual desc : list) { + ((AbstractJoinDescriptor) desc).setCustomPartitioner(customPartitioner); + } + } + return list; + } + + private OperatorDescriptorDual getSortMergeDescriptor(OuterJoinType type, boolean broadcastAllowed) { + if (type == OuterJoinType.FULL) { + return new SortMergeFullOuterJoinDescriptor(this.keys1, this.keys2); + } else if (type == OuterJoinType.LEFT) { + return new SortMergeLeftOuterJoinDescriptor(this.keys1, this.keys2, broadcastAllowed); + } else { + return new SortMergeRightOuterJoinDescriptor(this.keys1, this.keys2, broadcastAllowed); + } + } + + @Override + public OuterJoinOperatorBase getOperator() { + return (OuterJoinOperatorBase) super.getOperator(); + } + + @O
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1138#discussion_r40434718 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java --- @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.optimizer.dag; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.base.AbstractJoinOperatorBase.JoinHint; +import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase; +import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.operators.AbstractJoinDescriptor; +import org.apache.flink.optimizer.operators.OperatorDescriptorDual; +import org.apache.flink.optimizer.operators.SortMergeFullOuterJoinDescriptor; +import org.apache.flink.optimizer.operators.SortMergeLeftOuterJoinDescriptor; +import org.apache.flink.optimizer.operators.SortMergeRightOuterJoinDescriptor; + +import java.util.ArrayList; +import java.util.List; + +public class OuterJoinNode extends TwoInputNode { + + private List dataProperties; + + /** +* Creates a new two input node for the optimizer plan, representing the given operator. +* +* @param operator The operator that the optimizer DAG node should represent. +*/ + public OuterJoinNode(OuterJoinOperatorBase operator) { + super(operator); + + this.dataProperties = getDataProperties(); + } + + private List getDataProperties() { + OuterJoinOperatorBase operator = getOperator(); + + OuterJoinType type = operator.getOuterJoinType(); + + JoinHint joinHint = operator.getJoinHint(); + joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint; + + List list = new ArrayList<>(); + switch (joinHint) { + case OPTIMIZER_CHOOSES: + list.add(getSortMergeDescriptor(type, true)); + list.add(getSortMergeDescriptor(type, false)); --- End diff -- For `OuterJoinType.FULL`, this will add the same descriptor a second time. This increases the number of enumerated plans and should be avoided. 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1138#discussion_r40434856 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java --- @@ -22,15 +22,16 @@ import java.util.Collections; import java.util.List; +import org.apache.flink.api.common.operators.base.AbstractJoinOperatorBase; import org.apache.flink.api.common.operators.base.JoinOperatorBase; -import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; +import org.apache.flink.api.common.operators.base.AbstractJoinOperatorBase.JoinHint; import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties; import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties; import org.apache.flink.optimizer.operators.OperatorDescriptorDual; -import org.apache.flink.optimizer.operators.SortMergeJoinDescriptor; --- End diff -- You can completely remove the `MatchNode`; it is no longer referenced in the source code. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1138#discussion_r40439495 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java --- @@ -281,233 +315,225 @@ protected boolean udfWithForwardedFieldsSecondAnnotation(Class udfClass) { } @Override - protected JoinOperatorBase translateToDataFlow(Operator input1, Operator input2) { + protected AbstractJoinOperatorBase translateToDataFlow(Operator input1, Operator input2) { + String name = getName() != null ? getName() : "Join at " + joinLocationName; + + JoinOperatorBaseBuilder builder = new JoinOperatorBaseBuilder(name, joinType) + .withParallelism(getParallelism()) + .withPartitioner(getPartitioner()) + .withJoinHint(getJoinHint()) + .withResultType(getResultType()); + + final boolean requiresTupleUnwrapping = keys1 instanceof Keys.SelectorFunctionKeys || keys2 instanceof Keys.SelectorFunctionKeys; + if (requiresTupleUnwrapping) { + if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) { + // Both join sides have a key selector function, so we need to do the + // tuple wrapping/unwrapping on both sides. 
+ + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys selectorKeys1 = (Keys.SelectorFunctionKeys) keys1; + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys selectorKeys2 = (Keys.SelectorFunctionKeys) keys2; + + builder = builder + .withUdf(new TupleUnwrappingJoiner<>(function)) + .withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type()) + .withUnwrappingRightInput(input2, selectorKeys2, getInput2Type()); + } else if (keys2 instanceof Keys.SelectorFunctionKeys) { + // The right side of the join needs the tuple wrapping/unwrapping + + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys selectorKeys2 = (Keys.SelectorFunctionKeys) keys2; + + builder = builder + .withUdf(new TupleRightUnwrappingJoiner<>(function)) + .withInput1(input1, getInput1Type(), keys1) + .withUnwrappingRightInput(input2, selectorKeys2, getInput2Type()); + } else { + // The left side of the join needs the tuple wrapping/unwrapping - String name = getName() != null ? getName() : "Join at "+joinLocationName; + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys selectorKeys1 = (Keys.SelectorFunctionKeys) keys1; - final JoinOperatorBase translated; - - if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) { - // Both join sides have a key selector function, so we need to do the - // tuple wrapping/unwrapping on both sides. + builder = builder + .withUdf(new TupleLeftUnwrappingJoiner<>(function)) + .withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type()) + .withInput2(input2, getInput2Type(), keys2); + } + } else if (keys1 instanceof Keys.ExpressionKeys && keys2 instanceof Keys.ExpressionKeys) { + // Neither side needs the tuple wrapping/unwrapping - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys selectorKeys1 = (Keys.SelectorFunctionKeys) keys1; - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys selectorKeys
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1138#discussion_r40439571 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java --- @@ -281,233 +315,225 @@ protected boolean udfWithForwardedFieldsSecondAnnotation(Class udfClass) { } @Override - protected JoinOperatorBase translateToDataFlow(Operator input1, Operator input2) { + protected AbstractJoinOperatorBase translateToDataFlow(Operator input1, Operator input2) { + String name = getName() != null ? getName() : "Join at " + joinLocationName; + + JoinOperatorBaseBuilder builder = new JoinOperatorBaseBuilder(name, joinType) + .withParallelism(getParallelism()) + .withPartitioner(getPartitioner()) + .withJoinHint(getJoinHint()) + .withResultType(getResultType()); + + final boolean requiresTupleUnwrapping = keys1 instanceof Keys.SelectorFunctionKeys || keys2 instanceof Keys.SelectorFunctionKeys; + if (requiresTupleUnwrapping) { + if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) { + // Both join sides have a key selector function, so we need to do the + // tuple wrapping/unwrapping on both sides. 
+ + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys selectorKeys1 = (Keys.SelectorFunctionKeys) keys1; + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys selectorKeys2 = (Keys.SelectorFunctionKeys) keys2; + + builder = builder + .withUdf(new TupleUnwrappingJoiner<>(function)) + .withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type()) + .withUnwrappingRightInput(input2, selectorKeys2, getInput2Type()); + } else if (keys2 instanceof Keys.SelectorFunctionKeys) { + // The right side of the join needs the tuple wrapping/unwrapping + + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys selectorKeys2 = (Keys.SelectorFunctionKeys) keys2; + + builder = builder + .withUdf(new TupleRightUnwrappingJoiner<>(function)) + .withInput1(input1, getInput1Type(), keys1) + .withUnwrappingRightInput(input2, selectorKeys2, getInput2Type()); + } else { + // The left side of the join needs the tuple wrapping/unwrapping - String name = getName() != null ? getName() : "Join at "+joinLocationName; + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys selectorKeys1 = (Keys.SelectorFunctionKeys) keys1; - final JoinOperatorBase translated; - - if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) { - // Both join sides have a key selector function, so we need to do the - // tuple wrapping/unwrapping on both sides. + builder = builder + .withUdf(new TupleLeftUnwrappingJoiner<>(function)) + .withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type()) + .withInput2(input2, getInput2Type(), keys2); + } + } else if (keys1 instanceof Keys.ExpressionKeys && keys2 instanceof Keys.ExpressionKeys) { + // Neither side needs the tuple wrapping/unwrapping - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys selectorKeys1 = (Keys.SelectorFunctionKeys) keys1; - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys selectorKeys
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1138#discussion_r40439511 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java --- @@ -281,233 +315,225 @@ protected boolean udfWithForwardedFieldsSecondAnnotation(Class udfClass) { } @Override - protected JoinOperatorBase translateToDataFlow(Operator input1, Operator input2) { + protected AbstractJoinOperatorBase translateToDataFlow(Operator input1, Operator input2) { + String name = getName() != null ? getName() : "Join at " + joinLocationName; + + JoinOperatorBaseBuilder builder = new JoinOperatorBaseBuilder(name, joinType) + .withParallelism(getParallelism()) + .withPartitioner(getPartitioner()) + .withJoinHint(getJoinHint()) + .withResultType(getResultType()); + + final boolean requiresTupleUnwrapping = keys1 instanceof Keys.SelectorFunctionKeys || keys2 instanceof Keys.SelectorFunctionKeys; + if (requiresTupleUnwrapping) { + if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) { + // Both join sides have a key selector function, so we need to do the + // tuple wrapping/unwrapping on both sides. 
+ + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys selectorKeys1 = (Keys.SelectorFunctionKeys) keys1; + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys selectorKeys2 = (Keys.SelectorFunctionKeys) keys2; + + builder = builder + .withUdf(new TupleUnwrappingJoiner<>(function)) + .withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type()) + .withUnwrappingRightInput(input2, selectorKeys2, getInput2Type()); + } else if (keys2 instanceof Keys.SelectorFunctionKeys) { + // The right side of the join needs the tuple wrapping/unwrapping + + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys selectorKeys2 = (Keys.SelectorFunctionKeys) keys2; + + builder = builder + .withUdf(new TupleRightUnwrappingJoiner<>(function)) + .withInput1(input1, getInput1Type(), keys1) + .withUnwrappingRightInput(input2, selectorKeys2, getInput2Type()); + } else { + // The left side of the join needs the tuple wrapping/unwrapping - String name = getName() != null ? getName() : "Join at "+joinLocationName; + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys selectorKeys1 = (Keys.SelectorFunctionKeys) keys1; - final JoinOperatorBase translated; - - if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) { - // Both join sides have a key selector function, so we need to do the - // tuple wrapping/unwrapping on both sides. + builder = builder + .withUdf(new TupleLeftUnwrappingJoiner<>(function)) + .withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type()) + .withInput2(input2, getInput2Type(), keys2); + } + } else if (keys1 instanceof Keys.ExpressionKeys && keys2 instanceof Keys.ExpressionKeys) { + // Neither side needs the tuple wrapping/unwrapping - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys selectorKeys1 = (Keys.SelectorFunctionKeys) keys1; - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys selectorKeys
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1138#discussion_r40439650 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java --- @@ -281,233 +315,225 @@ protected boolean udfWithForwardedFieldsSecondAnnotation(Class udfClass) { } @Override - protected JoinOperatorBase translateToDataFlow(Operator input1, Operator input2) { + protected AbstractJoinOperatorBase translateToDataFlow(Operator input1, Operator input2) { + String name = getName() != null ? getName() : "Join at " + joinLocationName; + + JoinOperatorBaseBuilder builder = new JoinOperatorBaseBuilder(name, joinType) + .withParallelism(getParallelism()) + .withPartitioner(getPartitioner()) + .withJoinHint(getJoinHint()) + .withResultType(getResultType()); + + final boolean requiresTupleUnwrapping = keys1 instanceof Keys.SelectorFunctionKeys || keys2 instanceof Keys.SelectorFunctionKeys; + if (requiresTupleUnwrapping) { + if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) { + // Both join sides have a key selector function, so we need to do the + // tuple wrapping/unwrapping on both sides. 
+ + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys selectorKeys1 = (Keys.SelectorFunctionKeys) keys1; + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys selectorKeys2 = (Keys.SelectorFunctionKeys) keys2; + + builder = builder + .withUdf(new TupleUnwrappingJoiner<>(function)) + .withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type()) + .withUnwrappingRightInput(input2, selectorKeys2, getInput2Type()); + } else if (keys2 instanceof Keys.SelectorFunctionKeys) { + // The right side of the join needs the tuple wrapping/unwrapping + + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys selectorKeys2 = (Keys.SelectorFunctionKeys) keys2; + + builder = builder + .withUdf(new TupleRightUnwrappingJoiner<>(function)) + .withInput1(input1, getInput1Type(), keys1) + .withUnwrappingRightInput(input2, selectorKeys2, getInput2Type()); + } else { + // The left side of the join needs the tuple wrapping/unwrapping - String name = getName() != null ? getName() : "Join at "+joinLocationName; + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys selectorKeys1 = (Keys.SelectorFunctionKeys) keys1; - final JoinOperatorBase translated; - - if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) { - // Both join sides have a key selector function, so we need to do the - // tuple wrapping/unwrapping on both sides. + builder = builder + .withUdf(new TupleLeftUnwrappingJoiner<>(function)) + .withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type()) + .withInput2(input2, getInput2Type(), keys2); + } + } else if (keys1 instanceof Keys.ExpressionKeys && keys2 instanceof Keys.ExpressionKeys) { + // Neither side needs the tuple wrapping/unwrapping - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys selectorKeys1 = (Keys.SelectorFunctionKeys) keys1; - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys selectorKeys
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1138#discussion_r40440502 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java --- @@ -47,99 +45,29 @@ /** * @see org.apache.flink.api.common.functions.FlatJoinFunction */ -public class JoinOperatorBase> extends DualInputOperator { - - /** -* An enumeration of hints, optionally usable to tell the system how exactly execute the join. -*/ - public static enum JoinHint { --- End diff -- Moving the `JoinHint` enum breaks the public API. Join hints are quite often used and I would prefer to keep them at the same location. What do you think about renaming *your* `AbstractJoinOperatorBase` class to `JoinOperatorBase` and *your* `JoinOperatorBase` class to `InnerJoinOperatorBase`? Alternatively, we can also move the `JoinHint` enum from `AbstractJoinOperatorBase` to `JoinOperatorBase`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1138#issuecomment-143250678 We have a couple of unit tests to check the correctness of the API, i.e., check that valid use is working and invalid use throws an early exception. See for example `org.apache.flink.api.java.operator.JoinOperatorTest`. It would be good to have such unit tests for outer joins as well. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1138#discussion_r40441316 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java --- @@ -0,0 +1,679 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.javaApiOperators; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.RichFlatJoinFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.util.Collector; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; +import java.util.List; + +@SuppressWarnings("serial") +@RunWith(Parameterized.class) +public class OuterJoinITCase extends MultipleProgramsTestBase { + + public OuterJoinITCase(TestExecutionMode mode) { + super(mode); + } + + @Test + public void testUDFLeftOuterJoinOnTuplesWithKeyFieldPositions() throws Exception { + /* +* UDF Join on tuples with key field positions +*/ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet> joinDs = + ds1.leftOuterJoin(ds2) + .where(0) + .equalTo(0) + .with(new T3T5FlatJoin()); + + List> result = joinDs.collect(); + + String expected = "Hi,Hallo\n" + + 
"Hello,Hallo Welt\n" + + "Hello,Hallo Welt wie\n" + + "Hello world,null\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testUDFRightOuterJoinOnTuplesWithKeyFieldPositions() throws Exception { + /* +* UDF Join on tuples with key field positions +*/ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet> joinDs = + ds1.rightOuterJoin(ds2) + .where(1) + .equalTo(1) + .with(new T3T5FlatJoin()); + + List> result = joinDs.collect(); + + String expected = "Hi,Hallo\n" + + "Hello,Hallo Welt\n" + + "null,Hallo Welt wie\n" + + "Hello world,Hallo Welt\n"; + + compareResultAsTuples(result, expected); + } + +
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1138#issuecomment-143252740 As I said in my previous comments, I would prefer not to support projection joins initially. Instead, it would be good if we could ensure by API design that an outer join is always completed with a `with(JoinFunction)` call. One way to do this would be to return a special `OuterJoinOperatorSetsPredicate` object when `.where()` is called. This `OuterJoinOperatorSetsPredicate` does not return a `JoinOperator` when `equalTo()` is called but an unfinished outer join that only allows calling `with()`. That way, the regular join API would remain stable. Or do you have a better idea to model the API in the right way? If we don't allow DefaultJoin and ProjectJoin for outer joins, we can also revert the corresponding changes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1138#issuecomment-143253277 Hi @jkovacs and @r-pogalz, really good work! I left a few comments inline but overall the PR is in a pretty good shape. Please let me know, if you have questions or would like to discuss some of the comments I made. Have a good weekend, Fabian --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user jkovacs commented on the pull request: https://github.com/apache/flink/pull/1138#issuecomment-144997953 Thanks @fhueske and @StephanEwen for the comprehensive review and additional details on Flink internals! I agree that we should rather wait to implement the projection join correctly at a later point. I'll append a few commits addressing the review comments and squash them later into the appropriate commits when you feel it's ready to merge. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2576] Add Outer Join operator to Optimi...
Github user jkovacs commented on a diff in the pull request: https://github.com/apache/flink/pull/1138#discussion_r41014523 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java --- @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.operators.base; + +import org.apache.commons.collections.ResettableIterator; +import org.apache.commons.collections.iterators.ListIteratorWrapper; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.CopyingListCollector; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.operators.BinaryOperatorInformation; +import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator; +import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; +import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; +import org.apache.flink.api.common.operators.util.UserCodeWrapper; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.GenericPairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.util.Collector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +public class OuterJoinOperatorBase> extends AbstractJoinOperatorBase { + + public static enum OuterJoinType {LEFT, RIGHT, FULL} + + private OuterJoinType outerJoinType; + + public OuterJoinOperatorBase(UserCodeWrapper udf, BinaryOperatorInformation operatorInfo, + int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) { + super(udf, operatorInfo, keyPositions1, keyPositions2, name); + this.outerJoinType = outerJoinType; + } + + public OuterJoinOperatorBase(FT udf, BinaryOperatorInformation operatorInfo, + 
int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) { + super(new UserCodeObjectWrapper(udf), operatorInfo, keyPositions1, keyPositions2, name); + this.outerJoinType = outerJoinType; + } + + public OuterJoinOperatorBase(Class udf, BinaryOperatorInformation operatorInfo, + int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) { + super(new UserCodeClassWrapper(udf), operatorInfo, keyPositions1, keyPositions2, name); + this.outerJoinType = outerJoinType; + } + + public void setOuterJoinType(OuterJoinType outerJoinType) { + this.outerJoinType = outerJoinType; + } + + public OuterJoinType getOuterJoinType() { + return outerJoinType; + } + + @Override + protected List executeOnCollections(List leftInput, List rightInput, RuntimeContext runtimeContext, ExecutionConfig executionConfig) throws Exception { + TypeInformation leftInformation = getOperatorInfo().getFirstInputType(); + TypeInformation rightInformation = getOperatorInfo().getSecondInputType(); + TypeInformation outInformation = getOperatorInfo().getOutputType(); + + TypeComparator leftComparator = buildComparatorFor(0, executionConfig, leftInformation); + TypeComparator rightComparator = buildComparatorFor(1, executionConfig, rightInformation); + + TypeSerializer leftSerializer = left