Updated Branches: refs/heads/master a69bda8f5 -> 7397d98a5
Add Ptype#getDetachedValue Add getDetachedValue to PType to allow creating deep copies of values in reducer-based DoFns. A side-effect of this is that PType now extends Serializable. Also fixes the bug in Aggregate#collectValues that caused the same value to be collected multiple times in the case of custom Writables or AvroTypes. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/7397d98a Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/7397d98a Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/7397d98a Branch: refs/heads/master Commit: 7397d98a59f64df3bf7fbda57e1ddcfdf37b9487 Parents: a69bda8 Author: Gabriel Reid <[email protected]> Authored: Tue Jul 3 19:52:00 2012 +0200 Committer: Gabriel Reid <[email protected]> Committed: Tue Jul 3 22:10:07 2012 +0200 ---------------------------------------------------------------------- .../crunch/impl/mr/run/CrunchRuntimeException.java | 4 + .../java/com/cloudera/crunch/lib/Aggregate.java | 9 +- src/main/java/com/cloudera/crunch/lib/PTables.java | 55 ++++++- src/main/java/com/cloudera/crunch/types/PType.java | 36 +++- .../cloudera/crunch/types/avro/AvroDeepCopier.java | 134 +++++++++++++++ .../crunch/types/avro/AvroGroupedTableType.java | 6 + .../cloudera/crunch/types/avro/AvroTableType.java | 6 + .../com/cloudera/crunch/types/avro/AvroType.java | 28 +++- .../java/com/cloudera/crunch/types/avro/Avros.java | 4 + .../types/writable/WritableGroupedTableType.java | 6 + .../crunch/types/writable/WritableTableType.java | 6 + .../crunch/types/writable/WritableType.java | 17 ++- .../cloudera/crunch/types/writable/Writables.java | 40 ++++- .../com/cloudera/crunch/lib/AggregateTest.java | 104 +++++++++++ .../crunch/types/avro/AvroDeepCopierTest.java | 58 +++++++ .../types/avro/AvroGroupedTableTypeTest.java | 41 +++++ .../crunch/types/avro/AvroTableTypeTest.java | 35 ++++ 
.../cloudera/crunch/types/avro/AvroTypeTest.java | 52 ++++++ .../com/cloudera/crunch/types/avro/AvrosTest.java | 20 ++- .../writable/WritableGroupedTableTypeTest.java | 37 ++++ .../types/writable/WritableTableTypeTest.java | 30 ++++ .../crunch/types/writable/WritableTypeTest.java | 29 +++ .../crunch/types/writable/WritablesTest.java | 13 ++- 23 files changed, 744 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java index d41a52e..68ef054 100644 --- a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java +++ b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java @@ -12,6 +12,10 @@ public class CrunchRuntimeException extends RuntimeException { super(e); } + public CrunchRuntimeException(String msg, Exception e) { + super(msg, e); + } + public boolean wasLogged() { return logged; } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/lib/Aggregate.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/lib/Aggregate.java b/src/main/java/com/cloudera/crunch/lib/Aggregate.java index 1d8ebc5..4a2ff25 100644 --- a/src/main/java/com/cloudera/crunch/lib/Aggregate.java +++ b/src/main/java/com/cloudera/crunch/lib/Aggregate.java @@ -224,9 +224,14 @@ public class Aggregate { public static <K, V> PTable<K, Collection<V>> collectValues(PTable<K, V> collect) { PTypeFamily tf = collect.getTypeFamily(); + final PType<V> valueType = collect.getValueType(); return collect.groupByKey().parallelDo("collect", new 
MapValuesFn<K, Iterable<V>, Collection<V>>() { - public Collection<V> map(Iterable<V> v) { - return Lists.newArrayList(v); + public Collection<V> map(Iterable<V> values) { + List<V> collected = Lists.newArrayList(); + for (V value : values) { + collected.add(valueType.getDetachedValue(value)); + } + return collected; } }, tf.tableOf(collect.getKeyType(), tf.collections(collect.getValueType()))); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/lib/PTables.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/lib/PTables.java b/src/main/java/com/cloudera/crunch/lib/PTables.java index 0c0f246..a68e5be 100644 --- a/src/main/java/com/cloudera/crunch/lib/PTables.java +++ b/src/main/java/com/cloudera/crunch/lib/PTables.java @@ -14,15 +14,23 @@ */ package com.cloudera.crunch.lib; +import java.util.List; + +import org.apache.hadoop.thirdparty.guava.common.collect.Lists; + import com.cloudera.crunch.DoFn; import com.cloudera.crunch.Emitter; import com.cloudera.crunch.PCollection; +import com.cloudera.crunch.PGroupedTable; import com.cloudera.crunch.PTable; import com.cloudera.crunch.Pair; +import com.cloudera.crunch.types.PGroupedTableType; +import com.cloudera.crunch.types.PTableType; +import com.cloudera.crunch.types.PType; /** * Methods for performing common operations on PTables. - * + * */ public class PTables { @@ -31,16 +39,55 @@ public class PTables { @Override public void process(Pair<K, V> input, Emitter<K> emitter) { emitter.emit(input.first()); - } + } }, ptable.getKeyType()); } - + public static <K, V> PCollection<V> values(PTable<K, V> ptable) { return ptable.parallelDo("PTables.values", new DoFn<Pair<K, V>, V>() { @Override public void process(Pair<K, V> input, Emitter<V> emitter) { emitter.emit(input.second()); - } + } }, ptable.getValueType()); } + + /** + * Create a detached value for a table {@link Pair}. 
+ * + * @param tableType + * The table type + * @param value + * The value from which a detached value is to be created + * @return The detached value + * @see PType#getDetachedValue(Object) + */ + public static <K, V> Pair<K, V> getDetachedValue(PTableType<K, V> tableType, Pair<K, V> value) { + return Pair.of(tableType.getKeyType().getDetachedValue(value.first()), tableType.getValueType() + .getDetachedValue(value.second())); + } + + /** + * Create a detached value for a {@link PGroupedTable} value. + * + * + * @param groupedTableType + * The grouped table type + * @param value + * The value from which a detached value is to be created + * @return The detached value + * @see PType#getDetachedValue(Object) + */ + public static <K, V> Pair<K, Iterable<V>> getGroupedDetachedValue( + PGroupedTableType<K, V> groupedTableType, Pair<K, Iterable<V>> value) { + + PTableType<K, V> tableType = groupedTableType.getTableType(); + List<V> detachedIterable = Lists.newArrayList(); + PType<V> valueType = tableType.getValueType(); + for (V v : value.second()) { + detachedIterable.add(valueType.getDetachedValue(v)); + } + return Pair.of(tableType.getKeyType().getDetachedValue(value.first()), + (Iterable<V>) detachedIterable); + } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/PType.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/types/PType.java b/src/main/java/com/cloudera/crunch/types/PType.java index af4ef1b..d6730a3 100644 --- a/src/main/java/com/cloudera/crunch/types/PType.java +++ b/src/main/java/com/cloudera/crunch/types/PType.java @@ -15,10 +15,12 @@ package com.cloudera.crunch.types; +import java.io.Serializable; import java.util.List; import org.apache.hadoop.fs.Path; +import com.cloudera.crunch.DoFn; import com.cloudera.crunch.MapFn; import com.cloudera.crunch.PCollection; import com.cloudera.crunch.SourceTarget; @@ -29,34 +31,50 @@ 
import com.cloudera.crunch.SourceTarget; * read/write data from/to HDFS. Every {@link PCollection} has an associated * {@code PType} that tells Crunch how to read/write data from that * {@code PCollection}. - * + * */ -public interface PType<T> { +public interface PType<T> extends Serializable { /** * Returns the Java type represented by this {@code PType}. */ Class<T> getTypeClass(); - + /** * Returns the {@code PTypeFamily} that this {@code PType} belongs to. */ PTypeFamily getFamily(); MapFn<Object, T> getInputMapFn(); - + MapFn<T, Object> getOutputMapFn(); - + Converter getConverter(); - + + /** + * Returns a copy of a value (or the value itself) that can safely be + * retained. + * <p> + * This is useful when iterable values being processed in a DoFn (via a + * reducer) need to be held on to for more than the scope of a single + * iteration, as a reducer (and therefore also a DoFn that has an Iterable as + * input) re-uses deserialized values. More information on object reuse is + * available in the {@link DoFn} class documentation. + * + * @param value + * The value to be deep-copied + * @return A deep copy of the input value + */ + T getDetachedValue(T value); + /** * Returns a {@code SourceTarget} that is able to read/write data using the * serialization format specified by this {@code PType}. */ SourceTarget<T> getDefaultFileSource(Path path); - + /** - * Returns the sub-types that make up this PType if it is a composite instance, - * such as a tuple. + * Returns the sub-types that make up this PType if it is a composite + * instance, such as a tuple. 
*/ List<PType> getSubTypes(); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/avro/AvroDeepCopier.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/types/avro/AvroDeepCopier.java b/src/main/java/com/cloudera/crunch/types/avro/AvroDeepCopier.java new file mode 100644 index 0000000..86f1edb --- /dev/null +++ b/src/main/java/com/cloudera/crunch/types/avro/AvroDeepCopier.java @@ -0,0 +1,134 @@ +package com.cloudera.crunch.types.avro; + +import java.io.ByteArrayOutputStream; +import java.io.Serializable; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; + +import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException; + +/** + * Performs deep copies of Avro-serializable objects. + * <p> + * <b>Warning:</b> Methods in this class are not thread-safe. This shouldn't be + * a problem when running in a map-reduce context where each mapper/reducer is + * running in its own JVM, but it may well be a problem in any other kind of + * multi-threaded context. 
+ */ +public abstract class AvroDeepCopier<T> implements Serializable { + + private BinaryEncoder binaryEncoder; + private BinaryDecoder binaryDecoder; + protected DatumWriter<T> datumWriter; + protected DatumReader<T> datumReader; + + protected AvroDeepCopier(DatumWriter<T> datumWriter, DatumReader<T> datumReader) { + this.datumWriter = datumWriter; + this.datumReader = datumReader; + } + + protected abstract T createCopyTarget(); + + /** + * Deep copier for Avro specific data objects. + */ + public static class AvroSpecificDeepCopier<T> extends AvroDeepCopier<T> { + + private Class<T> valueClass; + + public AvroSpecificDeepCopier(Class<T> valueClass, Schema schema) { + super(new SpecificDatumWriter<T>(schema), new SpecificDatumReader(schema)); + this.valueClass = valueClass; + } + + @Override + protected T createCopyTarget() { + return createNewInstance(valueClass); + } + + } + + /** + * Deep copier for Avro generic data objects. + */ + public static class AvroGenericDeepCopier extends AvroDeepCopier<Record> { + + private Schema schema; + + public AvroGenericDeepCopier(Schema schema) { + super(new GenericDatumWriter<Record>(schema), new GenericDatumReader<Record>(schema)); + this.schema = schema; + } + + @Override + protected Record createCopyTarget() { + return new GenericData.Record(schema); + } + } + + /** + * Deep copier for Avro reflect data objects. + */ + public static class AvroReflectDeepCopier<T> extends AvroDeepCopier<T> { + private Class<T> valueClass; + + public AvroReflectDeepCopier(Class<T> valueClass, Schema schema) { + super(new ReflectDatumWriter<T>(schema), new ReflectDatumReader<T>(schema)); + this.valueClass = valueClass; + } + + @Override + protected T createCopyTarget() { + return createNewInstance(valueClass); + } + } + + /** + * Create a deep copy of an Avro value. 
+ * + * @param source + * The value to be copied + * @return The deep copy of the value + */ + public T deepCopy(T source) { + ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream(); + binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder); + T target = createCopyTarget(); + try { + datumWriter.write(source, binaryEncoder); + binaryEncoder.flush(); + binaryDecoder = DecoderFactory.get() + .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder); + datumReader.read(target, binaryDecoder); + } catch (Exception e) { + throw new CrunchRuntimeException("Error while deep copying avro value " + source, e); + } + + return target; + } + + protected T createNewInstance(Class<T> targetClass) { + try { + return targetClass.newInstance(); + } catch (InstantiationException e) { + throw new CrunchRuntimeException(e); + } catch (IllegalAccessException e) { + throw new CrunchRuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/avro/AvroGroupedTableType.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/types/avro/AvroGroupedTableType.java b/src/main/java/com/cloudera/crunch/types/avro/AvroGroupedTableType.java index 091948e..0d53eaf 100644 --- a/src/main/java/com/cloudera/crunch/types/avro/AvroGroupedTableType.java +++ b/src/main/java/com/cloudera/crunch/types/avro/AvroGroupedTableType.java @@ -27,6 +27,7 @@ import com.cloudera.crunch.GroupingOptions; import com.cloudera.crunch.MapFn; import com.cloudera.crunch.Pair; import com.cloudera.crunch.fn.PairMapFn; +import com.cloudera.crunch.lib.PTables; import com.cloudera.crunch.types.Converter; import com.cloudera.crunch.types.PGroupedTableType; @@ -71,6 +72,11 @@ public class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> { } @Override + public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) { + return 
PTables.getGroupedDetachedValue(this, value); + } + + @Override public void configureShuffle(Job job, GroupingOptions options) { AvroTableType<K, V> att = (AvroTableType<K, V>) tableType; String schemaJson = att.getSchema().toString(); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java b/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java index 8d71b7f..b31ecec 100644 --- a/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java +++ b/src/main/java/com/cloudera/crunch/types/avro/AvroTableType.java @@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration; import com.cloudera.crunch.MapFn; import com.cloudera.crunch.Pair; +import com.cloudera.crunch.lib.PTables; import com.cloudera.crunch.types.PGroupedTableType; import com.cloudera.crunch.types.PTableType; import com.cloudera.crunch.types.PType; @@ -155,4 +156,9 @@ public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements public PGroupedTableType<K, V> getGroupedTableType() { return new AvroGroupedTableType<K, V>(this); } + + @Override + public Pair<K, V> getDetachedValue(Pair<K, V> value) { + return PTables.getDetachedValue(this, value); + } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/avro/AvroType.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/types/avro/AvroType.java b/src/main/java/com/cloudera/crunch/types/avro/AvroType.java index 3db00c0..0489fb6 100644 --- a/src/main/java/com/cloudera/crunch/types/avro/AvroType.java +++ b/src/main/java/com/cloudera/crunch/types/avro/AvroType.java @@ -41,10 +41,12 @@ public class AvroType<T> implements PType<T> { private static final Converter AVRO_CONVERTER = new 
AvroKeyConverter(); private final Class<T> typeClass; - private final Schema schema; + private final String schemaString; + private transient Schema schema; private final MapFn baseInputMapFn; private final MapFn baseOutputMapFn; private final List<PType> subTypes; + private AvroDeepCopier<T> deepCopier; public AvroType(Class<T> typeClass, Schema schema, PType... ptypes) { this(typeClass, schema, IdentityFn.getInstance(), IdentityFn @@ -55,6 +57,7 @@ public class AvroType<T> implements PType<T> { MapFn outputMapFn, PType... ptypes) { this.typeClass = typeClass; this.schema = Preconditions.checkNotNull(schema); + this.schemaString = schema.toString(); this.baseInputMapFn = inputMapFn; this.baseOutputMapFn = outputMapFn; this.subTypes = ImmutableList.<PType> builder().add(ptypes).build(); @@ -76,6 +79,9 @@ public class AvroType<T> implements PType<T> { } public Schema getSchema() { + if (schema == null) { + schema = new Schema.Parser().parse(schemaString); + } return schema; } @@ -123,6 +129,26 @@ public class AvroType<T> implements PType<T> { return new AvroFileSourceTarget<T>(path, this); } + private AvroDeepCopier<T> getDeepCopier() { + if (deepCopier == null) { + if (isSpecific()) { + deepCopier = new AvroDeepCopier.AvroSpecificDeepCopier<T>(typeClass, getSchema()); + } else if (isGeneric()) { + deepCopier = (AvroDeepCopier<T>) new AvroDeepCopier.AvroGenericDeepCopier(getSchema()); + } else { + deepCopier = new AvroDeepCopier.AvroReflectDeepCopier<T>(typeClass, getSchema()); + } + } + return deepCopier; + } + + public T getDetachedValue(T value) { + if (this.baseInputMapFn instanceof IdentityFn && !Avros.isPrimitive(this)) { + return getDeepCopier().deepCopy(value); + } + return value; + } + @Override public boolean equals(Object other) { if (other == null || !(other instanceof AvroType)) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/avro/Avros.java 
---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/types/avro/Avros.java b/src/main/java/com/cloudera/crunch/types/avro/Avros.java index 59237fc..7b85145 100644 --- a/src/main/java/com/cloudera/crunch/types/avro/Avros.java +++ b/src/main/java/com/cloudera/crunch/types/avro/Avros.java @@ -142,6 +142,10 @@ public class Avros { return (PType<T>) PRIMITIVES.get(clazz); } + static <T> boolean isPrimitive(AvroType<T> avroType) { + return PRIMITIVES.containsKey(avroType.getTypeClass()); + } + private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) { return new AvroType<T>(clazz, Schema.create(schemaType)); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/writable/WritableGroupedTableType.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/types/writable/WritableGroupedTableType.java b/src/main/java/com/cloudera/crunch/types/writable/WritableGroupedTableType.java index 87f222d..a93b7c2 100644 --- a/src/main/java/com/cloudera/crunch/types/writable/WritableGroupedTableType.java +++ b/src/main/java/com/cloudera/crunch/types/writable/WritableGroupedTableType.java @@ -19,6 +19,7 @@ import org.apache.hadoop.mapreduce.Job; import com.cloudera.crunch.GroupingOptions; import com.cloudera.crunch.MapFn; import com.cloudera.crunch.Pair; +import com.cloudera.crunch.lib.PTables; import com.cloudera.crunch.types.Converter; import com.cloudera.crunch.types.PGroupedTableType; @@ -60,6 +61,11 @@ public class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> { } @Override + public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) { + return PTables.getGroupedDetachedValue(this, value); + } + + @Override public void configureShuffle(Job job, GroupingOptions options) { if (options != null) { options.configure(job); 
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/writable/WritableTableType.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/types/writable/WritableTableType.java b/src/main/java/com/cloudera/crunch/types/writable/WritableTableType.java index 376af48..0400e0c 100644 --- a/src/main/java/com/cloudera/crunch/types/writable/WritableTableType.java +++ b/src/main/java/com/cloudera/crunch/types/writable/WritableTableType.java @@ -25,6 +25,7 @@ import com.cloudera.crunch.Pair; import com.cloudera.crunch.SourceTarget; import com.cloudera.crunch.fn.PairMapFn; import com.cloudera.crunch.io.seq.SeqFileTableSourceTarget; +import com.cloudera.crunch.lib.PTables; import com.cloudera.crunch.types.Converter; import com.cloudera.crunch.types.PGroupedTableType; import com.cloudera.crunch.types.PTableType; @@ -101,6 +102,11 @@ class WritableTableType<K, V> implements PTableType<K, V> { } @Override + public Pair<K, V> getDetachedValue(Pair<K, V> value) { + return PTables.getDetachedValue(this, value); + } + + @Override public boolean equals(Object obj) { if (obj == null || !(obj instanceof WritableTableType)) { return false; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/writable/WritableType.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/types/writable/WritableType.java b/src/main/java/com/cloudera/crunch/types/writable/WritableType.java index 8031e90..ff9b441 100644 --- a/src/main/java/com/cloudera/crunch/types/writable/WritableType.java +++ b/src/main/java/com/cloudera/crunch/types/writable/WritableType.java @@ -18,16 +18,18 @@ import java.util.List; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; import com.cloudera.crunch.MapFn; 
import com.cloudera.crunch.SourceTarget; +import com.cloudera.crunch.fn.IdentityFn; import com.cloudera.crunch.io.seq.SeqFileSourceTarget; import com.cloudera.crunch.types.Converter; import com.cloudera.crunch.types.PType; import com.cloudera.crunch.types.PTypeFamily; import com.google.common.collect.ImmutableList; -public class WritableType<T, W> implements PType<T> { +public class WritableType<T, W extends Writable> implements PType<T> { private final Class<T> typeClass; private final Class<W> writableClass; @@ -95,6 +97,19 @@ public class WritableType<T, W> implements PType<T> { subTypes.equals(wt.subTypes)); } + // Unchecked warnings are suppressed because we know that W and T are the same + // type (due to the IdentityFn being used) + @SuppressWarnings("unchecked") + @Override + public T getDetachedValue(T value) { + if (this.inputFn.getClass().equals(IdentityFn.class)) { + W writableValue = (W) value; + return (T) Writables.deepCopy(writableValue, this.writableClass); + } else { + return value; + } + } + @Override public int hashCode() { HashCodeBuilder hcb = new HashCodeBuilder(); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/main/java/com/cloudera/crunch/types/writable/Writables.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/types/writable/Writables.java b/src/main/java/com/cloudera/crunch/types/writable/Writables.java index bb07833..2262946 100644 --- a/src/main/java/com/cloudera/crunch/types/writable/Writables.java +++ b/src/main/java/com/cloudera/crunch/types/writable/Writables.java @@ -14,6 +14,11 @@ */ package com.cloudera.crunch.types.writable; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; @@ -39,6 +44,7 @@ import com.cloudera.crunch.Tuple4; import 
com.cloudera.crunch.TupleN; import com.cloudera.crunch.fn.CompositeMapFn; import com.cloudera.crunch.fn.IdentityFn; +import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException; import com.cloudera.crunch.types.PType; import com.cloudera.crunch.types.TupleFactory; import com.cloudera.crunch.util.PTypes; @@ -207,7 +213,7 @@ public class Writables { return (PType<T>) PRIMITIVES.get(clazz); } - public static <T> void register(Class<T> clazz, WritableType<T, ?> ptype) { + public static <T> void register(Class<T> clazz, WritableType<T, ? extends Writable> ptype) { EXTENSIONS.put(clazz, ptype); } @@ -243,11 +249,11 @@ public class Writables { return bytes; } - public static final <T> WritableType<T, T> records(Class<T> clazz) { + public static final <T, W extends Writable> WritableType<T, W> records(Class<T> clazz) { if (EXTENSIONS.containsKey(clazz)) { - return (WritableType<T, T>) EXTENSIONS.get(clazz); + return (WritableType<T, W>) EXTENSIONS.get(clazz); } - return (WritableType<T, T>) writables(clazz.asSubclass(Writable.class)); + return (WritableType<T, W>) writables(clazz.asSubclass(Writable.class)); } public static <W extends Writable> WritableType<W, W> writables(Class<W> clazz) { @@ -593,6 +599,32 @@ public class Writables { return PTypes.jsonString(clazz, WritableTypeFamily.getInstance()); } + /** + * Perform a deep copy of a writable value. 
+ * + * @param value + * The value to be copied + * @param writableClass + * The Writable class of the value to be copied + * @return A fully detached deep copy of the input value + */ + public static <T extends Writable> T deepCopy(T value, Class<T> writableClass) { + ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream(); + DataOutputStream dataOut = new DataOutputStream(byteOutStream); + T copiedValue = null; + try { + value.write(dataOut); + dataOut.flush(); + ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray()); + DataInput dataInput = new DataInputStream(byteInStream); + copiedValue = writableClass.newInstance(); + copiedValue.readFields(dataInput); + } catch (Exception e) { + throw new CrunchRuntimeException("Error while deep copying " + value, e); + } + return copiedValue; + } + // Not instantiable private Writables() { } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/lib/AggregateTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/com/cloudera/crunch/lib/AggregateTest.java b/src/test/java/com/cloudera/crunch/lib/AggregateTest.java index 18b9a68..dc8bf9a 100644 --- a/src/test/java/com/cloudera/crunch/lib/AggregateTest.java +++ b/src/test/java/com/cloudera/crunch/lib/AggregateTest.java @@ -19,8 +19,11 @@ import static com.cloudera.crunch.types.writable.Writables.tableOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.Collection; +import java.util.Map; +import org.apache.hadoop.io.Text; import org.junit.Test; import com.cloudera.crunch.MapFn; @@ -30,14 +33,17 @@ import com.cloudera.crunch.Pair; import com.cloudera.crunch.Pipeline; import com.cloudera.crunch.impl.mem.MemPipeline; import com.cloudera.crunch.impl.mr.MRPipeline; +import com.cloudera.crunch.test.Employee; import com.cloudera.crunch.test.FileHelper; 
import com.cloudera.crunch.types.PTableType; import com.cloudera.crunch.types.PTypeFamily; import com.cloudera.crunch.types.avro.AvroTypeFamily; import com.cloudera.crunch.types.avro.Avros; import com.cloudera.crunch.types.writable.WritableTypeFamily; +import com.cloudera.crunch.types.writable.Writables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; public class AggregateTest { @@ -122,4 +128,102 @@ public class AggregateTest { PTable<String, Integer> bottom2 = Aggregate.top(counts, 2, false); assertEquals(ImmutableList.of(Pair.of("foo", 12), Pair.of("bar", 17)), bottom2.materialize()); } + + @Test + public void testCollectValues_Writables() throws IOException { + Pipeline pipeline = new MRPipeline(AggregateTest.class); + Map<Integer, Collection<Text>> collectionMap = pipeline + .readTextFile(FileHelper.createTempCopyOf("set2.txt")) + .parallelDo(new MapStringToTextPair(), + Writables.tableOf(Writables.ints(), Writables.writables(Text.class)) + ).collectValues().materializeToMap(); + + assertEquals(1, collectionMap.size()); + + assertEquals(Lists.newArrayList(new Text("c"), new Text("d"), new Text("a")), + collectionMap.get(1)); + } + + @Test + public void testCollectValues_Avro() throws IOException { + + MapStringToEmployeePair mapFn = new MapStringToEmployeePair(); + Pipeline pipeline = new MRPipeline(AggregateTest.class); + Map<Integer, Collection<Employee>> collectionMap = pipeline + .readTextFile(FileHelper.createTempCopyOf("set2.txt")) + .parallelDo(mapFn, + Avros.tableOf(Avros.ints(), Avros.records(Employee.class))).collectValues() + .materializeToMap(); + + assertEquals(1, collectionMap.size()); + + Employee empC = mapFn.map("c").second(); + Employee empD = mapFn.map("d").second(); + Employee empA = mapFn.map("a").second(); + + assertEquals(Lists.newArrayList(empC, empD, empA), + collectionMap.get(1)); + } + + private static class MapStringToTextPair extends MapFn<String, 
Pair<Integer, Text>> { + @Override + public Pair<Integer, Text> map(String input) { + return Pair.of(1, new Text(input)); + } + } + + private static class MapStringToEmployeePair extends MapFn<String, Pair<Integer, Employee>> { + @Override + public Pair<Integer, Employee> map(String input) { + Employee emp = new Employee(); + emp.setName(input); + emp.setSalary(0); + emp.setDepartment(""); + return Pair.of(1, emp); + } + } + + public static class PojoText { + private String value; + + public PojoText() { + this(""); + } + + public PojoText(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + @Override + public String toString() { + return String.format("PojoText<%s>", this.value); + } + + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + PojoText other = (PojoText) obj; + if (value == null) { + if (other.value != null) + return false; + } else if (!value.equals(other.value)) + return false; + return true; + } + + } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/types/avro/AvroDeepCopierTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/com/cloudera/crunch/types/avro/AvroDeepCopierTest.java b/src/test/java/com/cloudera/crunch/types/avro/AvroDeepCopierTest.java new file mode 100644 index 0000000..cce37cd --- /dev/null +++ b/src/test/java/com/cloudera/crunch/types/avro/AvroDeepCopierTest.java @@ -0,0 +1,58 @@ +package com.cloudera.crunch.types.avro; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; + +import org.apache.avro.generic.GenericData.Record; +import org.junit.Test; + +import com.cloudera.crunch.test.Person; +import 
com.cloudera.crunch.types.avro.AvroDeepCopier.AvroSpecificDeepCopier; +import com.google.common.collect.Lists; + +public class AvroDeepCopierTest { + + @Test + public void testDeepCopySpecific() { + Person person = new Person(); + person.setName("John Doe"); + person.setAge(42); + person.setSiblingnames(Lists.<CharSequence> newArrayList()); + + Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$) + .deepCopy(person); + + assertEquals(person, deepCopyPerson); + assertNotSame(person, deepCopyPerson); + } + + @Test + public void testDeepCopyGeneric() { + Record record = new Record(Person.SCHEMA$); + record.put("name", "John Doe"); + record.put("age", 42); + record.put("siblingnames", Lists.newArrayList()); + + Record deepCopyRecord = new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$) + .deepCopy(record); + + assertEquals(record, deepCopyRecord); + assertNotSame(record, deepCopyRecord); + } + + @Test + public void testDeepCopyReflect() { + Person person = new Person(); + person.setName("John Doe"); + person.setAge(42); + person.setSiblingnames(Lists.<CharSequence> newArrayList()); + + Person deepCopyPerson = new AvroDeepCopier.AvroReflectDeepCopier<Person>(Person.class, + Person.SCHEMA$).deepCopy(person); + + assertEquals(person, deepCopyPerson); + assertNotSame(person, deepCopyPerson); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/types/avro/AvroGroupedTableTypeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/com/cloudera/crunch/types/avro/AvroGroupedTableTypeTest.java b/src/test/java/com/cloudera/crunch/types/avro/AvroGroupedTableTypeTest.java new file mode 100644 index 0000000..134dd9d --- /dev/null +++ b/src/test/java/com/cloudera/crunch/types/avro/AvroGroupedTableTypeTest.java @@ -0,0 +1,41 @@ +package com.cloudera.crunch.types.avro; + +import static org.junit.Assert.assertEquals; +import static 
org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; + +import java.util.List; + +import org.junit.Test; + +import com.cloudera.crunch.Pair; +import com.cloudera.crunch.test.Person; +import com.cloudera.crunch.types.PGroupedTableType; +import com.google.common.collect.Lists; + +public class AvroGroupedTableTypeTest { + + @Test + public void testGetDetachedValue() { + Integer integerValue = 42; + Person person = new Person(); + person.setName("John Doe"); + person.setAge(42); + person.setSiblingnames(Lists.<CharSequence> newArrayList()); + + Iterable<Person> inputPersonIterable = Lists.newArrayList(person); + Pair<Integer, Iterable<Person>> pair = Pair.of(integerValue, inputPersonIterable); + + PGroupedTableType<Integer, Person> groupedTableType = Avros.tableOf(Avros.ints(), + Avros.reflects(Person.class)).getGroupedTableType(); + + Pair<Integer, Iterable<Person>> detachedPair = groupedTableType.getDetachedValue(pair); + + assertSame(integerValue, detachedPair.first()); + List<Person> personList = Lists.newArrayList(detachedPair.second()); + assertEquals(inputPersonIterable, personList); + assertNotSame(person, personList.get(0)); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/types/avro/AvroTableTypeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/com/cloudera/crunch/types/avro/AvroTableTypeTest.java b/src/test/java/com/cloudera/crunch/types/avro/AvroTableTypeTest.java new file mode 100644 index 0000000..867ee6f --- /dev/null +++ b/src/test/java/com/cloudera/crunch/types/avro/AvroTableTypeTest.java @@ -0,0 +1,35 @@ +package com.cloudera.crunch.types.avro; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; + +import org.junit.Test; + +import com.cloudera.crunch.Pair; +import com.cloudera.crunch.test.Person; +import 
com.google.common.collect.Lists; + +public class AvroTableTypeTest { + + @Test + public void testGetDetachedValue() { + Integer integerValue = 42; + Person person = new Person(); + person.setName("John Doe"); + person.setAge(42); + person.setSiblingnames(Lists.<CharSequence> newArrayList()); + + Pair<Integer, Person> pair = Pair.of(integerValue, person); + + AvroTableType<Integer, Person> tableType = Avros.tableOf(Avros.ints(), + Avros.reflects(Person.class)); + + Pair<Integer, Person> detachedPair = tableType.getDetachedValue(pair); + + assertSame(integerValue, detachedPair.first()); + assertEquals(person, detachedPair.second()); + assertNotSame(person, detachedPair.second()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java b/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java index d2c2ab9..49760f4 100644 --- a/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java +++ b/src/test/java/com/cloudera/crunch/types/avro/AvroTypeTest.java @@ -1,8 +1,14 @@ package com.cloudera.crunch.types.avro; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Record; +import org.apache.hadoop.thirdparty.guava.common.collect.Lists; import org.junit.Test; import com.cloudera.crunch.test.Person; @@ -63,4 +69,50 @@ public class AvroTypeTest { Avros.generics(Person.SCHEMA$)).isGeneric()); } + @Test + public void testGetDetachedValue_AlreadyMappedAvroType() { + Integer value = 42; + Integer detachedValue = Avros.ints().getDetachedValue(value); + assertSame(value, 
detachedValue); + } + + @Test + public void testGetDetachedValue_GenericAvroType() { + AvroType<Record> genericType = Avros.generics(Person.SCHEMA$); + GenericData.Record record = new GenericData.Record(Person.SCHEMA$); + record.put("name", "name value"); + record.put("age", 42); + record.put("siblingnames", Lists.newArrayList()); + + Record detachedRecord = genericType.getDetachedValue(record); + assertEquals(record, detachedRecord); + assertNotSame(record, detachedRecord); + } + + @Test + public void testGetDetachedValue_SpecificAvroType() { + AvroType<Person> specificType = Avros.records(Person.class); + Person person = new Person(); + person.setName("name value"); + person.setAge(42); + person.setSiblingnames(Lists.<CharSequence> newArrayList()); + + Person detachedPerson = specificType.getDetachedValue(person); + assertEquals(person, detachedPerson); + assertNotSame(person, detachedPerson); + } + + @Test + public void testGetDetachedValue_ReflectAvroType() { + AvroType<Person> reflectType = Avros.reflects(Person.class); + Person person = new Person(); + person.setName("name value"); + person.setAge(42); + person.setSiblingnames(Lists.<CharSequence> newArrayList()); + + Person detachedPerson = reflectType.getDetachedValue(person); + assertEquals(person, detachedPerson); + assertNotSame(person, detachedPerson); + } + } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java b/src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java index c7e9908..8bdd084 100644 --- a/src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java +++ b/src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java @@ -15,7 +15,9 @@ package com.cloudera.crunch.types.avro; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import 
static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import java.nio.ByteBuffer; import java.util.Collection; @@ -31,6 +33,7 @@ import com.cloudera.crunch.Pair; import com.cloudera.crunch.Tuple3; import com.cloudera.crunch.Tuple4; import com.cloudera.crunch.TupleN; +import com.cloudera.crunch.test.Person; import com.cloudera.crunch.types.PTableType; import com.cloudera.crunch.types.PType; import com.google.common.collect.ImmutableList; @@ -104,9 +107,9 @@ public class AvrosTest { @Test public void testNestedTables() throws Exception { - PTableType<Long, Long> pll = Avros.tableOf(Avros.longs(), Avros.longs()); - String schema = Avros.tableOf(pll, Avros.strings()).getSchema().toString(); - assertNotNull(schema); + PTableType<Long, Long> pll = Avros.tableOf(Avros.longs(), Avros.longs()); + String schema = Avros.tableOf(pll, Avros.strings()).getSchema().toString(); + assertNotNull(schema); } @Test @@ -204,4 +207,15 @@ public class AvrosTest { assertEquals(java, ptype.getInputMapFn().map(avro)); assertEquals(avro, ptype.getOutputMapFn().map(java)); } + + @Test + public void testIsPrimitive_True() { + assertTrue(Avros.isPrimitive(Avros.ints())); + } + + @Test + public void testIsPrimitive_False() { + assertFalse(Avros.isPrimitive(Avros.reflects(Person.class))); + } + } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/types/writable/WritableGroupedTableTypeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/com/cloudera/crunch/types/writable/WritableGroupedTableTypeTest.java b/src/test/java/com/cloudera/crunch/types/writable/WritableGroupedTableTypeTest.java new file mode 100644 index 0000000..2357393 --- /dev/null +++ b/src/test/java/com/cloudera/crunch/types/writable/WritableGroupedTableTypeTest.java @@ -0,0 +1,37 @@ +package com.cloudera.crunch.types.writable; + +import static org.junit.Assert.assertEquals; +import static 
org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; + +import java.util.List; + +import org.apache.hadoop.io.Text; +import org.junit.Test; + +import com.cloudera.crunch.Pair; +import com.cloudera.crunch.types.PGroupedTableType; +import com.google.common.collect.Lists; + +public class WritableGroupedTableTypeTest { + + @Test + public void testGetDetachedValue() { + Integer integerValue = 42; + Text textValue = new Text("forty-two"); + Iterable<Text> inputTextIterable = Lists.newArrayList(textValue); + Pair<Integer, Iterable<Text>> pair = Pair.of(integerValue, inputTextIterable); + + PGroupedTableType<Integer, Text> groupedTableType = Writables.tableOf(Writables.ints(), Writables.writables(Text.class)) + .getGroupedTableType(); + + Pair<Integer, Iterable<Text>> detachedPair = groupedTableType.getDetachedValue(pair); + + assertSame(integerValue, detachedPair.first()); + List<Text> textList = Lists.newArrayList(detachedPair.second()); + assertEquals(inputTextIterable, textList); + assertNotSame(textValue, textList.get(0)); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/types/writable/WritableTableTypeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/com/cloudera/crunch/types/writable/WritableTableTypeTest.java b/src/test/java/com/cloudera/crunch/types/writable/WritableTableTypeTest.java new file mode 100644 index 0000000..96015f6 --- /dev/null +++ b/src/test/java/com/cloudera/crunch/types/writable/WritableTableTypeTest.java @@ -0,0 +1,30 @@ +package com.cloudera.crunch.types.writable; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; + +import org.apache.hadoop.io.Text; +import org.junit.Test; + +import com.cloudera.crunch.Pair; + +public class WritableTableTypeTest { + + @Test + public void testGetDetachedValue() { + Integer 
integerValue = 42; + Text textValue = new Text("forty-two"); + Pair<Integer, Text> pair = Pair.of(integerValue, textValue); + + WritableTableType<Integer, Text> tableType = Writables.tableOf(Writables.ints(), + Writables.writables(Text.class)); + + Pair<Integer, Text> detachedPair = tableType.getDetachedValue(pair); + + assertSame(integerValue, detachedPair.first()); + assertEquals(textValue, detachedPair.second()); + assertNotSame(textValue, detachedPair.second()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/types/writable/WritableTypeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/com/cloudera/crunch/types/writable/WritableTypeTest.java b/src/test/java/com/cloudera/crunch/types/writable/WritableTypeTest.java new file mode 100644 index 0000000..8dff574 --- /dev/null +++ b/src/test/java/com/cloudera/crunch/types/writable/WritableTypeTest.java @@ -0,0 +1,29 @@ +package com.cloudera.crunch.types.writable; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; + +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class WritableTypeTest { + + @Test + public void testGetDetachedValue_AlreadyMappedWritable() { + WritableType<String, Text> stringType = Writables.strings(); + String value = "test"; + assertSame(value, stringType.getDetachedValue(value)); + } + + @Test + public void testGetDetachedValue_CustomWritable() { + WritableType<Text, Text> textWritableType = Writables.writables(Text.class); + Text value = new Text("test"); + + Text detachedValue = textWritableType.getDetachedValue(value); + assertEquals(value, detachedValue); + assertNotSame(value, detachedValue); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7397d98a/src/test/java/com/cloudera/crunch/types/writable/WritablesTest.java 
---------------------------------------------------------------------- diff --git a/src/test/java/com/cloudera/crunch/types/writable/WritablesTest.java b/src/test/java/com/cloudera/crunch/types/writable/WritablesTest.java index ac0b3db..2be2c0b 100644 --- a/src/test/java/com/cloudera/crunch/types/writable/WritablesTest.java +++ b/src/test/java/com/cloudera/crunch/types/writable/WritablesTest.java @@ -16,7 +16,8 @@ package com.cloudera.crunch.types.writable; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; import java.io.DataInput; import java.io.DataOutput; @@ -254,7 +255,7 @@ public class WritablesTest { public void testRegister() throws Exception { WritableType<TestWritable, TestWritable> wt = Writables.writables(TestWritable.class); Writables.register(TestWritable.class, wt); - assertTrue(Writables.records(TestWritable.class) == wt); + assertSame(Writables.records(TestWritable.class), wt); } @SuppressWarnings({"unchecked", "rawtypes"}) @@ -264,4 +265,12 @@ public class WritablesTest { assertEquals(java, ptype.getInputMapFn().map(writable)); assertEquals(writable, ptype.getOutputMapFn().map(java)); } + + @Test + public void testDeepCopy() { + Text text = new Text("Test"); + Text copiedText = Writables.deepCopy(text, Text.class); + assertEquals(text, copiedText); + assertNotSame(text, copiedText); + } }
