Secondary sort integration tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/db0ce8e8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/db0ce8e8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/db0ce8e8 Branch: refs/heads/master Commit: db0ce8e8985c3d0d686898aa4182bdf1c004d7b5 Parents: 521cea2 Author: Josh Wills <[email protected]> Authored: Sun Oct 14 11:40:28 2012 -0700 Committer: Josh Wills <[email protected]> Committed: Sun Oct 21 05:07:55 2012 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/lib/SecondarySortIT.java | 65 +++++++++ crunch/src/it/resources/secondary_sort_input.txt | 7 + .../java/org/apache/crunch/lib/SecondarySort.java | 101 ++++----------- 3 files changed, 98 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/db0ce8e8/crunch/src/it/java/org/apache/crunch/lib/SecondarySortIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/SecondarySortIT.java b/crunch/src/it/java/org/apache/crunch/lib/SecondarySortIT.java new file mode 100644 index 0000000..242f621 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/lib/SecondarySortIT.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import static org.apache.crunch.types.avro.Avros.*; +import static org.junit.Assert.assertEquals; + +import java.io.Serializable; + +import org.apache.crunch.MapFn; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.test.CrunchTestSupport; +import org.junit.Test; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; + + +public class SecondarySortIT extends CrunchTestSupport implements Serializable { + + @Test + public void testSecondarySort() throws Exception { + Pipeline p = new MRPipeline(SecondarySortIT.class, tempDir.getDefaultConfiguration()); + String inputFile = tempDir.copyResourceFileName("secondary_sort_input.txt"); + + PTable<String, Pair<Integer, Integer>> in = p.read(From.textFile(inputFile)) + .parallelDo(new MapFn<String, Pair<String, Pair<Integer, Integer>>>() { + @Override + public Pair<String, Pair<Integer, Integer>> map(String input) { + String[] pieces = input.split(","); + return Pair.of(pieces[0], + Pair.of(Integer.valueOf(pieces[1].trim()), Integer.valueOf(pieces[2].trim()))); + } + }, tableOf(strings(), pairs(ints(), ints()))); + Iterable<String> lines = SecondarySort.sortAndApply(in, new MapFn<Pair<String, Iterable<Pair<Integer, Integer>>>, String>() { + @Override + public String map(Pair<String, Iterable<Pair<Integer, Integer>>> input) { + Joiner j = Joiner.on(','); + return j.join(input.first(), j.join(input.second())); + } + }, strings()).materialize(); + assertEquals(ImmutableList.of("one,[-5,10],[1,1],[2,-3]", "three,[0,-1]", "two,[1,7],[2,6],[4,5]"), + ImmutableList.copyOf(lines)); + p.done(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/db0ce8e8/crunch/src/it/resources/secondary_sort_input.txt ---------------------------------------------------------------------- diff --git a/crunch/src/it/resources/secondary_sort_input.txt b/crunch/src/it/resources/secondary_sort_input.txt new file mode 100644 index 0000000..3c7be93 --- /dev/null +++ b/crunch/src/it/resources/secondary_sort_input.txt @@ -0,0 +1,7 @@ +one,1,1 +one,2,-3 +two,4,5 +two,2,6 +two,1,7,9 +three,0,-1 +one,-5,10 http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/db0ce8e8/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java index 5a826fd..ebf7fb4 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java +++ b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java @@ -24,6 +24,7 @@ import org.apache.crunch.Emitter; import org.apache.crunch.GroupingOptions; import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; +import org.apache.crunch.PGroupedTable; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.lib.join.JoinUtils; @@ -32,40 +33,33 @@ import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; import org.apache.hadoop.conf.Configuration; -import com.google.common.collect.ImmutableList; - /** - * Utilities for performing a secondary sort on a PTable<K, Pair<V1, V2>> instance, i.e., sort on the - * key and then sort the values by V1. + * Utilities for performing a secondary sort on a {@code PTable<K, Pair<V1, V2>>} collection. */ public class SecondarySort { - - public static <K, V1, V2> PTable<K, Collection<Pair<V1, V2>>> sort(PTable<K, Pair<V1, V2>> input) { - PTypeFamily ptf = input.getTypeFamily(); - return sortAndApply(input, new SSUnpackFn<K, V1, V2>(), - ptf.tableOf(input.getKeyType(), ptf.collections(input.getValueType()))); - } + /** + * Perform a secondary sort on the given {@code PTable} instance and then apply a + * {@code DoFn} to the resulting sorted data to yield an output {@code PCollection<T>}. + */ public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype) { - PTypeFamily ptf = input.getTypeFamily(); - PType<Pair<V1, V2>> valueType = input.getValueType(); - PTableType<Pair<K, V1>, Pair<V1, V2>> inter = ptf.tableOf( - ptf.pairs(input.getKeyType(), valueType.getSubTypes().get(0)), - valueType); - PTableType<K, Collection<Pair<V1, V2>>> out = ptf.tableOf(input.getKeyType(), - ptf.collections(input.getValueType())); - return input.parallelDo("SecondarySort.format", new SSFormatFn<K, V1, V2>(), inter) - .groupByKey( - GroupingOptions.builder() - .groupingComparatorClass(JoinUtils.getGroupingComparator(ptf)) - .partitionerClass(JoinUtils.getPartitionerClass(ptf)) - .build()) + return prepare(input) .parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, T>(doFn), ptype); } + /** + * Perform a secondary sort on the given {@code PTable} instance and then apply a + * {@code DoFn} to the resulting sorted data to yield an output {@code PTable<U, V>}. + */ public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input, DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype) { + return prepare(input) + .parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, Pair<U, V>>(doFn), ptype); + } + + private static <K, V1, V2> PGroupedTable<Pair<K, V1>, Pair<V1, V2>> prepare( + PTable<K, Pair<V1, V2>> input) { PTypeFamily ptf = input.getTypeFamily(); PType<Pair<V1, V2>> valueType = input.getValueType(); PTableType<Pair<K, V1>, Pair<V1, V2>> inter = ptf.tableOf( @@ -78,10 +72,16 @@ public class SecondarySort { GroupingOptions.builder() .groupingComparatorClass(JoinUtils.getGroupingComparator(ptf)) .partitionerClass(JoinUtils.getPartitionerClass(ptf)) - .build()) - .parallelDo("SecondarySort.apply", new SSTableWrapFn<K, V1, V2, U, V>(doFn), ptype); + .build()); } + private static class SSFormatFn<K, V1, V2> extends MapFn<Pair<K, Pair<V1, V2>>, Pair<Pair<K, V1>, Pair<V1, V2>>> { + @Override + public Pair<Pair<K, V1>, Pair<V1, V2>> map(Pair<K, Pair<V1, V2>> input) { + return Pair.of(Pair.of(input.first(), input.second().first()), input.second()); + } + } + private static class SSWrapFn<K, V1, V2, T> extends DoFn<Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>>, T> { private final DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> intern; @@ -113,54 +113,5 @@ public class SecondarySort { public void cleanup(Emitter<T> emitter) { intern.cleanup(emitter); } - } - - private static class SSTableWrapFn<K, V1, V2, U, V> extends DoFn<Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>>, Pair<U, V>> { - private final DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> intern; - - public SSTableWrapFn(DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> intern) { - this.intern = intern; - } - - @Override - public void configure(Configuration conf) { - intern.configure(conf); - } - - @Override - public void setConfigurationForTest(Configuration conf) { - intern.setConfigurationForTest(conf); - } - - @Override - public void initialize() { - intern.setContext(getContext()); - } - - @Override - public void process(Pair<Pair<K, V1>, Iterable<Pair<V1, V2>>> input, Emitter<Pair<U, V>> emitter) { - intern.process(Pair.of(input.first().first(), input.second()), emitter); - } - - @Override - public void cleanup(Emitter<Pair<U, V>> emitter) { - intern.cleanup(emitter); - } - } - - private static class SSFormatFn<K, V1, V2> extends MapFn<Pair<K, Pair<V1, V2>>, Pair<Pair<K, V1>, Pair<V1, V2>>> { - @Override - public Pair<Pair<K, V1>, Pair<V1, V2>> map(Pair<K, Pair<V1, V2>> input) { - return Pair.of(Pair.of(input.first(), input.second().first()), input.second()); - } - } - - private static class SSUnpackFn<K, V1, V2> extends - MapFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<K, Collection<Pair<V1, V2>>>> { - @Override - public Pair<K, Collection<Pair<V1, V2>>> map(Pair<K, Iterable<Pair<V1, V2>>> input) { - Collection<Pair<V1, V2>> c = ImmutableList.copyOf(input.second()); - return Pair.of(input.first(), c); - } - } + } }
