[CARBONDATA-2460] [CARBONDATA-2461] [CARBONDATA-2462] Fixed bug in AvroCarbonWriter
Issue 1: If the NULL type is passed from the Avro schema, an unsupported data type exception is thrown.
Solution 1: Ignore columns that have the NULL data type.

Issue 2: Array field values were cast to ArrayList without any instance check.
Solution 2: Check the runtime type of array field values and cast them accordingly.

This closes #2291

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3d8b085a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3d8b085a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3d8b085a

Branch: refs/heads/spark-2.3
Commit: 3d8b085a55f551122c7528b6981f1785a44fef3c
Parents: 61afa42
Author: kunal642 <kunalkapoor...@gmail.com>
Authored: Wed May 9 18:32:23 2018 +0530
Committer: kumarvishal09 <kumarvishal1...@gmail.com>
Committed: Fri May 11 13:38:53 2018 +0530

----------------------------------------------------------------------
 .../TestNonTransactionalCarbonTable.scala       |  47 ++++++++-
 .../carbondata/sdk/file/AvroCarbonWriter.java   | 103 ++++++++++++++-----
 2 files changed, 122 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
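For illustration only, the sketch below restates the two fixes in plain Java, independent of the actual patch: an Avro NULL-typed field is skipped instead of being rejected, and an array value is inspected with instanceof before casting, because it may arrive either as Avro's GenericData.Array or as a plain java.util.List. The class and method names here are invented for the example; only the Avro types are real.

import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

public class NullAndArrayHandlingSketch {

  // Converts one Avro field value, skipping NULL-typed fields and accepting
  // both GenericData.Array and plain List instances for ARRAY fields.
  static Object toCarbonValue(Schema fieldSchema, Object value) {
    switch (fieldSchema.getType()) {
      case NULL:
        // Fix 1: a field declared as Avro "null" carries no data, so it is ignored.
        return null;
      case ARRAY:
        List<Object> children = new ArrayList<>();
        // Fix 2: check the runtime type instead of blindly casting to ArrayList.
        if (value instanceof GenericData.Array) {
          for (Object element : (GenericData.Array<?>) value) {
            children.add(toCarbonValue(fieldSchema.getElementType(), element));
          }
        } else if (value instanceof List) {
          for (Object element : (List<?>) value) {
            children.add(toCarbonValue(fieldSchema.getElementType(), element));
          }
        }
        return children.toArray();
      default:
        // Primitive values pass through unchanged in this simplified sketch.
        return value;
    }
  }
}

The real patch applies the same idea inside AvroCarbonWriter.avroFieldToObject and drops null results when assembling the row, as shown in the diff below.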
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d8b085a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 376501b..86fda21 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -32,8 +32,6 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.sdk.file.AvroCarbonWriter
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -43,7 +41,7 @@ import org.apache.commons.lang.CharEncoding
 import tech.allegro.schema.json2avro.converter.JsonAvroConverter
 
 import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
-import org.apache.carbondata.sdk.file.{CarbonWriter, CarbonWriterBuilder, Field, Schema}
+import org.apache.carbondata.sdk.file.{AvroCarbonWriter, CarbonWriter, CarbonWriterBuilder, Field, Schema}
 
 class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
@@ -51,7 +49,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
   var writerPath = new File(this.getClass.getResource("/").getPath + "../." +
-                            "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+                            "./target/SparkCarbonFileFormat/WriterOutput/")
     .getCanonicalPath
   //getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
   writerPath = writerPath.replace("\\", "/")
@@ -1795,6 +1793,47 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     }.getMessage.toLowerCase.contains("column: name specified in sort columns"))
   }
 
+  test("test if load is passing with NULL type") {
+    val schema1 =
+      """{
+        |  "namespace": "com.apache.schema",
+        |  "type": "record",
+        |  "name": "StudentActivity",
+        |  "fields": [
+        |    {
+        |      "name": "id",
+        |      "type": "null"
+        |    },
+        |    {
+        |      "name": "course_details",
+        |      "type": {
+        |        "name": "course_details",
+        |        "type": "record",
+        |        "fields": [
+        |          {
+        |            "name": "course_struct_course_time",
+        |            "type": "string"
+        |          }
+        |        ]
+        |      }
+        |    }
+        |  ]
+        |}""".stripMargin
+
+    val json1 =
+      """{"id": 101,"course_details": { "course_struct_course_time":"2014-01-05" }}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val converter = new JsonAvroConverter
+    val record = converter
+      .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
+
+    val writer = CarbonWriter.builder.withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(schema1))
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput
+    writer.write(record)
+    writer.close()
+  }
+
   test("test if data load is success with a struct having timestamp column ") {
     val schema1 =
       """{

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d8b085a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index 55fd211..137e3f4 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -75,15 +75,18 @@ public class AvroCarbonWriter extends CarbonWriter {
       avroSchema = avroRecord.getSchema();
     }
     List<Schema.Field> fields = avroSchema.getFields();
-    Object[] csvField = new Object[fields.size()];
+    List<Object> csvFields = new ArrayList<>();
     for (int i = 0; i < fields.size(); i++) {
-      csvField[i] = avroFieldToObject(fields.get(i), avroRecord.get(i));
+      Object field = avroFieldToObject(fields.get(i), avroRecord.get(i));
+      if (field != null) {
+        csvFields.add(field);
+      }
     }
-    return csvField;
+    return csvFields.toArray();
   }
 
   private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) {
-    Object out = new Object();
+    Object out;
     Schema.Type type = avroField.schema().getType();
     switch (type) {
       case BOOLEAN:
@@ -102,24 +105,45 @@ public class AvroCarbonWriter extends CarbonWriter {
 
         Object[] structChildObjects = new Object[fields.size()];
         for (int i = 0; i < fields.size(); i++) {
-          structChildObjects[i] =
+          Object childObject =
               avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i));
+          if (childObject != null) {
+            structChildObjects[i] = childObject;
+          }
         }
         StructObject structObject = new StructObject(structChildObjects);
         out = structObject;
         break;
       case ARRAY:
-        int size = ((ArrayList) fieldValue).size();
-        Object[] arrayChildObjects = new Object[size];
-        for (int i = 0; i < size; i++) {
-          arrayChildObjects[i] = (avroFieldToObject(
-              new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
-              ((ArrayList) fieldValue).get(i)));
+        Object[] arrayChildObjects;
+        if (fieldValue instanceof GenericData.Array) {
+          int size = ((GenericData.Array) fieldValue).size();
+          arrayChildObjects = new Object[size];
+          for (int i = 0; i < size; i++) {
+            Object childObject = avroFieldToObject(
+                new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
+                ((GenericData.Array) fieldValue).get(i));
+            if (childObject != null) {
+              arrayChildObjects[i] = childObject;
+            }
+          }
+        } else {
+          int size = ((ArrayList) fieldValue).size();
+          arrayChildObjects = new Object[size];
+          for (int i = 0; i < size; i++) {
+            Object childObject = avroFieldToObject(
+                new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
+                ((ArrayList) fieldValue).get(i));
+            if (childObject != null) {
+              arrayChildObjects[i] = childObject;
+            }
+          }
         }
-        ArrayObject arrayObject = new ArrayObject(arrayChildObjects);
-        out = arrayObject;
+        out = new ArrayObject(arrayChildObjects);
+        break;
+      case NULL:
+        out = null;
         break;
-
       default:
         throw new UnsupportedOperationException(
             "carbon not support " + type.toString() + " avro type yet");
@@ -142,7 +166,10 @@ public class AvroCarbonWriter extends CarbonWriter {
     Field[] carbonField = new Field[avroSchema.getFields().size()];
     int i = 0;
     for (Schema.Field avroField : avroSchema.getFields()) {
-      carbonField[i] = prepareFields(avroField);
+      Field field = prepareFields(avroField);
+      if (field != null) {
+        carbonField[i] = field;
+      }
       i++;
     }
     return new org.apache.carbondata.sdk.file.Schema(carbonField);
@@ -169,15 +196,25 @@ public class AvroCarbonWriter extends CarbonWriter {
         // recursively get the sub fields
         ArrayList<StructField> structSubFields = new ArrayList<>();
         for (Schema.Field avroSubField : childSchema.getFields()) {
-          structSubFields.add(prepareSubFields(avroSubField.name(), avroSubField.schema()));
+          StructField structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
+          if (structField != null) {
+            structSubFields.add(structField);
+          }
         }
         return new Field(FieldName, "struct", structSubFields);
       case ARRAY:
         // recursively get the sub fields
         ArrayList<StructField> arraySubField = new ArrayList<>();
         // array will have only one sub field.
-        arraySubField.add(prepareSubFields("val", childSchema.getElementType()));
-        return new Field(FieldName, "array", arraySubField);
+        StructField structField = prepareSubFields("val", childSchema.getElementType());
+        if (structField != null) {
+          arraySubField.add(structField);
+          return new Field(FieldName, "array", arraySubField);
+        } else {
+          return null;
+        }
+      case NULL:
+        return null;
       default:
         throw new UnsupportedOperationException(
             "carbon not support " + type.toString() + " avro type yet");
@@ -203,14 +240,23 @@ public class AvroCarbonWriter extends CarbonWriter {
         // recursively get the sub fields
         ArrayList<StructField> structSubFields = new ArrayList<>();
         for (Schema.Field avroSubField : childSchema.getFields()) {
-          structSubFields.add(prepareSubFields(avroSubField.name(), avroSubField.schema()));
+          StructField structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
+          if (structField != null) {
+            structSubFields.add(structField);
+          }
         }
         return (new StructField(FieldName, DataTypes.createStructType(structSubFields)));
       case ARRAY:
         // recursively get the sub fields
         // array will have only one sub field.
-        return (new StructField(FieldName, DataTypes.createArrayType(
-            getMappingDataTypeForArrayRecord(childSchema.getElementType()))));
+        DataType subType = getMappingDataTypeForArrayRecord(childSchema.getElementType());
+        if (subType != null) {
+          return (new StructField(FieldName, DataTypes.createArrayType(subType)));
+        } else {
+          return null;
+        }
+      case NULL:
+        return null;
       default:
         throw new UnsupportedOperationException(
             "carbon not support " + type.toString() + " avro type yet");
@@ -235,13 +281,22 @@ public class AvroCarbonWriter extends CarbonWriter {
         // recursively get the sub fields
         ArrayList<StructField> structSubFields = new ArrayList<>();
         for (Schema.Field avroSubField : childSchema.getFields()) {
-          structSubFields.add(prepareSubFields(avroSubField.name(), avroSubField.schema()));
+          StructField structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
+          if (structField != null) {
+            structSubFields.add(structField);
+          }
         }
         return DataTypes.createStructType(structSubFields);
       case ARRAY:
         // array will have only one sub field.
-        return DataTypes.createArrayType(
-            getMappingDataTypeForArrayRecord(childSchema.getElementType()));
+        DataType subType = getMappingDataTypeForArrayRecord(childSchema.getElementType());
+        if (subType != null) {
+          return DataTypes.createArrayType(subType);
+        } else {
+          return null;
+        }
+      case NULL:
+        return null;
       default:
         throw new UnsupportedOperationException(
             "carbon not support " + childSchema.getType().toString() + " avro type yet");
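As a usage note (not part of this commit), a minimal Java sketch of the SDK path exercised by the fixes might look as follows. It derives a Carbon schema from an Avro schema containing an array field, builds a GenericData.Record whose array value is a GenericData.Array, and writes it through the Avro writer. The schema, field names, and output path are invented for the example; the builder calls mirror those used in the Scala test above, and exception handling is simplified.

import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

import org.apache.carbondata.sdk.file.AvroCarbonWriter;
import org.apache.carbondata.sdk.file.CarbonWriter;

public class AvroArrayWriteExample {

  public static void main(String[] args) throws Exception {
    // Avro schema with one string field and one array-of-int field (illustrative).
    String avroSchema = "{\"type\":\"record\",\"name\":\"StudentActivity\",\"fields\":["
        + "{\"name\":\"name\",\"type\":\"string\"},"
        + "{\"name\":\"marks\",\"type\":{\"type\":\"array\",\"items\":\"int\"}}]}";
    Schema schema = new Schema.Parser().parse(avroSchema);

    // The array value is a GenericData.Array, the case the new instance check handles.
    GenericData.Record record = new GenericData.Record(schema);
    record.put("name", "bob");
    record.put("marks",
        new GenericData.Array<>(schema.getField("marks").schema(), Arrays.asList(1, 2, 3)));

    // Builder calls mirror the Scala test; the output path is illustrative.
    CarbonWriter writer = CarbonWriter.builder()
        .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema))
        .outputPath("./target/SparkCarbonFileFormat/WriterOutput")
        .isTransactionalTable(false)
        .buildWriterForAvroInput();
    writer.write(record);
    writer.close();
  }
}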