Repository: beam Updated Branches: refs/heads/master 2f9428c3e -> f1386c1cb
[BEAM-2378] support FULL OUTER JOIN Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c5918b2f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c5918b2f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c5918b2f Branch: refs/heads/master Commit: c5918b2f7ce36c755e2a285c42cc6b628b9ee319 Parents: 2f9428c Author: James Xu <xumingmi...@gmail.com> Authored: Wed May 31 10:28:55 2017 +0800 Committer: Davor Bonaci <da...@google.com> Committed: Fri Jun 2 10:00:57 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/extensions/joinlibrary/Join.java | 65 ++++++- .../joinlibrary/OuterFullJoinTest.java | 179 +++++++++++++++++++ 2 files changed, 243 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c5918b2f/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java index f4e6ccb..9acb048 100644 --- a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java +++ b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java @@ -141,7 +141,7 @@ public class Join { * @param <V1> Type of the values for the left collection. * @param <V2> Type of the values for the right collection. * @return A joined collection of KV where Key is the key and value is a - * KV where Key is of type V1 and Value is type V2. Keys that + * KV where Key is of type V1 and Value is type V2. Values that * should be null or empty is replaced with nullValue. 
*/ public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> rightOuterJoin( @@ -184,4 +184,67 @@ public class Join { KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(), ((KvCoder) rightCollection.getCoder()).getValueCoder()))); } + + /** + * Full Outer Join of two collections of KV elements. + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param leftNullValue Value to use as null value when left side does not match right side. + * @param rightNullValue Value to use as null value when right side does not match left side. + * @param <K> Type of the key for both collections. + * @param <V1> Type of the values for the left collection. + * @param <V2> Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a + * KV where Key is of type V1 and Value is type V2. Values that + * should be null or empty are replaced with leftNullValue/rightNullValue. + */ + public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> fullOuterJoin( + final PCollection<KV<K, V1>> leftCollection, + final PCollection<KV<K, V2>> rightCollection, + final V1 leftNullValue, final V2 rightNullValue) { + checkNotNull(leftCollection); + checkNotNull(rightCollection); + checkNotNull(leftNullValue); + checkNotNull(rightNullValue); + + final TupleTag<V1> v1Tuple = new TupleTag<>(); + final TupleTag<V2> v2Tuple = new TupleTag<>(); + + PCollection<KV<K, CoGbkResult>> coGbkResultCollection = + KeyedPCollectionTuple.of(v1Tuple, leftCollection) + .and(v2Tuple, rightCollection) + .apply(CoGroupByKey.<K>create()); + + return coGbkResultCollection.apply(ParDo.of( + new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { + @ProcessElement + public void processElement(ProcessContext c) { + KV<K, CoGbkResult> e = c.element(); + + Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple); + Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple); + if
(leftValuesIterable.iterator().hasNext() + && rightValuesIterable.iterator().hasNext()) { + for (V2 rightValue : rightValuesIterable) { + for (V1 leftValue : leftValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); + } + } + } else if (leftValuesIterable.iterator().hasNext() + && !rightValuesIterable.iterator().hasNext()) { + for (V1 leftValue : leftValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftValue, rightNullValue))); + } + } else if (!leftValuesIterable.iterator().hasNext() + && rightValuesIterable.iterator().hasNext()) { + for (V2 rightValue : rightValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftNullValue, rightValue))); + } + } + } + })) + .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(), + KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(), + ((KvCoder) rightCollection.getCoder()).getValueCoder()))); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c5918b2f/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java new file mode 100644 index 0000000..cdf4f4f --- /dev/null +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterFullJoinTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.joinlibrary; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +/** + * This tests Outer Full Join functionality.
+ */ +public class OuterFullJoinTest { + + List<KV<String, Long>> leftListOfKv; + List<KV<String, String>> listRightOfKv; + List<KV<String, KV<Long, String>>> expectedResult; + + @Rule + public final transient TestPipeline p = TestPipeline.create(); + + @Before + public void setup() { + + leftListOfKv = new ArrayList<>(); + listRightOfKv = new ArrayList<>(); + + expectedResult = new ArrayList<>(); + } + + @Test + public void testJoinOneToOneMapping() { + leftListOfKv.add(KV.of("Key1", 5L)); + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key1", "foo")); + listRightOfKv.add(KV.of("Key2", "bar")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.fullOuterJoin( + leftCollection, rightCollection, -1L, ""); + + expectedResult.add(KV.of("Key1", KV.of(5L, "foo"))); + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinOneToManyMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key2", "bar")); + listRightOfKv.add(KV.of("Key2", "gazonk")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.fullOuterJoin( + leftCollection, rightCollection, -1L, ""); + + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinManyToOneMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + leftListOfKv.add(KV.of("Key2", 6L)); + PCollection<KV<String, Long>> 
leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key2", "bar")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.fullOuterJoin( + leftCollection, rightCollection, -1L, ""); + + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + expectedResult.add(KV.of("Key2", KV.of(6L, "bar"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinNoneToNoneMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key3", "bar")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.fullOuterJoin( + leftCollection, rightCollection, -1L, ""); + + expectedResult.add(KV.of("Key2", KV.of(4L, ""))); + expectedResult.add(KV.of("Key3", KV.of(-1L, "bar"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + p.run(); + } + + @Test(expected = NullPointerException.class) + public void testJoinLeftCollectionNull() { + p.enableAbandonedNodeEnforcement(false); + Join.fullOuterJoin( + null, + p.apply( + Create.of(listRightOfKv) + .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))), + "", ""); + } + + @Test(expected = NullPointerException.class) + public void testJoinRightCollectionNull() { + p.enableAbandonedNodeEnforcement(false); + Join.fullOuterJoin( + p.apply( + Create.of(leftListOfKv).withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))), + null, + -1L, -1L); + } + + @Test(expected = NullPointerException.class) + public void testJoinLeftNullValueIsNull() { + p.enableAbandonedNodeEnforcement(false); + Join.fullOuterJoin( + p.apply("CreateLeft", Create.empty(KvCoder.of(StringUtf8Coder.of(), 
VarLongCoder.of()))), + p.apply( + "CreateRight", Create.empty(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))), + null, ""); + } + + @Test(expected = NullPointerException.class) + public void testJoinRightNullValueIsNull() { + p.enableAbandonedNodeEnforcement(false); + Join.fullOuterJoin( + p.apply("CreateLeft", Create.empty(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))), + p.apply( + "CreateRight", Create.empty(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))), + -1L, null); + } +}