Updated Branches: refs/heads/master 0473d8e86 -> 3a3111546
Detach iterated join values. Values being joined are typically objects that a reducer's iterator re-uses between iterations, so storing them in a local collection keeps references to an object that is overwritten on each iteration rather than to the distinct values. The iterated values are now detached (i.e. deep copied) in joins to work around this. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/3a311154 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/3a311154 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/3a311154 Branch: refs/heads/master Commit: 3a3111546e788f4139fa291f018fe71bd3420d91 Parents: 0473d8e Author: Gabriel Reid <[email protected]> Authored: Fri Jul 6 23:35:43 2012 +0200 Committer: Gabriel Reid <[email protected]> Committed: Fri Jul 6 23:35:43 2012 +0200 ---------------------------------------------------------------------- src/main/java/com/cloudera/crunch/lib/Join.java | 8 +++--- .../cloudera/crunch/lib/join/FullOuterJoinFn.java | 7 +++++- .../com/cloudera/crunch/lib/join/InnerJoinFn.java | 17 +++++++++----- .../java/com/cloudera/crunch/lib/join/JoinFn.java | 14 ++++++++++++ .../cloudera/crunch/lib/join/LeftOuterJoinFn.java | 7 +++++- .../cloudera/crunch/lib/join/RightOuterJoinFn.java | 9 ++++++- .../crunch/lib/join/FullOuterJoinTest.java | 5 ++- .../cloudera/crunch/lib/join/InnerJoinTest.java | 5 ++- .../com/cloudera/crunch/lib/join/JoinTester.java | 4 +- .../crunch/lib/join/LeftOuterJoinTest.java | 5 ++- .../crunch/lib/join/RightOuterJoinTest.java | 5 ++- 11 files changed, 62 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/main/java/com/cloudera/crunch/lib/Join.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/lib/Join.java b/src/main/java/com/cloudera/crunch/lib/Join.java index ca68aec..0871dc1 
100644 --- a/src/main/java/com/cloudera/crunch/lib/Join.java +++ b/src/main/java/com/cloudera/crunch/lib/Join.java @@ -59,7 +59,7 @@ public class Join { * @return The joined result. */ public static <K, U, V> PTable<K, Pair<U, V>> innerJoin(PTable<K, U> left, PTable<K, V> right) { - return join(left, right, new InnerJoinFn<K, U, V>()); + return join(left, right, new InnerJoinFn<K, U, V>(left.getValueType())); } /** @@ -75,7 +75,7 @@ public class Join { * @return The joined result. */ public static <K, U, V> PTable<K, Pair<U, V>> leftJoin(PTable<K, U> left, PTable<K, V> right) { - return join(left, right, new LeftOuterJoinFn<K, U, V>()); + return join(left, right, new LeftOuterJoinFn<K, U, V>(left.getValueType())); } /** @@ -91,7 +91,7 @@ public class Join { * @return The joined result. */ public static <K, U, V> PTable<K, Pair<U, V>> rightJoin(PTable<K, U> left, PTable<K, V> right) { - return join(left, right, new RightOuterJoinFn<K, U, V>()); + return join(left, right, new RightOuterJoinFn<K, U, V>(left.getValueType())); } /** @@ -106,7 +106,7 @@ public class Join { * @return The joined result. 
*/ public static <K, U, V> PTable<K, Pair<U, V>> fullJoin(PTable<K, U> left, PTable<K, V> right) { - return join(left, right, new FullOuterJoinFn<K, U, V>()); + return join(left, right, new FullOuterJoinFn<K, U, V>(left.getValueType())); } public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/main/java/com/cloudera/crunch/lib/join/FullOuterJoinFn.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/lib/join/FullOuterJoinFn.java b/src/main/java/com/cloudera/crunch/lib/join/FullOuterJoinFn.java index 80728a7..3d8888e 100644 --- a/src/main/java/com/cloudera/crunch/lib/join/FullOuterJoinFn.java +++ b/src/main/java/com/cloudera/crunch/lib/join/FullOuterJoinFn.java @@ -18,6 +18,7 @@ import java.util.List; import com.cloudera.crunch.Emitter; import com.cloudera.crunch.Pair; +import com.cloudera.crunch.types.PType; import com.google.common.collect.Lists; /** @@ -33,6 +34,10 @@ public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V> { private transient K lastKey; private transient List<U> leftValues; + public FullOuterJoinFn(PType<U> leftValueType) { + super(leftValueType); + } + /** {@inheritDoc} */ @Override public void initialize() { @@ -58,7 +63,7 @@ public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V> { if (id == 0) { for (Pair<U, V> pair : pairs) { if (pair.first() != null) - leftValues.add(pair.first()); + leftValues.add(leftValueType.getDetachedValue(pair.first())); } } else { for (Pair<U, V> pair : pairs) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/main/java/com/cloudera/crunch/lib/join/InnerJoinFn.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/lib/join/InnerJoinFn.java b/src/main/java/com/cloudera/crunch/lib/join/InnerJoinFn.java index e5013e5..0d7230a 100644 --- 
a/src/main/java/com/cloudera/crunch/lib/join/InnerJoinFn.java +++ b/src/main/java/com/cloudera/crunch/lib/join/InnerJoinFn.java @@ -18,6 +18,7 @@ import java.util.List; import com.cloudera.crunch.Emitter; import com.cloudera.crunch.Pair; +import com.cloudera.crunch.types.PType; import com.google.common.collect.Lists; /** @@ -28,15 +29,19 @@ import com.google.common.collect.Lists; * @param <V> Type of the second {@link com.cloudera.crunch.PTable}'s values */ public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> { - + private transient K lastKey; - private transient List<U> LeftValues; + private transient List<U> leftValues; + + public InnerJoinFn(PType<U> leftValueType) { + super(leftValueType); + } /** {@inheritDoc} */ @Override public void initialize() { lastKey = null; - this.LeftValues = Lists.newArrayList(); + this.leftValues = Lists.newArrayList(); } /** {@inheritDoc} */ @@ -45,16 +50,16 @@ public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> { Emitter<Pair<K, Pair<U, V>>> emitter) { if (!key.equals(lastKey)) { lastKey = key; - LeftValues.clear(); + leftValues.clear(); } if (id == 0) { // from left for (Pair<U, V> pair : pairs) { if (pair.first() != null) - LeftValues.add(pair.first()); + leftValues.add(leftValueType.getDetachedValue(pair.first())); } } else { // from right for (Pair<U, V> pair : pairs) { - for (U u : LeftValues) { + for (U u : leftValues) { emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second()))); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/main/java/com/cloudera/crunch/lib/join/JoinFn.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/lib/join/JoinFn.java b/src/main/java/com/cloudera/crunch/lib/join/JoinFn.java index ee3d293..d1305d4 100644 --- a/src/main/java/com/cloudera/crunch/lib/join/JoinFn.java +++ b/src/main/java/com/cloudera/crunch/lib/join/JoinFn.java @@ -17,6 +17,7 @@ package com.cloudera.crunch.lib.join; import 
com.cloudera.crunch.DoFn; import com.cloudera.crunch.Emitter; import com.cloudera.crunch.Pair; +import com.cloudera.crunch.types.PType; /** * Represents a {@link com.cloudera.crunch.DoFn} for performing joins. @@ -28,6 +29,19 @@ import com.cloudera.crunch.Pair; public abstract class JoinFn<K, U, V> extends DoFn<Pair<Pair<K, Integer>, Iterable<Pair<U, V>>>, Pair<K, Pair<U, V>>> { + protected PType<U> leftValueType; + + /** + * Instantiate with the PType of the value of the left side of the join (used + * for creating deep copies of values). + * + * @param leftValueType + * The PType of the value type of the left side of the join + */ + public JoinFn(PType<U> leftValueType) { + this.leftValueType = leftValueType; + } + /** @return The name of this join type (e.g. innerJoin, leftOuterJoin). */ public abstract String getJoinType(); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/main/java/com/cloudera/crunch/lib/join/LeftOuterJoinFn.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/lib/join/LeftOuterJoinFn.java b/src/main/java/com/cloudera/crunch/lib/join/LeftOuterJoinFn.java index 9927964..c121889 100644 --- a/src/main/java/com/cloudera/crunch/lib/join/LeftOuterJoinFn.java +++ b/src/main/java/com/cloudera/crunch/lib/join/LeftOuterJoinFn.java @@ -18,6 +18,7 @@ import java.util.List; import com.cloudera.crunch.Emitter; import com.cloudera.crunch.Pair; +import com.cloudera.crunch.types.PType; import com.google.common.collect.Lists; /** @@ -33,6 +34,10 @@ public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> { private transient K lastKey; private transient List<U> leftValues; + public LeftOuterJoinFn(PType<U> leftValueType) { + super(leftValueType); + } + /** {@inheritDoc} */ @Override public void initialize() { @@ -58,7 +63,7 @@ public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> { if (id == 0) { for (Pair<U, V> pair : pairs) { if (pair.first() != null) - 
leftValues.add(pair.first()); + leftValues.add(leftValueType.getDetachedValue(pair.first())); } } else { for (Pair<U, V> pair : pairs) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/main/java/com/cloudera/crunch/lib/join/RightOuterJoinFn.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/lib/join/RightOuterJoinFn.java b/src/main/java/com/cloudera/crunch/lib/join/RightOuterJoinFn.java index 99c046f..1824f77 100644 --- a/src/main/java/com/cloudera/crunch/lib/join/RightOuterJoinFn.java +++ b/src/main/java/com/cloudera/crunch/lib/join/RightOuterJoinFn.java @@ -18,6 +18,7 @@ import java.util.List; import com.cloudera.crunch.Emitter; import com.cloudera.crunch.Pair; +import com.cloudera.crunch.types.PType; import com.google.common.collect.Lists; /** @@ -28,10 +29,14 @@ import com.google.common.collect.Lists; * @param <V> Type of the second {@link com.cloudera.crunch.PTable}'s values */ public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V> { - + private transient K lastKey; private transient List<U> leftValues; + public RightOuterJoinFn(PType<U> leftValueType) { + super(leftValueType); + } + /** {@inheritDoc} */ @Override public void initialize() { @@ -50,7 +55,7 @@ public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V> { if (id == 0) { for (Pair<U, V> pair : pairs) { if (pair.first() != null) - leftValues.add(pair.first()); + leftValues.add(leftValueType.getDetachedValue(pair.first())); } } else { for (Pair<U, V> pair : pairs) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/test/java/com/cloudera/crunch/lib/join/FullOuterJoinTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/com/cloudera/crunch/lib/join/FullOuterJoinTest.java b/src/test/java/com/cloudera/crunch/lib/join/FullOuterJoinTest.java index ef80cd4..88b8225 100644 --- 
a/src/test/java/com/cloudera/crunch/lib/join/FullOuterJoinTest.java +++ b/src/test/java/com/cloudera/crunch/lib/join/FullOuterJoinTest.java @@ -17,6 +17,7 @@ package com.cloudera.crunch.lib.join; import static org.junit.Assert.assertTrue; import com.cloudera.crunch.Pair; +import com.cloudera.crunch.types.PTypeFamily; public class FullOuterJoinTest extends JoinTester { @Override @@ -41,7 +42,7 @@ public class FullOuterJoinTest extends JoinTester { } @Override - protected JoinFn<String, Long, Long> getJoinFn() { - return new FullOuterJoinFn<String, Long, Long>(); + protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) { + return new FullOuterJoinFn<String, Long, Long>(typeFamily.longs()); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/test/java/com/cloudera/crunch/lib/join/InnerJoinTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/com/cloudera/crunch/lib/join/InnerJoinTest.java b/src/test/java/com/cloudera/crunch/lib/join/InnerJoinTest.java index 4bac0c9..b42e51e 100644 --- a/src/test/java/com/cloudera/crunch/lib/join/InnerJoinTest.java +++ b/src/test/java/com/cloudera/crunch/lib/join/InnerJoinTest.java @@ -17,6 +17,7 @@ package com.cloudera.crunch.lib.join; import static org.junit.Assert.assertTrue; import com.cloudera.crunch.Pair; +import com.cloudera.crunch.types.PTypeFamily; public class InnerJoinTest extends JoinTester { @Override @@ -41,7 +42,7 @@ public class InnerJoinTest extends JoinTester { } @Override - protected JoinFn<String, Long, Long> getJoinFn() { - return new InnerJoinFn<String, Long, Long>(); + protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) { + return new InnerJoinFn<String, Long, Long>(typeFamily.longs()); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java ---------------------------------------------------------------------- diff --git 
a/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java b/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java index 651747d..6208cb4 100644 --- a/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java +++ b/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java @@ -50,7 +50,7 @@ public abstract class JoinTester implements Serializable { PTable<String, Long> ws1 = Aggregate.count(w1.parallelDo("ws1", new WordSplit(), ptf.strings())); PTable<String, Long> ws2 = Aggregate.count(w2.parallelDo("ws2", new WordSplit(), ptf.strings())); - PTable<String, Pair<Long, Long>> join = Join.join(ws1, ws2, getJoinFn()); + PTable<String, Pair<Long, Long>> join = Join.join(ws1, ws2, getJoinFn(ptf)); PTable<String, Long> sums = join.parallelDo("cnt", new DoFn<Pair<String, Pair<Long, Long>>, Pair<String, Long>>() { @@ -100,5 +100,5 @@ public abstract class JoinTester implements Serializable { /** * @return The JoinFn to use. */ - protected abstract JoinFn<String, Long, Long> getJoinFn(); + protected abstract JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/test/java/com/cloudera/crunch/lib/join/LeftOuterJoinTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/com/cloudera/crunch/lib/join/LeftOuterJoinTest.java b/src/test/java/com/cloudera/crunch/lib/join/LeftOuterJoinTest.java index 9517f70..0ad4490 100644 --- a/src/test/java/com/cloudera/crunch/lib/join/LeftOuterJoinTest.java +++ b/src/test/java/com/cloudera/crunch/lib/join/LeftOuterJoinTest.java @@ -17,6 +17,7 @@ package com.cloudera.crunch.lib.join; import static org.junit.Assert.assertTrue; import com.cloudera.crunch.Pair; +import com.cloudera.crunch.types.PTypeFamily; public class LeftOuterJoinTest extends JoinTester { @Override @@ -41,7 +42,7 @@ public class LeftOuterJoinTest extends JoinTester { } @Override - protected JoinFn<String, Long, Long> getJoinFn() { - return 
new LeftOuterJoinFn<String, Long, Long>(); + protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) { + return new LeftOuterJoinFn<String, Long, Long>(typeFamily.longs()); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3a311154/src/test/java/com/cloudera/crunch/lib/join/RightOuterJoinTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/com/cloudera/crunch/lib/join/RightOuterJoinTest.java b/src/test/java/com/cloudera/crunch/lib/join/RightOuterJoinTest.java index 54467e8..1daed57 100644 --- a/src/test/java/com/cloudera/crunch/lib/join/RightOuterJoinTest.java +++ b/src/test/java/com/cloudera/crunch/lib/join/RightOuterJoinTest.java @@ -17,6 +17,7 @@ package com.cloudera.crunch.lib.join; import static org.junit.Assert.assertTrue; import com.cloudera.crunch.Pair; +import com.cloudera.crunch.types.PTypeFamily; public class RightOuterJoinTest extends JoinTester { @Override @@ -41,7 +42,7 @@ public class RightOuterJoinTest extends JoinTester { } @Override - protected JoinFn<String, Long, Long> getJoinFn() { - return new RightOuterJoinFn<String, Long, Long>(); + protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) { + return new RightOuterJoinFn<String, Long, Long>(typeFamily.longs()); } }
