Updated Branches:
  refs/heads/master 72caac7d8 -> 1e06362b9

CRUNCH-121: Fix bug in full outer and left outer joins.
Contributed by John Jensen.

Signed-off-by: Josh Wills <[email protected]>
Signed-off-by: Kiyan Ahmadizadeh <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/1e06362b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/1e06362b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/1e06362b

Branch: refs/heads/master
Commit: 1e06362b9b8ff43d977d166fc4916e41cbeb3ae3
Parents: 72caac7
Author: John Jensen <[email protected]>
Authored: Mon Nov 26 20:05:03 2012 -0800
Committer: Kiyan Ahmadizadeh <[email protected]>
Committed: Tue Nov 27 16:38:14 2012 -0800

----------------------------------------------------------------------
 .../apache/crunch/lib/join/FullOuterJoinFn.java    |    2 +-
 .../apache/crunch/lib/join/LeftOuterJoinFn.java    |    2 +-
 .../lib/join/BrokenLeftAndOuterJoinTest.java       |   90 +++++++++++++++
 3 files changed, 92 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1e06362b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
index 834396a..c0ce727 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
@@ -56,7 +56,7 @@ public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V> 
{
   public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, 
Pair<U, V>>> emitter) {
     if (!key.equals(lastKey)) {
       // Make sure that left side gets emitted.
-      if (0 == lastId && 0 == id) {
+      if (0 == lastId) {
         for (U u : leftValues) {
           emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
         }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1e06362b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
index 18288a4..731c496 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
@@ -56,7 +56,7 @@ public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> 
{
   public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, 
Pair<U, V>>> emitter) {
     if (!key.equals(lastKey)) {
       // Make sure that left side always gets emitted.
-      if (0 == lastId && 0 == id) {
+      if (0 == lastId) {
         for (U u : leftValues) {
           emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
         }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1e06362b/crunch/src/test/java/org/apache/crunch/lib/join/BrokenLeftAndOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/BrokenLeftAndOuterJoinTest.java b/crunch/src/test/java/org/apache/crunch/lib/join/BrokenLeftAndOuterJoinTest.java
new file mode 100644
index 0000000..7e2e444
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/join/BrokenLeftAndOuterJoinTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.apache.crunch.test.StringWrapper.wrap;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class BrokenLeftAndOuterJoinTest {
+
+  List<Pair<StringWrapper, String>> createValuePairList(StringWrapper 
leftValue, String rightValue) {
+    Pair<StringWrapper, String> valuePair = Pair.of(leftValue, rightValue);
+    List<Pair<StringWrapper, String>> valuePairList = Lists.newArrayList();
+    valuePairList.add(valuePair);
+    return valuePairList;
+  }
+  
+  @Test
+  public void testOuterJoin() {
+    JoinFn<StringWrapper, StringWrapper, String> joinFn = new 
LeftOuterJoinFn<StringWrapper, StringWrapper, String>(
+        Avros.reflects(StringWrapper.class),
+        Avros.reflects(StringWrapper.class));
+    joinFn.setContext(CrunchTestSupport.getTestContext(new Configuration()));
+    joinFn.initialize();
+    Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter = 
mock(Emitter.class);
+    
+    StringWrapper key = new StringWrapper();
+    StringWrapper leftValue = new StringWrapper();
+    key.setValue("left-only");
+    leftValue.setValue("left-only-left");
+    joinFn.join(key, 0, createValuePairList(leftValue, null), emitter);
+
+    key.setValue("right-only");
+    joinFn.join(key, 1, createValuePairList(null, "right-only-right"), 
emitter);
+
+    verify(emitter).emit(Pair.of(wrap("left-only"), 
Pair.of(wrap("left-only-left"), (String) null)));
+    verifyNoMoreInteractions(emitter);
+  }
+  
+  @Test
+  public void testFullJoin() {
+    JoinFn<StringWrapper, StringWrapper, String> joinFn = new 
FullOuterJoinFn<StringWrapper, StringWrapper, String>(
+        Avros.reflects(StringWrapper.class),
+        Avros.reflects(StringWrapper.class));
+    joinFn.setContext(CrunchTestSupport.getTestContext(new Configuration()));
+    joinFn.initialize();
+    Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter = 
mock(Emitter.class);
+    
+    StringWrapper key = new StringWrapper();
+    StringWrapper leftValue = new StringWrapper();
+    key.setValue("left-only");
+    leftValue.setValue("left-only-left");
+    joinFn.join(key, 0, createValuePairList(leftValue, null), emitter);
+
+    key.setValue("right-only");
+    joinFn.join(key, 1, createValuePairList(null, "right-only-right"), 
emitter);
+
+    verify(emitter).emit(Pair.of(wrap("left-only"), 
Pair.of(wrap("left-only-left"), (String) null)));
+    verify(emitter).emit(Pair.of(wrap("right-only"), 
Pair.of((StringWrapper)null, "right-only-right")));
+    verifyNoMoreInteractions(emitter);
+  }
+}

Reply via email to