[CARBONDATA-2460] [CARBONDATA-2461] [CARBONDATA-2462] Fixed bug in 
AvroCarbonWriter

Issue1: If a NULL type is passed in the Avro schema, an "Unsupported data
type" exception is thrown.
Solution1: Ignore columns that have the NULL data type.

Issue2: Array field values were being cast to ArrayList without any
runtime instance check.
Solution2: Check the runtime type of Array field values and cast accordingly.

This closes #2291


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3d8b085a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3d8b085a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3d8b085a

Branch: refs/heads/spark-2.3
Commit: 3d8b085a55f551122c7528b6981f1785a44fef3c
Parents: 61afa42
Author: kunal642 <kunalkapoor...@gmail.com>
Authored: Wed May 9 18:32:23 2018 +0530
Committer: kumarvishal09 <kumarvishal1...@gmail.com>
Committed: Fri May 11 13:38:53 2018 +0530

----------------------------------------------------------------------
 .../TestNonTransactionalCarbonTable.scala       |  47 ++++++++-
 .../carbondata/sdk/file/AvroCarbonWriter.java   | 103 ++++++++++++++-----
 2 files changed, 122 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d8b085a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 376501b..86fda21 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -32,8 +32,6 @@ import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.sdk.file.AvroCarbonWriter
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -43,7 +41,7 @@ import org.apache.commons.lang.CharEncoding
 import tech.allegro.schema.json2avro.converter.JsonAvroConverter
 
 import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
-import org.apache.carbondata.sdk.file.{CarbonWriter, CarbonWriterBuilder, 
Field, Schema}
+import org.apache.carbondata.sdk.file.{AvroCarbonWriter, CarbonWriter, 
CarbonWriterBuilder, Field, Schema}
 
 
 class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll 
{
@@ -51,7 +49,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with 
BeforeAndAfterAll {
   var writerPath = new File(this.getClass.getResource("/").getPath
                             +
                             "../." +
-                            
"./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+                            "./target/SparkCarbonFileFormat/WriterOutput/")
     .getCanonicalPath
   //getCanonicalPath gives path with \, so code expects /. Need to handle in 
code ?
   writerPath = writerPath.replace("\\", "/")
@@ -1795,6 +1793,47 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
     }.getMessage.toLowerCase.contains("column: name specified in sort 
columns"))
   }
 
+  test("test if load is passing with NULL type") {
+    val schema1 =
+      """{
+        |      "namespace": "com.apache.schema",
+        |      "type": "record",
+        |      "name": "StudentActivity",
+        |      "fields": [
+        |              {
+        |                      "name": "id",
+        |                      "type": "null"
+        |              },
+        |              {
+        |                      "name": "course_details",
+        |                      "type": {
+        |                              "name": "course_details",
+        |                              "type": "record",
+        |                              "fields": [
+        |                                      {
+        |                                              "name": 
"course_struct_course_time",
+        |                                              "type": "string"
+        |                                      }
+        |                              ]
+        |                      }
+        |              }
+        |      ]
+        |}""".stripMargin
+
+    val json1 =
+      """{"id": 101,"course_details": { 
"course_struct_course_time":"2014-01-05"  }}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val converter = new JsonAvroConverter
+    val record = converter
+      .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
+
+    val writer = 
CarbonWriter.builder.withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(schema1))
+      
.outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput
+    writer.write(record)
+    writer.close()
+  }
+
   test("test if data load is success with a struct having timestamp column  ") 
{
     val schema1 =
       """{

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d8b085a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index 55fd211..137e3f4 100644
--- 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -75,15 +75,18 @@ public class AvroCarbonWriter extends CarbonWriter {
       avroSchema = avroRecord.getSchema();
     }
     List<Schema.Field> fields = avroSchema.getFields();
-    Object[] csvField = new Object[fields.size()];
+    List<Object> csvFields = new ArrayList<>();
     for (int i = 0; i < fields.size(); i++) {
-      csvField[i] = avroFieldToObject(fields.get(i), avroRecord.get(i));
+      Object field = avroFieldToObject(fields.get(i), avroRecord.get(i));
+      if (field != null) {
+        csvFields.add(field);
+      }
     }
-    return csvField;
+    return csvFields.toArray();
   }
 
   private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) {
-    Object out = new Object();
+    Object out;
     Schema.Type type = avroField.schema().getType();
     switch (type) {
       case BOOLEAN:
@@ -102,24 +105,45 @@ public class AvroCarbonWriter extends CarbonWriter {
 
         Object[] structChildObjects = new Object[fields.size()];
         for (int i = 0; i < fields.size(); i++) {
-          structChildObjects[i] =
+          Object childObject =
               avroFieldToObject(fields.get(i), ((GenericData.Record) 
fieldValue).get(i));
+          if (childObject != null) {
+            structChildObjects[i] = childObject;
+          }
         }
         StructObject structObject = new StructObject(structChildObjects);
         out = structObject;
         break;
       case ARRAY:
-        int size = ((ArrayList) fieldValue).size();
-        Object[] arrayChildObjects = new Object[size];
-        for (int i = 0; i < size; i++) {
-          arrayChildObjects[i] = (avroFieldToObject(
-              new Schema.Field(avroField.name(), 
avroField.schema().getElementType(), null, true),
-              ((ArrayList) fieldValue).get(i)));
+        Object[] arrayChildObjects;
+        if (fieldValue instanceof GenericData.Array) {
+          int size = ((GenericData.Array) fieldValue).size();
+          arrayChildObjects = new Object[size];
+          for (int i = 0; i < size; i++) {
+            Object childObject = avroFieldToObject(
+                new Schema.Field(avroField.name(), 
avroField.schema().getElementType(), null, true),
+                ((GenericData.Array) fieldValue).get(i));
+            if (childObject != null) {
+              arrayChildObjects[i] = childObject;
+            }
+          }
+        } else {
+          int size = ((ArrayList) fieldValue).size();
+          arrayChildObjects = new Object[size];
+          for (int i = 0; i < size; i++) {
+            Object childObject = avroFieldToObject(
+                new Schema.Field(avroField.name(), 
avroField.schema().getElementType(), null, true),
+                ((ArrayList) fieldValue).get(i));
+            if (childObject != null) {
+              arrayChildObjects[i] = childObject;
+            }
+          }
         }
-        ArrayObject arrayObject = new ArrayObject(arrayChildObjects);
-        out = arrayObject;
+        out = new ArrayObject(arrayChildObjects);
+        break;
+      case NULL:
+        out = null;
         break;
-
       default:
         throw new UnsupportedOperationException(
             "carbon not support " + type.toString() + " avro type yet");
@@ -142,7 +166,10 @@ public class AvroCarbonWriter extends CarbonWriter {
     Field[] carbonField = new Field[avroSchema.getFields().size()];
     int i = 0;
     for (Schema.Field avroField : avroSchema.getFields()) {
-      carbonField[i] = prepareFields(avroField);
+      Field field = prepareFields(avroField);
+      if (field != null) {
+        carbonField[i] = field;
+      }
       i++;
     }
     return new org.apache.carbondata.sdk.file.Schema(carbonField);
@@ -169,15 +196,25 @@ public class AvroCarbonWriter extends CarbonWriter {
         // recursively get the sub fields
         ArrayList<StructField> structSubFields = new ArrayList<>();
         for (Schema.Field avroSubField : childSchema.getFields()) {
-          structSubFields.add(prepareSubFields(avroSubField.name(), 
avroSubField.schema()));
+          StructField structField = prepareSubFields(avroSubField.name(), 
avroSubField.schema());
+          if (structField != null) {
+            structSubFields.add(structField);
+          }
         }
         return new Field(FieldName, "struct", structSubFields);
       case ARRAY:
         // recursively get the sub fields
         ArrayList<StructField> arraySubField = new ArrayList<>();
         // array will have only one sub field.
-        arraySubField.add(prepareSubFields("val", 
childSchema.getElementType()));
-        return new Field(FieldName, "array", arraySubField);
+        StructField structField = prepareSubFields("val", 
childSchema.getElementType());
+        if (structField != null) {
+          arraySubField.add(structField);
+          return new Field(FieldName, "array", arraySubField);
+        } else {
+          return null;
+        }
+      case NULL:
+        return null;
       default:
         throw new UnsupportedOperationException(
             "carbon not support " + type.toString() + " avro type yet");
@@ -203,14 +240,23 @@ public class AvroCarbonWriter extends CarbonWriter {
         // recursively get the sub fields
         ArrayList<StructField> structSubFields = new ArrayList<>();
         for (Schema.Field avroSubField : childSchema.getFields()) {
-          structSubFields.add(prepareSubFields(avroSubField.name(), 
avroSubField.schema()));
+          StructField structField = prepareSubFields(avroSubField.name(), 
avroSubField.schema());
+          if (structField != null) {
+            structSubFields.add(structField);
+          }
         }
         return (new StructField(FieldName, 
DataTypes.createStructType(structSubFields)));
       case ARRAY:
         // recursively get the sub fields
         // array will have only one sub field.
-        return (new StructField(FieldName, DataTypes.createArrayType(
-            getMappingDataTypeForArrayRecord(childSchema.getElementType()))));
+        DataType subType = 
getMappingDataTypeForArrayRecord(childSchema.getElementType());
+        if (subType != null) {
+          return (new StructField(FieldName, 
DataTypes.createArrayType(subType)));
+        } else {
+          return null;
+        }
+      case NULL:
+        return null;
       default:
         throw new UnsupportedOperationException(
             "carbon not support " + type.toString() + " avro type yet");
@@ -235,13 +281,22 @@ public class AvroCarbonWriter extends CarbonWriter {
         // recursively get the sub fields
         ArrayList<StructField> structSubFields = new ArrayList<>();
         for (Schema.Field avroSubField : childSchema.getFields()) {
-          structSubFields.add(prepareSubFields(avroSubField.name(), 
avroSubField.schema()));
+          StructField structField = prepareSubFields(avroSubField.name(), 
avroSubField.schema());
+          if (structField != null) {
+            structSubFields.add(structField);
+          }
         }
         return DataTypes.createStructType(structSubFields);
       case ARRAY:
         // array will have only one sub field.
-        return DataTypes.createArrayType(
-            getMappingDataTypeForArrayRecord(childSchema.getElementType()));
+        DataType subType = 
getMappingDataTypeForArrayRecord(childSchema.getElementType());
+        if (subType != null) {
+          return DataTypes.createArrayType(subType);
+        } else {
+          return null;
+        }
+      case NULL:
+        return null;
       default:
         throw new UnsupportedOperationException(
             "carbon not support " + childSchema.getType().toString() + " avro 
type yet");

Reply via email to