Add a static flag to indicate whether combined specific+reflect schemas are supported by the version of Avro in use
Signed-off-by: Josh Wills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/89e58b3e Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/89e58b3e Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/89e58b3e Branch: refs/heads/master Commit: 89e58b3e254d1ebb7885cc57574981a61b16e091 Parents: b05f183 Author: jwills <[email protected]> Authored: Tue Aug 21 07:25:56 2012 -0700 Committer: Josh Wills <[email protected]> Committed: Tue Aug 21 18:54:29 2012 -0700 ---------------------------------------------------------------------- .../crunch/io/avro/AvroFileReaderFactory.java | 2 +- .../org/apache/crunch/io/avro/AvroFileSource.java | 2 +- .../crunch/types/avro/AvroGroupedTableType.java | 8 +++- .../org/apache/crunch/types/avro/AvroType.java | 26 ++++++------- .../java/org/apache/crunch/types/avro/Avros.java | 29 ++++++++++++++- .../crunch/types/avro/AvroTableTypeTest.java | 6 +- .../org/apache/crunch/types/avro/AvroTypeTest.java | 30 +++++++------- 7 files changed, 66 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/89e58b3e/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java index 220b134..3345bd6 100644 --- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java +++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java @@ -53,7 +53,7 @@ public class AvroFileReaderFactory<T> implements FileReaderFactory<T> { } private DatumReader<T> createDatumReader(AvroType<T> avroType) { - if (avroType.isSpecific()) { + if 
(avroType.hasSpecific()) { return new SpecificDatumReader<T>(avroType.getSchema()); } else if (avroType.isGeneric()) { return new GenericDatumReader<T>(avroType.getSchema()); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/89e58b3e/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java index 0ce4c06..2226556 100644 --- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java +++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java @@ -35,7 +35,7 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour public AvroFileSource(Path path, AvroType<T> ptype) { super(path, ptype, new InputBundle(AvroInputFormat.class) - .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.isReflect())) + .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect())) .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString()) .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName())); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/89e58b3e/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java index f4e407a..e15581d 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java @@ -82,8 +82,12 @@ public class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> { String schemaJson = att.getSchema().toString(); Configuration conf = job.getConfiguration(); - if (att.isReflect()) { - 
conf.setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, true); + if (att.hasReflect()) { + if (att.hasSpecific()) { + Avros.checkCombiningSpecificAndReflectionSchemas(); + } else { + conf.setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, true); + } } conf.set(AvroJob.MAP_OUTPUT_SCHEMA, schemaJson); job.setSortComparatorClass(AvroKeyComparator.class); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/89e58b3e/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java index 7aaec25..4997157 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java @@ -91,28 +91,26 @@ public class AvroType<T> implements PType<T> { } /** - * Determine if the wrapped type is a specific data avro type. + * Determine if the wrapped type is a specific data avro type or wraps one. * - * @return true if the wrapped type is a specific data type + * @return true if the wrapped type is a specific data type or wraps one */ - public boolean isSpecific() { + public boolean hasSpecific() { if (Avros.isPrimitive(this)) { return false; } - - boolean hasSpecific = false; + if (!this.subTypes.isEmpty()) { for (PType<?> subType : this.subTypes) { AvroType<?> atype = (AvroType<?>) subType; - if (atype.isReflect()) { - return false; - } else if (atype.isSpecific()) { - hasSpecific = true; + if (atype.hasSpecific()) { + return true; } } + return false; } - return hasSpecific || SpecificRecord.class.isAssignableFrom(typeClass); + return SpecificRecord.class.isAssignableFrom(typeClass); } /** @@ -125,18 +123,18 @@ public class AvroType<T> implements PType<T> { } /** - * Determine if the wrapped type is a reflection-based avro type. + * Determine if the wrapped type is a reflection-based avro type or wraps one. 
* - * @return true if the wrapped type is a reflection-based type + * @return true if the wrapped type is a reflection-based type or wraps one. */ - public boolean isReflect() { + public boolean hasReflect() { if (Avros.isPrimitive(this)) { return false; } if (!this.subTypes.isEmpty()) { for (PType<?> subType : this.subTypes) { - if (((AvroType<?>) subType).isReflect()) { + if (((AvroType<?>) subType).hasReflect()) { return true; } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/89e58b3e/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java index 24391ed..b3a9b7a 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -71,6 +71,14 @@ import com.google.common.collect.Maps; public class Avros { /** + * Older versions of Avro (i.e., before 1.7.0) do not support schemas that are composed of + * a mix of specific and reflection-based schemas. This bit controls whether or not we + * allow Crunch jobs to be created that involve mixing specific and reflection-based schemas + * and can be overridden by the client developer. + */ + public static boolean CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS = false; + + /** * The instance we use for generating reflected schemas. May be modified by * clients (e.g., Scrunch.) */ @@ -91,6 +99,18 @@ public class Avros { conf.getClass(REFLECT_DATA_FACTORY_CLASS, ReflectDataFactory.class), conf); } + public static void checkCombiningSpecificAndReflectionSchemas() { + if (!CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS) { + throw new IllegalStateException("Crunch does not support running jobs that" + + " contain a mixture of reflection-based and avro-generated data types." 
+ + " Please consider turning your reflection-based type into an avro-generated" + + " type and using that generated type instead." + + " If the version of Avro you are using is 1.7.0 or greater, you can enable" + + " combined schemas by setting the Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS" + + " field to 'true'."); + } + } + public static MapFn<CharSequence, String> UTF8_TO_STRING = new MapFn<CharSequence, String>() { @Override public String map(CharSequence input) { @@ -477,13 +497,20 @@ public class Avros { this.avroTypes = Lists.newArrayList(); this.jsonSchema = schema.toString(); boolean reflectFound = false; + boolean specificFound = false; for (PType ptype : ptypes) { AvroType atype = (AvroType) ptype; fns.add(atype.getOutputMapFn()); avroTypes.add(atype); - if (atype.isReflect()) { + if (atype.hasReflect()) { reflectFound = true; } + if (atype.hasSpecific()) { + specificFound = true; + } + } + if (specificFound && reflectFound) { + checkCombiningSpecificAndReflectionSchemas(); } this.isReflect = reflectFound; } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/89e58b3e/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java index 37ed801..5e03ff8 100644 --- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java @@ -53,17 +53,17 @@ public class AvroTableTypeTest { @Test public void testIsReflect_ContainsReflectKey() { - assertTrue(Avros.tableOf(Avros.reflects(StringWrapper.class), Avros.ints()).isReflect()); + assertTrue(Avros.tableOf(Avros.reflects(StringWrapper.class), Avros.ints()).hasReflect()); } @Test public void testIsReflect_ContainsReflectValue() { - assertTrue(Avros.tableOf(Avros.ints(), 
Avros.reflects(StringWrapper.class)).isReflect()); + assertTrue(Avros.tableOf(Avros.ints(), Avros.reflects(StringWrapper.class)).hasReflect()); } @Test public void testReflect_NoReflectKeyOrValue() { - assertFalse(Avros.tableOf(Avros.ints(), Avros.ints()).isReflect()); + assertFalse(Avros.tableOf(Avros.ints(), Avros.ints()).hasReflect()); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/89e58b3e/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java index 955467c..170bebf 100644 --- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java @@ -42,7 +42,7 @@ public class AvroTypeTest { @Test public void testIsSpecific_SpecificData() { - assertTrue(Avros.records(Person.class).isSpecific()); + assertTrue(Avros.records(Person.class).hasSpecific()); } @Test @@ -52,7 +52,7 @@ public class AvroTypeTest { @Test public void testIsSpecific_GenericData() { - assertFalse(Avros.generics(Person.SCHEMA$).isSpecific()); + assertFalse(Avros.generics(Person.SCHEMA$).hasSpecific()); } @Test @@ -62,7 +62,7 @@ public class AvroTypeTest { @Test public void testIsSpecific_NonAvroClass() { - assertFalse(Avros.ints().isSpecific()); + assertFalse(Avros.ints().hasSpecific()); } @Test @@ -72,7 +72,7 @@ public class AvroTypeTest { @Test public void testIsSpecific_SpecificAvroTable() { - assertTrue(Avros.tableOf(Avros.strings(), Avros.records(Person.class)).isSpecific()); + assertTrue(Avros.tableOf(Avros.strings(), Avros.records(Person.class)).hasSpecific()); } @Test @@ -82,7 +82,7 @@ public class AvroTypeTest { @Test public void testIsSpecific_GenericAvroTable() { - assertFalse(Avros.tableOf(Avros.strings(), Avros.generics(Person.SCHEMA$)).isSpecific()); + 
assertFalse(Avros.tableOf(Avros.strings(), Avros.generics(Person.SCHEMA$)).hasSpecific()); } @Test @@ -92,52 +92,52 @@ public class AvroTypeTest { @Test public void testIsReflect_GenericType() { - assertFalse(Avros.generics(Person.SCHEMA$).isReflect()); + assertFalse(Avros.generics(Person.SCHEMA$).hasReflect()); } @Test public void testIsReflect_SpecificType() { - assertFalse(Avros.records(Person.class).isReflect()); + assertFalse(Avros.records(Person.class).hasReflect()); } @Test public void testIsReflect_ReflectSimpleType() { - assertTrue(Avros.reflects(StringWrapper.class).isReflect()); + assertTrue(Avros.reflects(StringWrapper.class).hasReflect()); } @Test public void testIsReflect_NonReflectSubType() { - assertFalse(Avros.pairs(Avros.ints(), Avros.ints()).isReflect()); + assertFalse(Avros.pairs(Avros.ints(), Avros.ints()).hasReflect()); } @Test public void testIsReflect_ReflectSubType() { - assertTrue(Avros.pairs(Avros.ints(), Avros.reflects(StringWrapper.class)).isReflect()); + assertTrue(Avros.pairs(Avros.ints(), Avros.reflects(StringWrapper.class)).hasReflect()); } @Test public void testIsReflect_TableOfNonReflectTypes() { - assertFalse(Avros.tableOf(Avros.ints(), Avros.strings()).isReflect()); + assertFalse(Avros.tableOf(Avros.ints(), Avros.strings()).hasReflect()); } @Test public void testIsReflect_TableWithReflectKey() { - assertTrue(Avros.tableOf(Avros.reflects(StringWrapper.class), Avros.ints()).isReflect()); + assertTrue(Avros.tableOf(Avros.reflects(StringWrapper.class), Avros.ints()).hasReflect()); } @Test public void testIsReflect_TableWithReflectValue() { - assertTrue(Avros.tableOf(Avros.ints(), Avros.reflects(StringWrapper.class)).isReflect()); + assertTrue(Avros.tableOf(Avros.ints(), Avros.reflects(StringWrapper.class)).hasReflect()); } @Test public void testReflect_CollectionContainingReflectValue() { - assertTrue(Avros.collections(Avros.reflects(StringWrapper.class)).isReflect()); + 
assertTrue(Avros.collections(Avros.reflects(StringWrapper.class)).hasReflect()); } @Test public void testReflect_CollectionNotContainingReflectValue() { - assertFalse(Avros.collections(Avros.generics(Person.SCHEMA$)).isReflect()); + assertFalse(Avros.collections(Avros.generics(Person.SCHEMA$)).hasReflect()); } @Test
