Updated Branches: refs/heads/master 03f3ce800 -> b681953c5
Rename PTableUnionTest to PTableUnionIT, as per convention Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/b681953c Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/b681953c Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/b681953c Branch: refs/heads/master Commit: b681953c56c4178b4d4d8f1d13c6d96953268ad8 Parents: 03f3ce8 Author: Josh Wills <[email protected]> Authored: Wed Jan 30 06:17:18 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Wed Jan 30 06:17:18 2013 -0800 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/PTableUnionIT.java | 99 ++++++++++++++ .../it/java/org/apache/crunch/PTableUnionTest.java | 101 --------------- 2 files changed, 99 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b681953c/crunch/src/it/java/org/apache/crunch/PTableUnionIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/PTableUnionIT.java b/crunch/src/it/java/org/apache/crunch/PTableUnionIT.java new file mode 100644 index 0000000..94c548f --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/PTableUnionIT.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import static org.junit.Assert.assertNotNull; + +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.fn.IdentityFn; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.avro.Avros; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + + + +public class PTableUnionIT { + + public static class FirstLetterKeyFn extends DoFn<String, Pair<String, String>> { + + private static final long serialVersionUID = 5517897875971194220L; + + @Override + public void process(String input, Emitter<Pair<String, String>> emitter) { + if (input.length() > 0) { + emitter.emit(Pair.of(input.substring(0, 1), input)); + } + } + } + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + protected MRPipeline pipeline; + + @Before + public void setUp() { + pipeline = new MRPipeline(this.getClass(), tmpDir.getDefaultConfiguration()); + } + + @After + public void tearDown() { + pipeline.done(); + } + + @Test + public void tableUnionMaterializeNPE() throws Exception { + PCollection<String> words = pipeline.readTextFile(tmpDir.copyResourceFileName("shakes.txt")); + PCollection<String> lorum = pipeline.readTextFile(tmpDir.copyResourceFileName("maugham.txt")); + lorum.materialize(); + + PTable<String, String> wordsByFirstLetter = + words.parallelDo("byFirstLetter", new FirstLetterKeyFn(), Avros.tableOf(Avros.strings(), Avros.strings())); + PTable<String, String> lorumByFirstLetter = + lorum.parallelDo("byFirstLetter", new FirstLetterKeyFn(), Avros.tableOf(Avros.strings(), Avros.strings())); + + @SuppressWarnings("unchecked") + PTable<String, String> union = wordsByFirstLetter.union(lorumByFirstLetter); + + assertNotNull(union.materialize().iterator().next()); + } + + @Test + public void collectionUnionMaterializeNPE() throws Exception { + PCollection<String> words = pipeline.readTextFile(tmpDir.copyResourceFileName("shakes.txt")); + PCollection<String> lorum = pipeline.readTextFile(tmpDir.copyResourceFileName("maugham.txt")); + lorum.materialize(); + + IdentityFn<String> identity = IdentityFn.getInstance(); + words = words.parallelDo(identity, Avros.strings()); + lorum = lorum.parallelDo(identity, Avros.strings()); + + @SuppressWarnings("unchecked") + PCollection<String> union = words.union(lorum); + + union.materialize().iterator(); + + assertNotNull(union.materialize().iterator().next()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b681953c/crunch/src/it/java/org/apache/crunch/PTableUnionTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/PTableUnionTest.java b/crunch/src/it/java/org/apache/crunch/PTableUnionTest.java deleted file mode 100644 index 1d31096..0000000 --- a/crunch/src/it/java/org/apache/crunch/PTableUnionTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch; - -import static org.junit.Assert.assertNotNull; - -import java.io.IOException; - -import org.apache.crunch.PCollection; -import org.apache.crunch.PTable; -import org.apache.crunch.fn.IdentityFn; -import org.apache.crunch.impl.mr.MRPipeline; -import org.apache.crunch.test.TemporaryPath; -import org.apache.crunch.test.TemporaryPaths; -import org.apache.crunch.types.avro.Avros; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - - - -public class PTableUnionTest { - - public static class FirstLetterKeyFn extends DoFn<String, Pair<String, String>> { - - private static final long serialVersionUID = 5517897875971194220L; - - @Override - public void process(String input, Emitter<Pair<String, String>> emitter) { - if (input.length() > 0) { - emitter.emit(Pair.of(input.substring(0, 1), input)); - } - } - } - - @Rule - public TemporaryPath tmpDir = TemporaryPaths.create(); - - protected MRPipeline pipeline; - - @Before - public void setUp() { - pipeline = new MRPipeline(this.getClass(), tmpDir.getDefaultConfiguration()); - } - - @After - public void tearDown() { - pipeline.done(); - } - - @Test - public void tableUnionMaterializeNPE() throws Exception { - PCollection<String> words = pipeline.readTextFile(tmpDir.copyResourceFileName("shakes.txt")); - PCollection<String> lorum = pipeline.readTextFile(tmpDir.copyResourceFileName("maugham.txt")); - lorum.materialize(); - - PTable<String, String> wordsByFirstLetter = - words.parallelDo("byFirstLetter", new FirstLetterKeyFn(), Avros.tableOf(Avros.strings(), Avros.strings())); - PTable<String, String> lorumByFirstLetter = - lorum.parallelDo("byFirstLetter", new FirstLetterKeyFn(), Avros.tableOf(Avros.strings(), Avros.strings())); - - @SuppressWarnings("unchecked") - PTable<String, String> union = wordsByFirstLetter.union(lorumByFirstLetter); - - assertNotNull(union.materialize().iterator().next()); - } - - @Test - public void collectionUnionMaterializeNPE() throws Exception { - PCollection<String> words = pipeline.readTextFile(tmpDir.copyResourceFileName("shakes.txt")); - PCollection<String> lorum = pipeline.readTextFile(tmpDir.copyResourceFileName("maugham.txt")); - lorum.materialize(); - - IdentityFn<String> identity = IdentityFn.getInstance(); - words = words.parallelDo(identity, Avros.strings()); - lorum = lorum.parallelDo(identity, Avros.strings()); - - @SuppressWarnings("unchecked") - PCollection<String> union = words.union(lorum); - - union.materialize().iterator(); - - assertNotNull(union.materialize().iterator().next()); - } -} \ No newline at end of file
