Updated Branches: refs/heads/master 0eb69c43b -> 69105d07b
CRUNCH-99: Handle byte[] and ByteBuffer in Avros.writables. Also includes some additional header/config cleanup, and ensures the unit test fails if ByteBuffer and byte array handling do not work correctly. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/69105d07 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/69105d07 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/69105d07 Branch: refs/heads/master Commit: 69105d07b5c5b36d132a3f00e9ac191067db808d Parents: 0eb69c4 Author: Josh Wills <[email protected]> Authored: Fri Oct 19 18:48:51 2012 -0700 Committer: Gabriel Reid <[email protected]> Committed: Wed Oct 24 16:00:35 2012 +0200 ---------------------------------------------------------------------- .../crunch/io/avro/AvroFileSourceTargetIT.java | 1 - .../org/apache/crunch/io/avro/AvroPipelineIT.java | 1 - .../org/apache/crunch/io/avro/AvroWritableIT.java | 89 +++++++++++++++ .../org/apache/crunch/types/PGroupedTableType.java | 11 ++- .../java/org/apache/crunch/types/avro/Avros.java | 11 +- 5 files changed, 104 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/69105d07/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java index 5deacd1..671b920 100644 --- a/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java +++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java @@ -41,7 +41,6 @@ import org.apache.crunch.test.StringWrapper; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import 
org.apache.crunch.types.avro.Avros; -import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/69105d07/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java index e0a7ead..29bf4f5 100644 --- a/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java +++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java @@ -22,7 +22,6 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.Serializable; -import java.util.Collection; import java.util.List; import org.apache.avro.Schema; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/69105d07/crunch/src/it/java/org/apache/crunch/io/avro/AvroWritableIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroWritableIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroWritableIT.java new file mode 100644 index 0000000..cbb7fde --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroWritableIT.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.avro; + +import static org.apache.crunch.types.avro.Avros.ints; +import static org.apache.crunch.types.avro.Avros.tableOf; +import static org.apache.crunch.types.avro.Avros.writables; +import static org.junit.Assert.assertEquals; + +import java.io.Serializable; +import java.util.Map; + +import org.apache.crunch.CombineFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.hadoop.io.DoubleWritable; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.collect.Maps; + +/** + * Verify handling of both a ByteBuffer and byte array as input from an Avro job (depending + * on the version of Avro being used). 
+ */ +public class AvroWritableIT implements Serializable { + + @Rule + public transient TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test + public void testAvroBasedWritablePipeline() throws Exception { + String customersInputPath = tmpDir.copyResourceFileName("customers.txt"); + Pipeline pipeline = new MRPipeline(AvroWritableIT.class, tmpDir.getDefaultConfiguration()); + pipeline.enableDebug(); + PCollection<String> customerLines = pipeline.readTextFile(customersInputPath); + Map<Integer, DoubleWritable> outputMap = customerLines.parallelDo( + new MapFn<String, Pair<Integer, DoubleWritable>>() { + @Override + public Pair<Integer, DoubleWritable> map(String input) { + int len = input.length(); + return Pair.of(len, new DoubleWritable(len)); + } + }, tableOf(ints(), writables(DoubleWritable.class))) + .groupByKey() + .combineValues(new CombineFn<Integer, DoubleWritable>() { + @Override + public void process(Pair<Integer, Iterable<DoubleWritable>> input, + Emitter<Pair<Integer, DoubleWritable>> emitter) { + double sum = 0.0; + for (DoubleWritable dw : input.second()) { + sum += dw.get(); + } + emitter.emit(Pair.of(input.first(), new DoubleWritable(sum))); + } + }) + .materializeToMap(); + + Map<Integer, DoubleWritable> expectedMap = Maps.newHashMap(); + expectedMap.put(17, new DoubleWritable(17.0)); + expectedMap.put(16, new DoubleWritable(16.0)); + expectedMap.put(12, new DoubleWritable(24.0)); + + assertEquals(expectedMap, outputMap); + + pipeline.done(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/69105d07/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java index b4ac1e6..5718619 100644 --- a/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java +++ 
b/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java @@ -25,6 +25,7 @@ import org.apache.crunch.MapFn; import org.apache.crunch.PGroupedTable; import org.apache.crunch.Pair; import org.apache.crunch.SourceTarget; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; @@ -74,9 +75,15 @@ public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable< } @Override + public void configure(Configuration conf) { + keys.configure(conf); + values.configure(conf); + } + + @Override public void initialize() { - keys.initialize(); - values.initialize(); + keys.setContext(getContext()); + values.setContext(getContext()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/69105d07/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java index 655ee55..c8a2ef5 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -234,7 +234,7 @@ public class Avros { return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroReflectDeepCopier<T>(clazz, schema)); } - private static class BytesToWritableMapFn<T extends Writable> extends MapFn<ByteBuffer, T> { + private static class BytesToWritableMapFn<T extends Writable> extends MapFn<Object, T> { private static final Log LOG = LogFactory.getLog(BytesToWritableMapFn.class); private final Class<T> writableClazz; @@ -244,11 +244,12 @@ public class Avros { } @Override - public T map(ByteBuffer input) { - T instance = ReflectionUtils.newInstance(writableClazz, getConfiguration()); + public T map(Object input) { + ByteBuffer byteBuffer = BYTES_IN.map(input); + T instance = ReflectionUtils.newInstance(writableClazz, null); try { - 
instance.readFields(new DataInputStream(new ByteArrayInputStream(input.array(), input.arrayOffset(), input - .limit()))); + instance.readFields(new DataInputStream(new ByteArrayInputStream(byteBuffer.array(), + byteBuffer.arrayOffset(), byteBuffer.limit()))); } catch (IOException e) { LOG.error("Exception thrown reading instance of: " + writableClazz, e); }
