Author: daijy Date: Wed Mar 18 17:03:59 2015 New Revision: 1667589 URL: http://svn.apache.org/r1667589 Log: PIG-4463: AvroMapWrapper still leaks Avro data types and AvroStorageDataConversionUtilities do not handle Pig maps
Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1667589&r1=1667588&r2=1667589&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Wed Mar 18 17:03:59 2015 @@ -54,6 +54,9 @@ PIG-4333: Split BigData tests into multi BUG FIXES +PIG-4463: AvroMapWrapper still leaks Avro data types and AvroStorageDataConversionUtilities do not handle + Pig maps (rdsr via daijy) + PIG-4460: TestBuiltIn testValueListOutputSchemaComplexType and testValueSetOutputSchemaComplexType tests create bags whose inner schema is not a tuple (erwaman via daijy) Modified: pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java?rev=1667589&r1=1667588&r2=1667589&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java (original) +++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java Wed Mar 18 17:03:59 2015 @@ -117,11 +117,7 @@ public final class AvroMapWrapper implem new Function() { @Override public Object apply(final Object v) { - if (v instanceof Utf8) { - return v.toString(); - } else { - return v; - } + return AvroTupleWrapper.getPigObject(v); } } ); @@ -133,18 +129,13 @@ public final class AvroMapWrapper implem Sets.newHashSetWithExpectedSize(innerMap.size()); for (java.util.Map.Entry<CharSequence, Object> e : innerMap.entrySet()) { CharSequence k = e.getKey(); - Object v = e.getValue(); + final Object v = AvroTupleWrapper.getPigObject(e.getValue()); if (k instanceof Utf8) { k = k.toString(); } - if (v instanceof Utf8) { - v = v.toString(); - } theSet.add(new AbstractMap.SimpleEntry<CharSequence, Object>(k, v)); } - return theSet; - } } Modified: pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java?rev=1667589&r1=1667588&r2=1667589&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java (original) +++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java Wed Mar 18 17:03:59 2015 @@ -21,11 +21,14 @@ package org.apache.pig.impl.util.avro; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericData; +import org.apache.avro.util.Utf8; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; @@ -54,35 +57,7 @@ public class AvroStorageDataConversionUt for (Field f : s.getFields()) { Object o = t.get(f.pos()); Schema innerSchema = f.schema(); - if (AvroStorageSchemaConversionUtilities.isNullableUnion(innerSchema)) { - if (o == null) { - record.put(f.pos(), null); - continue; - } - innerSchema = AvroStorageSchemaConversionUtilities - .removeSimpleUnion(innerSchema); - } - switch(innerSchema.getType()) { - case RECORD: - record.put(f.pos(), packIntoAvro((Tuple) o, innerSchema)); - break; - case ARRAY: - record.put(f.pos(), packIntoAvro((DataBag) o, innerSchema)); - break; - case BYTES: - record.put(f.pos(), ByteBuffer.wrap(((DataByteArray) o).get())); - break; - case FIXED: - record.put(f.pos(), new GenericData.Fixed( - innerSchema, ((DataByteArray) o).get())); - break; - default: - if (t.getType(f.pos()) == DataType.DATETIME) { - record.put(f.pos(), ((DateTime) o).getMillis() ); - } else { - record.put(f.pos(), o); - } - } + record.put(f.pos(), packIntoAvro(o, innerSchema)); } return record; } catch (Exception e) { @@ -123,5 +98,52 @@ public class AvroStorageDataConversionUt } } + private static Object packIntoAvro(final Object o, Schema s) + throws IOException { + if (AvroStorageSchemaConversionUtilities.isNullableUnion(s)) { + if (o == null) { + return null; + } + s = AvroStorageSchemaConversionUtilities.removeSimpleUnion(s); + } + // what if o == null and schema doesn't allow it ? + switch (s.getType()) { + case RECORD: + return packIntoAvro((Tuple) o, s); + case ARRAY: + return packIntoAvro((DataBag) o, s); + case MAP: + return packIntoAvro((Map<CharSequence, Object>) o, s); + case BYTES: + return ByteBuffer.wrap(((DataByteArray) o).get()); + case FIXED: + return new GenericData.Fixed(s, ((DataByteArray) o).get()); + default: + if (DataType.findType(o) == DataType.DATETIME) { + return ((DateTime) o).getMillis(); + } else { + return o; + } + } + } + private static Map<Utf8, Object> packIntoAvro(Map<CharSequence, Object> input, Schema schema) + throws IOException { + final Map<Utf8, Object> output = new HashMap<Utf8, Object>(); + for (Map.Entry<CharSequence, Object> e : input.entrySet()) { + final Utf8 k = utf8(e.getKey()); + output.put(k, packIntoAvro(e.getValue(), schema.getValueType())); + } + return output; + } + + private static Utf8 utf8(CharSequence v) { + if (v instanceof Utf8) { + return (Utf8) v; + } else { + final StringBuilder sb = new StringBuilder(v.length()); + sb.append(v); + return new Utf8(sb.toString()); + } + } } Modified: pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java?rev=1667589&r1=1667588&r2=1667589&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java (original) +++ pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java Wed Mar 18 17:03:59 2015 @@ -34,8 +34,11 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import java.util.Iterator; +import com.google.common.io.Closeables; import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericContainer; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericDatumReader; @@ -56,6 +59,8 @@ import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecJob; import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS; import org.apache.pig.builtin.mock.Storage.Data; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.impl.util.Utils; import org.apache.pig.impl.util.avro.AvroBagWrapper; @@ -904,6 +909,94 @@ public class TestAvroStorage { assertEquals("bar", v); } + @Test + public void testAvroMapWrapper() throws Exception { + final Map<CharSequence, Object> m = new HashMap<CharSequence, Object>(); + for (String fn : avroSchemas) { + final String avro = basedir + "data/avro/uncompressed/" + fn + ".avro"; + int i = 0; + for (GenericContainer r : readAvroData(avro)) { + m.put(new Utf8(fn + i), r); + i += 1; + } + } + final AvroMapWrapper amw = new AvroMapWrapper(m); + // Test out all the interfaces the AvroMapWrapper supports + for (Object o : amw.values()) { + assertTrue(isValidPigObject(o)); + } + for (CharSequence k : amw.keySet()) { + assertTrue(isValidPigObject(amw.get(k))); + } + for (Map.Entry<CharSequence, Object> e : amw.entrySet()) { + assertTrue(isValidPigObject(e.getValue())); + } + } + + private boolean isValidPigObject(Object o) { + if (o == null) { + return true; + } + switch (DataType.findType(o)) { + case DataType.TUPLE: + for (Object inner : ((Tuple) o).getAll()) { + if (!isValidPigObject(inner)) { + return false; + } + } + return true; + case DataType.BAG: + final Iterator<Tuple> bi = ((DataBag) o).iterator(); + while (bi.hasNext()) { + if (!isValidPigObject(bi.next())) { + return false; + } + } + return true; + case DataType.MAP: + for (Object inner : ((Map) o).values()) { + if (!isValidPigObject(inner)) { + return false; + } + } + return true; + case DataType.BIGDECIMAL: + case DataType.BIGINTEGER: + case DataType.BOOLEAN: + case DataType.BYTE: + case DataType.BYTEARRAY: + case DataType.CHARARRAY: + case DataType.DATETIME: + case DataType.DOUBLE: + case DataType.FLOAT: + case DataType.GENERIC_WRITABLECOMPARABLE: + case DataType.INTEGER: + case DataType.LONG: + return true; + case DataType.ERROR: + default: + return false; + } + } + + private List<GenericContainer> readAvroData(String path) throws IOException { + final FileSystem fs = FileSystem.getLocal(new Configuration()); + final Path filePath = new Path(path); + assertTrue("File path " + filePath + " does not exists!", fs.exists(filePath)); + final GenericDatumReader<GenericContainer> reader = new GenericDatumReader<GenericContainer>(); + final DataFileStream<GenericContainer> in = new DataFileStream<GenericContainer>(fs.open(filePath), reader); + final List<GenericContainer> avroData = new ArrayList<GenericContainer>(); + try { + while (in.hasNext()) { + GenericContainer obj = in.next(); + avroData.add(obj); + } + } finally { + Closeables.closeQuietly(in); + } + return avroData; + } + private void testAvroStorage(boolean expectedToSucceed, String scriptFile, Map<String,String> parameterMap) throws IOException { pigServerLocal.setBatchOn(); Modified: pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java?rev=1667589&r1=1667588&r2=1667589&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java (original) +++ pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java Wed Mar 18 17:03:59 2015 @@ -17,18 +17,13 @@ */ package org.apache.pig.impl.util.avro; -import com.google.common.collect.ImmutableMap; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; import org.apache.pig.ResourceSchema; -import org.apache.pig.data.Tuple; import org.junit.Assert; import org.junit.Test; - import java.io.File; import java.io.IOException; -import java.util.Map; public class TestAvroStorageSchemaConversionUtilities { final private static String BASE_DIR = "test/org/apache/pig/builtin/avro/schema/"; @@ -47,32 +42,6 @@ public class TestAvroStorageSchemaConver parse(BASE_DIR + "nullableArrayInMap.avsc", true)); } - /** - * Test verifies that Avro records as map values are correctly converted to tuples - */ - @Test - public void testRecordAsMapValue() throws IOException { - final Schema schema = new Schema.Parser().parse(new File(BASE_DIR, "recordInMap.avsc")); - final GenericData.Record record = new GenericData.Record(schema); - record.put("key", "k"); - record.put("value", 42); - final Schema valueSchema = schema.getField("parameters").schema().getValueType(); - final GenericData.Record valueRecord = new GenericData.Record(valueSchema); - valueRecord.put("id", 1); - record.put("parameters", ImmutableMap.of("record_in_map", valueRecord)); - final Tuple tuple = new AvroTupleWrapper<GenericData.Record>(record); - // Third parameter is the map - final Object o = tuple.get(2); - Assert.assertTrue(o instanceof Map); - final Map<CharSequence, Object> map = (Map<CharSequence, Object>) o; - final Object recordInMap = map.get("record_in_map"); - // Assert that the record got converted to a tuple - Assert.assertTrue(recordInMap instanceof Tuple); - final Tuple tuple2 = (Tuple) recordInMap; - final Object id = tuple2.get(0); - Assert.assertEquals(1, id); - } - private static String parse(String schema, boolean recursive) throws IOException { final Schema s = new Schema.Parser().parse(new File(schema)); final ResourceSchema resourceSchema = AvroStorageSchemaConversionUtilities.avroSchemaToResourceSchema(s, recursive);