Updated Branches: refs/heads/master e72f9fde7 -> e4e8f9948
CRUNCH-49 - Fix avro materialize with hybrid PType Correct materializing of a PCollection that includes both Reflect and Specific data (if it is supported by the underlying Avro version). Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/e4e8f994 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/e4e8f994 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/e4e8f994 Branch: refs/heads/master Commit: e4e8f994866922b5bc3ea92b52a80a502f083bd1 Parents: e72f9fd Author: Gabriel Reid <[email protected]> Authored: Wed Aug 22 20:31:36 2012 +0200 Committer: Gabriel Reid <[email protected]> Committed: Wed Aug 22 20:31:36 2012 +0200 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/MaterializeIT.java | 41 +++++- .../crunch/io/avro/AvroFileSourceTargetIT.java | 16 +- .../org/apache/crunch/io/avro/AvroReflectIT.java | 105 ++++----------- .../crunch/io/avro/AvroFileReaderFactory.java | 16 ++- .../crunch/io/avro/AvroFileReaderFactoryTest.java | 74 ++++++++--- 5 files changed, 135 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/e4e8f994/crunch/src/it/java/org/apache/crunch/MaterializeIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/MaterializeIT.java b/crunch/src/it/java/org/apache/crunch/MaterializeIT.java index f309462..3b4f0e6 100644 --- a/crunch/src/it/java/org/apache/crunch/MaterializeIT.java +++ b/crunch/src/it/java/org/apache/crunch/MaterializeIT.java @@ -25,11 +25,15 @@ import java.util.List; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.Person; +import org.apache.crunch.test.StringWrapper; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.avro.Avros; import org.apache.crunch.types.writable.WritableTypeFamily; +import org.junit.Assume; import org.junit.Rule; import org.junit.Test; @@ -74,13 +78,15 @@ public class MaterializeIT { @Test public void testMaterializeEmptyIntermediate_Writables() throws IOException { - runMaterializeEmptyIntermediate(new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()), + runMaterializeEmptyIntermediate( + new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance()); } @Test public void testMaterializeEmptyIntermediate_Avro() throws IOException { - runMaterializeEmptyIntermediate(new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()), + runMaterializeEmptyIntermediate( + new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance()); } @@ -103,11 +109,40 @@ public class MaterializeIT { pipeline.done(); } - public void runMaterializeEmptyIntermediate(Pipeline pipeline, PTypeFamily typeFamily) throws IOException { + public void runMaterializeEmptyIntermediate(Pipeline pipeline, PTypeFamily typeFamily) + throws IOException { String inputPath = tmpDir.copyResourceFileName("set1.txt"); PCollection<String> empty = pipeline.readTextFile(inputPath).filter(new FalseFilterFn()); assertTrue(Lists.newArrayList(empty.materialize()).isEmpty()); pipeline.done(); } + + static class StringToStringWrapperPersonPairMapFn extends MapFn<String, Pair<StringWrapper, Person>> { + + @Override + public Pair<StringWrapper, Person> map(String input) { + Person person = new Person(); + person.name = input; + person.age = 42; + person.siblingnames = Lists.<CharSequence> newArrayList(); + return Pair.of(new StringWrapper(input), person); + } + + } + + @Test + public void testMaterializeAvroPersonAndReflectsPair_GroupedTable() throws IOException { + Assume.assumeTrue(Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS); + Pipeline pipeline = new MRPipeline(MaterializeIT.class); + List<Pair<StringWrapper, Person>> pairList = Lists.newArrayList(pipeline + .readTextFile(tmpDir.copyResourceFileName("set1.txt")) + .parallelDo(new StringToStringWrapperPersonPairMapFn(), + Avros.pairs(Avros.reflects(StringWrapper.class), Avros.records(Person.class))) + .materialize()); + + // We just need to make sure this doesn't crash + assertEquals(4, pairList.size()); + + } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/e4e8f994/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java index c310d94..7334e91 100644 --- a/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java +++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java @@ -36,8 +36,8 @@ import org.apache.crunch.PCollection; import org.apache.crunch.Pipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.At; -import org.apache.crunch.io.avro.AvroFileReaderFactoryTest.PojoPerson; import org.apache.crunch.test.Person; +import org.apache.crunch.test.StringWrapper; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.types.avro.Avros; @@ -128,19 +128,19 @@ public class AvroFileSourceTargetIT implements Serializable { @Test public void testReflect() throws IOException { - Schema pojoPersonSchema = ReflectData.get().getSchema(PojoPerson.class); + Schema pojoPersonSchema = ReflectData.get().getSchema(StringWrapper.class); GenericRecord savedRecord = new GenericData.Record(pojoPersonSchema); - savedRecord.put("name", "John Doe"); + savedRecord.put("value", "stringvalue"); populateGenericFile(Lists.newArrayList(savedRecord), pojoPersonSchema); Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); - PCollection<PojoPerson> personCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), - Avros.reflects(PojoPerson.class))); + PCollection<StringWrapper> stringValueCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), + Avros.reflects(StringWrapper.class))); - List<PojoPerson> recordList = Lists.newArrayList(personCollection.materialize()); + List<StringWrapper> recordList = Lists.newArrayList(stringValueCollection.materialize()); assertEquals(1, recordList.size()); - PojoPerson person = recordList.get(0); - assertEquals("John Doe", person.getName()); + StringWrapper stringWrapper = recordList.get(0); + assertEquals("stringvalue", stringWrapper.getValue()); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/e4e8f994/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java index 93e15c0..7a90517 100644 --- a/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java +++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java @@ -31,6 +31,7 @@ import org.apache.crunch.Pipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.lib.Aggregate; import org.apache.crunch.test.Person; +import org.apache.crunch.test.StringWrapper; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.types.avro.Avros; @@ -42,80 +43,29 @@ import com.google.common.collect.Lists; public class AvroReflectIT implements Serializable { - static class StringWrapper { - private String value; - - public StringWrapper() { - this(null); - } - - public StringWrapper(String value) { - this.value = value; - } - - public String getValue() { - return value; - } - - public void setValue(String value) { - this.value = value; - } - - @Override - public String toString() { - return String.format("<StringWrapper(%s)>", value); - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((value == null) ? 0 : value.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - StringWrapper other = (StringWrapper) obj; - if (value == null) { - if (other.value != null) - return false; - } else if (!value.equals(other.value)) - return false; - return true; - } - - } - @Rule public transient TemporaryPath tmpDir = TemporaryPaths.create(); @Test public void testReflection() throws IOException { Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration()); - PCollection<StringWrapper> stringWrapperCollection = pipeline.readTextFile( - tmpDir.copyResourceFileName("set1.txt")).parallelDo(new MapFn<String, StringWrapper>() { + PCollection<StringWrapper> stringWrapperCollection = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")) + .parallelDo(new MapFn<String, StringWrapper>() { - @Override - public StringWrapper map(String input) { - StringWrapper stringWrapper = new StringWrapper(); - stringWrapper.setValue(input); - return stringWrapper; - } - }, Avros.reflects(StringWrapper.class)); + @Override + public StringWrapper map(String input) { + StringWrapper stringWrapper = new StringWrapper(); + stringWrapper.setValue(input); + return stringWrapper; + } + }, Avros.reflects(StringWrapper.class)); List<StringWrapper> stringWrappers = Lists.newArrayList(stringWrapperCollection.materialize()); pipeline.done(); - assertEquals(Lists.newArrayList(new StringWrapper("b"), new StringWrapper("c"), - new StringWrapper("a"), new StringWrapper("e")), stringWrappers); + assertEquals(Lists.newArrayList(new StringWrapper("b"), new StringWrapper("c"), new StringWrapper("a"), + new StringWrapper("e")), stringWrappers); } @@ -126,22 +76,20 @@ public class AvroReflectIT implements Serializable { Assume.assumeTrue(Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS); Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration()); PCollection<Pair<StringWrapper, Person>> hybridPairCollection = pipeline.readTextFile( - tmpDir.copyResourceFileName("set1.txt")).parallelDo( - new MapFn<String, Pair<StringWrapper, Person>>() { + tmpDir.copyResourceFileName("set1.txt")).parallelDo(new MapFn<String, Pair<StringWrapper, Person>>() { - @Override - public Pair<StringWrapper, Person> map(String input) { - Person person = new Person(); - person.name = input; - person.age = 42; - person.siblingnames = Lists.<CharSequence> newArrayList(input); + @Override + public Pair<StringWrapper, Person> map(String input) { + Person person = new Person(); + person.name = input; + person.age = 42; + person.siblingnames = Lists.<CharSequence> newArrayList(input); - return Pair.of(new StringWrapper(input), person); - } - }, Avros.pairs(Avros.reflects(StringWrapper.class), Avros.records(Person.class))); + return Pair.of(new StringWrapper(input), person); + } + }, Avros.pairs(Avros.reflects(StringWrapper.class), Avros.records(Person.class))); - PCollection<Pair<String, Long>> countCollection = Aggregate.count(hybridPairCollection) - .parallelDo( + PCollection<Pair<String, Long>> countCollection = Aggregate.count(hybridPairCollection).parallelDo( new MapFn<Pair<Pair<StringWrapper, Person>, Long>, Pair<String, Long>>() { @Override @@ -150,10 +98,9 @@ public class AvroReflectIT implements Serializable { } }, Avros.pairs(Avros.strings(), Avros.longs())); - List<Pair<String, Long>> materialized = Lists - .newArrayList(countCollection.materialize()); - List<Pair<String, Long>> expected = Lists.newArrayList(Pair.of("a", 1L), Pair.of("b", 1L), - Pair.of("c", 1L), Pair.of("e", 1L)); + List<Pair<String, Long>> materialized = Lists.newArrayList(countCollection.materialize()); + List<Pair<String, Long>> expected = Lists.newArrayList(Pair.of("a", 1L), Pair.of("b", 1L), Pair.of("c", 1L), + Pair.of("e", 1L)); Collections.sort(materialized); assertEquals(expected, materialized); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/e4e8f994/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java index 3345bd6..982f6db 100644 --- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java +++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java @@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.crunch.MapFn; import org.apache.crunch.io.FileReaderFactory; import org.apache.crunch.types.avro.AvroType; +import org.apache.crunch.types.avro.Avros; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -47,18 +48,21 @@ public class AvroFileReaderFactory<T> implements FileReaderFactory<T> { private final Configuration conf; public AvroFileReaderFactory(AvroType<T> atype, Configuration conf) { - this.recordReader = createDatumReader(atype); + this.recordReader = AvroFileReaderFactory.createDatumReader(atype); this.mapFn = (MapFn<T, T>) atype.getInputMapFn(); this.conf = conf; } - private DatumReader<T> createDatumReader(AvroType<T> avroType) { - if (avroType.hasSpecific()) { + static <T> DatumReader<T> createDatumReader(AvroType<T> avroType) { + if (avroType.hasReflect()) { + if (avroType.hasSpecific()) { + Avros.checkCombiningSpecificAndReflectionSchemas(); + } + return new ReflectDatumReader<T>(avroType.getSchema()); + } else if (avroType.hasSpecific()) { return new SpecificDatumReader<T>(avroType.getSchema()); - } else if (avroType.isGeneric()) { - return new GenericDatumReader<T>(avroType.getSchema()); } else { - return new ReflectDatumReader<T>(avroType.getSchema()); + return new GenericDatumReader<T>(avroType.getSchema()); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/e4e8f994/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java index 4c6adaa..66863ba 100644 --- a/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java +++ b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java @@ -29,15 +29,24 @@ import java.util.List; import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.crunch.Pair; import org.apache.crunch.test.Person; +import org.apache.crunch.test.StringWrapper; +import org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.avro.Avros; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.After; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; @@ -49,8 +58,6 @@ public class AvroFileReaderFactoryTest { @Before public void setUp() throws IOException { - // InputSupplier<InputStream> inputStreamSupplier = - // newInputStreamSupplier(getResource("person.avro")); avroFile = File.createTempFile("test", ".av"); } @@ -75,6 +82,10 @@ public class AvroFileReaderFactoryTest { } + private <T> AvroFileReaderFactory<T> createFileReaderFactory(AvroType<T> avroType) { + return new AvroFileReaderFactory<T>(avroType, new Configuration()); + } + @Test public void testRead_GenericReader() throws IOException { GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$); @@ -83,8 +94,7 @@ public class AvroFileReaderFactoryTest { savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); - AvroFileReaderFactory<GenericData.Record> genericReader = new AvroFileReaderFactory<GenericData.Record>( - Avros.generics(Person.SCHEMA$), new Configuration()); + AvroFileReaderFactory<GenericData.Record> genericReader = createFileReaderFactory(Avros.generics(Person.SCHEMA$)); Iterator<GenericData.Record> recordIterator = genericReader.read(FileSystem.getLocal(new Configuration()), new Path(this.avroFile.getAbsolutePath())); @@ -101,8 +111,7 @@ public class AvroFileReaderFactoryTest { savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); - AvroFileReaderFactory<Person> genericReader = new AvroFileReaderFactory<Person>(Avros.records(Person.class), - new Configuration()); + AvroFileReaderFactory<Person> genericReader = createFileReaderFactory(Avros.records(Person.class)); Iterator<Person> recordIterator = genericReader.read(FileSystem.getLocal(new Configuration()), new Path( this.avroFile.getAbsolutePath())); @@ -122,31 +131,54 @@ public class AvroFileReaderFactoryTest { @Test public void testRead_ReflectReader() throws IOException { - Schema reflectSchema = ReflectData.get().getSchema(PojoPerson.class); + Schema reflectSchema = ReflectData.get().getSchema(StringWrapper.class); GenericRecord savedRecord = new GenericData.Record(reflectSchema); - savedRecord.put("name", "John Doe"); + savedRecord.put("value", "stringvalue"); populateGenericFile(Lists.newArrayList(savedRecord), reflectSchema); - AvroFileReaderFactory<PojoPerson> genericReader = new AvroFileReaderFactory<PojoPerson>( - Avros.reflects(PojoPerson.class), new Configuration()); - Iterator<PojoPerson> recordIterator = genericReader.read(FileSystem.getLocal(new Configuration()), new Path( + AvroFileReaderFactory<StringWrapper> genericReader = createFileReaderFactory(Avros.reflects(StringWrapper.class)); + Iterator<StringWrapper> recordIterator = genericReader.read(FileSystem.getLocal(new Configuration()), new Path( this.avroFile.getAbsolutePath())); - PojoPerson person = recordIterator.next(); + StringWrapper stringWrapper = recordIterator.next(); - assertEquals("John Doe", person.getName()); + assertEquals("stringvalue", stringWrapper.getValue()); assertFalse(recordIterator.hasNext()); } - public static class PojoPerson { - private String name; + @Test + public void testCreateDatumReader_Generic() { + DatumReader<Record> datumReader = AvroFileReaderFactory.createDatumReader(Avros.generics(Person.SCHEMA$)); + assertEquals(GenericDatumReader.class, datumReader.getClass()); + } - public String getName() { - return name; - } + @Test + public void testCreateDatumReader_Reflect() { + DatumReader<StringWrapper> datumReader = AvroFileReaderFactory.createDatumReader(Avros + .reflects(StringWrapper.class)); + assertEquals(ReflectDatumReader.class, datumReader.getClass()); + } - public void setName(String name) { - this.name = name; - } + @Test + public void testCreateDatumReader_Specific() { + DatumReader<Person> datumReader = AvroFileReaderFactory.createDatumReader(Avros.records(Person.class)); + assertEquals(SpecificDatumReader.class, datumReader.getClass()); + } + + @Test + public void testCreateDatumReader_ReflectAndSpecific() { + Assume.assumeTrue(Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS); + + DatumReader<Pair<Person, StringWrapper>> datumReader = AvroFileReaderFactory.createDatumReader(Avros.pairs( + Avros.records(Person.class), Avros.reflects(StringWrapper.class))); + assertEquals(ReflectDatumReader.class, datumReader.getClass()); + } + + @Test(expected = IllegalStateException.class) + public void testCreateDatumReader_ReflectAndSpecific_NotSupported() { + Assume.assumeTrue(!Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS); + AvroFileReaderFactory.createDatumReader(Avros.pairs(Avros.records(Person.class), + Avros.reflects(StringWrapper.class))); } + }
