Repository: carbondata Updated Branches: refs/heads/master 531ecdf3f -> b1c85fa55
[CARBONDATA-2430][SDK] Reshuffling of Columns given by user in SDK. Reshuffling of Columns given by the user in SDK. Order should be Sort Columns -> Dimension -> Complex -> Measure. This closes #2261 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b1c85fa5 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b1c85fa5 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b1c85fa5 Branch: refs/heads/master Commit: b1c85fa55eeca1a98b61117c6b46df9a28d60bca Parents: 531ecdf Author: sounakr <soun...@gmail.com> Authored: Wed May 2 21:29:57 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Mon May 7 16:36:46 2018 +0530 ---------------------------------------------------------------------- .../core/metadata/datatype/ArrayType.java | 4 + .../core/metadata/datatype/StructField.java | 15 + .../core/metadata/schema/table/CarbonTable.java | 26 -- .../schema/table/TableSchemaBuilder.java | 72 ++- .../examples/DataFrameComplexTypeExample.scala | 147 +++++- .../TestNonTransactionalCarbonTable.scala | 459 +++++++++++++++++-- .../apache/spark/util/SparkTypeConverter.scala | 58 +-- .../carbondata/sdk/file/AvroCarbonWriter.java | 48 +- .../sdk/file/CarbonWriterBuilder.java | 104 +++-- .../org/apache/carbondata/sdk/file/Field.java | 4 +- .../sdk/file/AvroCarbonWriterTest.java | 214 +++++++++ 11 files changed, 963 insertions(+), 188 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java index c30e21c..c327d7f 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java @@ -30,4 +30,8 @@ public class ArrayType extends DataType { public boolean isComplexType() { return true; } + + public DataType getElementType() { + return elementType; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructField.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructField.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructField.java index efdc8e2..bfca057 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructField.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructField.java @@ -18,6 +18,7 @@ package org.apache.carbondata.core.metadata.datatype; import java.io.Serializable; +import java.util.List; public class StructField implements Serializable { @@ -27,9 +28,19 @@ public class StructField implements Serializable { private DataType dataType; + private List<StructField> children; + public StructField(String fieldName, DataType dataType) { this.fieldName = fieldName; this.dataType = dataType; + this.children = null; + } + + + public StructField(String fieldName, DataType dataType, List<StructField> children) { + this.fieldName = fieldName; + this.dataType = dataType; + this.children = children; } public DataType getDataType() { @@ -39,4 +50,8 @@ public class StructField implements Serializable { public String getFieldName() { return fieldName; } + + public List<StructField> getChildren() { + return children; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 4178d8a..cf5660f 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -37,8 +37,6 @@ import org.apache.carbondata.core.datamap.dev.DataMapFactory; import org.apache.carbondata.core.features.TableOperation; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; -import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; -import org.apache.carbondata.core.metadata.datatype.StructField; import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.BucketingInfo; import org.apache.carbondata.core.metadata.schema.PartitionInfo; @@ -49,7 +47,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.reader.CarbonHeaderReader; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; import org.apache.carbondata.core.scan.filter.TableProvider; @@ -60,7 +57,6 @@ import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.format.FileHeader; /** * Mapping class for 
Carbon actual table @@ -223,28 +219,6 @@ public class CarbonTable implements Serializable { } } - public static CarbonTable buildFromDataFile( - String tableName, String tablePath, String filePath) throws IOException { - CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(filePath); - FileHeader fileHeader = carbonHeaderReader.readHeader(); - TableSchemaBuilder builder = TableSchema.builder(); - ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl(); - for (org.apache.carbondata.format.ColumnSchema column : fileHeader.getColumn_schema()) { - ColumnSchema columnSchema = schemaConverter.fromExternalToWrapperColumnSchema(column); - builder.addColumn( - new StructField(columnSchema.getColumnName(), columnSchema.getDataType()), false); - } - - TableSchema tableSchema = builder.tableName(tableName).build(); - TableInfo tableInfo = new TableInfo(); - tableInfo.setFactTable(tableSchema); - tableInfo.setTablePath(tablePath); - tableInfo.setDatabaseName("default"); - tableInfo.setTableUniqueName( - CarbonTable.buildUniqueName("default", tableSchema.getTableName())); - return buildFromTableInfo(tableInfo); - } - public static CarbonTable buildFromTablePath(String tableName, String tablePath, boolean isTransactionalTable) throws IOException { if (isTransactionalTable) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java index 731fea8..42bb958 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java @@ -27,6 +27,7 @@ 
import java.util.UUID; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; +import org.apache.carbondata.core.metadata.datatype.ArrayType; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.datatype.DecimalType; @@ -46,7 +47,11 @@ public class TableSchemaBuilder { private List<ColumnSchema> sortColumns = new LinkedList<>(); - private List<ColumnSchema> otherColumns = new LinkedList<>(); + private List<ColumnSchema> dimension = new LinkedList<>(); + + private List<ColumnSchema> complex = new LinkedList<>(); + + private List<ColumnSchema> measures = new LinkedList<>(); private int blockSize; @@ -86,7 +91,9 @@ public class TableSchemaBuilder { schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>()); schema.setSchemaEvalution(schemaEvol); List<ColumnSchema> allColumns = new LinkedList<>(sortColumns); - allColumns.addAll(otherColumns); + allColumns.addAll(dimension); + allColumns.addAll(complex); + allColumns.addAll(measures); schema.setListOfColumns(allColumns); Map<String, String> property = new HashMap<>(); @@ -108,21 +115,36 @@ public class TableSchemaBuilder { } public ColumnSchema addColumn(StructField field, boolean isSortColumn) { + return addColumn(field, null, isSortColumn, false); + } + + private ColumnSchema addColumn(StructField field, String parentName, boolean isSortColumn, + boolean isComplexChild) { Objects.requireNonNull(field); checkRepeatColumnName(field); ColumnSchema newColumn = new ColumnSchema(); - newColumn.setColumnName(field.getFieldName()); + if (parentName != null) { + newColumn.setColumnName(parentName + "." 
+ field.getFieldName()); + } else { + newColumn.setColumnName(field.getFieldName()); + } newColumn.setDataType(field.getDataType()); if (isSortColumn || field.getDataType() == DataTypes.STRING || field.getDataType() == DataTypes.DATE || field.getDataType() == DataTypes.TIMESTAMP || - DataTypes.isStructType(field.getDataType())) { + field.getDataType().isComplexType() || + (isComplexChild)) { newColumn.setDimensionColumn(true); } else { newColumn.setDimensionColumn(false); } - newColumn.setSchemaOrdinal(ordinal++); + if (!isComplexChild) { + newColumn.setSchemaOrdinal(ordinal++); + } else { + // child column should not be counted for schema ordinal + newColumn.setSchemaOrdinal(-1); + } newColumn.setColumnar(true); // For NonTransactionalTable, multiple sdk writer output with same column name can be placed in @@ -135,7 +157,11 @@ public class TableSchemaBuilder { newColumn.setColumnReferenceId(newColumn.getColumnUniqueId()); newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn)); if (field.getDataType().isComplexType()) { - newColumn.setNumberOfChild(((StructType) field.getDataType()).getFields().size()); + if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) { + newColumn.setNumberOfChild(1); + } else { + newColumn.setNumberOfChild(((StructType) field.getDataType()).getFields().size()); + } } if (DataTypes.isDecimal(field.getDataType())) { DecimalType decimalType = (DecimalType) field.getDataType(); @@ -143,17 +169,29 @@ public class TableSchemaBuilder { newColumn.setScale(decimalType.getScale()); } if (!isSortColumn) { - otherColumns.add(newColumn); + if (!newColumn.isDimensionColumn()) { + measures.add(newColumn); + } else if (DataTypes.isStructType(field.getDataType()) || + DataTypes.isArrayType(field.getDataType()) || isComplexChild) { + complex.add(newColumn); + } else { + dimension.add(newColumn); + } } if (newColumn.isDimensionColumn()) { newColumn.setUseInvertedIndex(true); } if (field.getDataType().isComplexType()) { - if 
(((StructType) field.getDataType()).getFields().size() > 0) { + String parentFieldName = newColumn.getColumnName(); + if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) { + addColumn(new StructField("val", + ((ArrayType) field.getDataType()).getElementType()), field.getFieldName(), false, true); + } else if (field.getDataType().getName().equalsIgnoreCase("STRUCT") + && ((StructType) field.getDataType()).getFields().size() > 0) { // This field has children. List<StructField> fields = ((StructType) field.getDataType()).getFields(); - for (int i = 0; i < fields.size(); i ++) { - addColumn(fields.get(i), false); + for (int i = 0; i < fields.size(); i++) { + addColumn(fields.get(i), parentFieldName, false, true); } } } @@ -169,7 +207,19 @@ public class TableSchemaBuilder { throw new IllegalArgumentException("column name already exists"); } } - for (ColumnSchema column : otherColumns) { + for (ColumnSchema column : dimension) { + if (column.getColumnName().equalsIgnoreCase(field.getFieldName())) { + throw new IllegalArgumentException("column name already exists"); + } + } + + for (ColumnSchema column : complex) { + if (column.getColumnName().equalsIgnoreCase(field.getFieldName())) { + throw new IllegalArgumentException("column name already exists"); + } + } + + for (ColumnSchema column : measures) { if (column.getColumnName().equalsIgnoreCase(field.getFieldName())) { throw new IllegalArgumentException("column name already exists"); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala index 34b32f4..0abf5c5 100644 --- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala @@ -22,8 +22,19 @@ import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.carbondata.examples.util.ExampleUtils case class StructElement(school: Array[String], age: Int) + +case class StructElement1(school: Array[String], school1: Array[String], age: Int) + case class ComplexTypeData(id: Int, name: String, city: String, salary: Float, file: StructElement) +case class ComplexTypeData1(id: Int, + name: String, + city: String, + salary: Float, + file: StructElement1) + +case class ComplexTypeData2(id: Int, name: String, city: String, salary: Float, file: Array[String]) + // scalastyle:off println object DataFrameComplexTypeExample { @@ -34,16 +45,21 @@ object DataFrameComplexTypeExample { spark.close() } - def exampleBody(spark : SparkSession): Unit = { - val complexTableName = s"complex_type_table" + def exampleBody(spark: SparkSession): Unit = { + val complexTypeDictionaryTableName = s"complex_type_dictionary_table" + val complexTypeNoDictionaryTableName = s"complex_type_noDictionary_table" + val complexTypeNoDictionaryTableNameArray = s"complex_type_noDictionary_array_table" import spark.implicits._ // drop table if exists previously - spark.sql(s"DROP TABLE IF EXISTS ${ complexTableName }") + spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeDictionaryTableName }") + spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeNoDictionaryTableName }") + spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeNoDictionaryTableNameArray }") + spark.sql( s""" - | CREATE TABLE ${ complexTableName }( + | CREATE TABLE ${ complexTypeDictionaryTableName }( | id INT, | name STRING, | city STRING, @@ -56,6 +72,37 @@ object DataFrameComplexTypeExample { | 'dictionary_include'='city') | """.stripMargin) + spark.sql( + s""" + | CREATE TABLE ${ complexTypeNoDictionaryTableNameArray }( + | 
id INT, + | name STRING, + | city STRING, + | salary FLOAT, + | file array<string> + | ) + | STORED BY 'carbondata' + | TBLPROPERTIES( + | 'sort_columns'='name', + | 'dictionary_include'='city') + | """.stripMargin) + + + spark.sql( + s""" + | CREATE TABLE ${ complexTypeNoDictionaryTableName }( + | id INT, + | name STRING, + | city STRING, + | salary FLOAT, + | file struct<school:array<string>, school1:array<string>, age:int> + | ) + | STORED BY 'carbondata' + | TBLPROPERTIES( + | 'sort_columns'='name') + | """.stripMargin) + + val sc = spark.sparkContext // generate data val df = sc.parallelize(Seq( @@ -66,30 +113,108 @@ object DataFrameComplexTypeExample { ComplexTypeData(3, "index_3", "city_3", 30000.0f, StructElement(Array("struct_31", "struct_32"), 30)) )).toDF + + // generate data + val df2 = sc.parallelize(Seq( + ComplexTypeData2(1, "index_1", "city_1", 10000.0f, Array("struct_11", "struct_12")), + ComplexTypeData2(2, "index_2", "city_2", 20000.0f, Array("struct_21", "struct_22")), + ComplexTypeData2(3, "index_3", "city_3", 30000.0f, Array("struct_31", "struct_32")) + )).toDF + + // generate data + val df1 = sc.parallelize(Seq( + ComplexTypeData1(1, "index_1", "city_1", 10000.0f, + StructElement1(Array("struct_11", "struct_12"), Array("struct_11", "struct_12"), 10)), + ComplexTypeData1(2, "index_2", "city_2", 20000.0f, + StructElement1(Array("struct_21", "struct_22"), Array("struct_11", "struct_12"), 20)), + ComplexTypeData1(3, "index_3", "city_3", 30000.0f, + StructElement1(Array("struct_31", "struct_32"), Array("struct_11", "struct_12"), 30)) + )).toDF + + df.printSchema() df.write .format("carbondata") - .option("tableName", complexTableName) + .option("tableName", complexTypeDictionaryTableName) + .mode(SaveMode.Append) + .save() + + df1.printSchema() + df1.write + .format("carbondata") + .option("tableName", complexTypeNoDictionaryTableName) .mode(SaveMode.Append) .save() - spark.sql(s"select count(*) from ${ complexTableName }").show(100, truncate = 
false) - spark.sql(s"select * from ${ complexTableName } order by id desc").show(300, truncate = false) + df2.printSchema() + df2.write + .format("carbondata") + .option("tableName", complexTypeNoDictionaryTableNameArray) + .mode(SaveMode.Append) + .save() + + + spark.sql(s"select count(*) from ${ complexTypeDictionaryTableName }") + .show(100, truncate = false) + + spark.sql(s"select * from ${ complexTypeDictionaryTableName } order by id desc") + .show(300, truncate = false) + + spark.sql(s"select * " + + s"from ${ complexTypeDictionaryTableName } " + + s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false) + + spark.sql(s"select * " + + s"from ${ complexTypeDictionaryTableName } " + + s"where id > 10 limit 100").show(100, truncate = false) + + // show segments + spark.sql(s"SHOW SEGMENTS FOR TABLE ${ complexTypeDictionaryTableName }").show(false) + + // drop table + spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeDictionaryTableName }") + + + spark.sql(s"select count(*) from ${ complexTypeNoDictionaryTableName }") + .show(100, truncate = false) + + spark.sql(s"select * from ${ complexTypeNoDictionaryTableName } order by id desc") + .show(300, truncate = false) + + spark.sql(s"select * " + + s"from ${ complexTypeNoDictionaryTableName } " + + s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false) + + spark.sql(s"select * " + + s"from ${ complexTypeNoDictionaryTableName } " + + s"where id > 10 limit 100").show(100, truncate = false) + + // show segments + spark.sql(s"SHOW SEGMENTS FOR TABLE ${ complexTypeNoDictionaryTableName }").show(false) + + // drop table + spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeNoDictionaryTableName }") + + spark.sql(s"select count(*) from ${ complexTypeNoDictionaryTableNameArray }") + .show(100, truncate = false) + + spark.sql(s"select * from ${ complexTypeNoDictionaryTableNameArray } order by id desc") + .show(300, truncate = false) spark.sql(s"select * " + - s"from ${ complexTableName } " + + s"from 
${ complexTypeNoDictionaryTableNameArray } " + s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false) spark.sql(s"select * " + - s"from ${ complexTableName } " + + s"from ${ complexTypeNoDictionaryTableNameArray } " + s"where id > 10 limit 100").show(100, truncate = false) // show segments - spark.sql(s"SHOW SEGMENTS FOR TABLE ${complexTableName}").show(false) + spark.sql(s"SHOW SEGMENTS FOR TABLE ${ complexTypeNoDictionaryTableNameArray }").show(false) // drop table - spark.sql(s"DROP TABLE IF EXISTS ${ complexTableName }") + spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeNoDictionaryTableNameArray }") } } // scalastyle:on println http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala index d8e5374..fabcd02 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala @@ -17,13 +17,13 @@ package org.apache.carbondata.spark.testsuite.createTable -import java.io.{File, FileFilter, IOException} +import java.io.{File, FileFilter} import java.util import org.apache.commons.io.FileUtils import org.apache.spark.sql.Row import org.apache.spark.sql.test.util.QueryTest -import org.junit.{Assert, Test} +import org.junit.Assert import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException @@ 
-35,6 +35,7 @@ import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema} import scala.collection.JavaConverters._ import scala.collection.mutable +import org.apache.avro import org.apache.commons.lang.CharEncoding import tech.allegro.schema.json2avro.converter.JsonAvroConverter @@ -254,7 +255,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { sql("create table if not exists t1 (name string, age int, height double) STORED BY 'org.apache.carbondata.format'") sql (s"""insert into t1 values ("aaaaa", 12, 20)""").show(200,false) - sql("select * from t1").show(200,false) sql("insert into sdkOutputTable select * from t1").show(200,false) checkAnswer(sql(s"""select * from sdkOutputTable where age = 12"""), @@ -545,7 +545,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION |'$writerPath' """.stripMargin) - sql("select * from sdkOutputTable").show(false) } assert(exception.getMessage() .contains("Operation not allowed: Invalid table path provided:")) @@ -687,49 +686,20 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { } - def buildAvroTestData(rows: Int, options: util.Map[String, String]): Any = { - FileUtils.deleteDirectory(new File(writerPath)) - val newAvroSchema = "{" + " \"type\" : \"record\", " + " \"name\" : \"userInfo\", " + - " \"namespace\" : \"my.example\", " + - " \"fields\" : [{\"name\" : \"username\", " + - " \"type\" : \"string\", " + " \"default\" : \"NONE\"}, " + - " {\"name\" : \"age\", " + " \"type\" : \"int\", " + - " \"default\" : -1}, " + "{\"name\" : \"address\", " + - " \"type\" : { " + " \"type\" : \"record\", " + - " \"name\" : \"mailing_address\", " + " \"fields\" : [ {" + - " \"name\" : \"street\", " + - " \"type\" : \"string\", " + - " \"default\" : \"NONE\"}, { " + " \"name\" : \"city\", " + - " \"type\" : \"string\", " + " \"default\" : \"NONE\"}, " + - " ]}, " 
+ " \"default\" : {} " + " } " + "}" - val mySchema = "{" + " \"name\": \"address\", " + " \"type\": \"record\", " + - " \"fields\": [ " + - " { \"name\": \"name\", \"type\": \"string\"}, " + - " { \"name\": \"age\", \"type\": \"int\"}, " + " { " + - " \"name\": \"address\", " + " \"type\": { " + - " \"type\" : \"record\", " + " \"name\" : \"my_address\", " + - " \"fields\" : [ " + - " {\"name\": \"street\", \"type\": \"string\"}, " + - " {\"name\": \"city\", \"type\": \"string\"} " + " ]} " + " } " + - "] " + "}" - val json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", " + - "\"city\":\"bang\"}}" + private def WriteFilesWithAvroWriter(rows: Int, + mySchema: String, + json: String, + fields: Array[Field]) = { // conversion to GenericData.Record - val nn = new org.apache.avro.Schema.Parser().parse(mySchema) + val nn = new avro.Schema.Parser().parse(mySchema) val converter = new JsonAvroConverter val record = converter .convertToGenericDataRecord(json.getBytes(CharEncoding.UTF_8), nn) - val fields = new Array[Field](3) - fields(0) = new Field("name", DataTypes.STRING) - fields(1) = new Field("age", DataTypes.STRING) - // fields[1] = new Field("age", DataTypes.INT); - val fld = new util.ArrayList[StructField] - fld.add(new StructField("street", DataTypes.STRING)) - fld.add(new StructField("city", DataTypes.STRING)) - fields(2) = new Field("address", "struct", fld) + try { val writer = CarbonWriter.builder.withSchema(new Schema(fields)) - .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput + .outputPath(writerPath).isTransactionalTable(false) + .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput var i = 0 while (i < rows) { writer.write(record) @@ -745,28 +715,419 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { } } - def buildAvroTestDataSingleFile(): Any = { + // struct type test + def buildAvroTestDataStruct(rows: Int, options: util.Map[String, String]): Any = { + 
FileUtils.deleteDirectory(new File(writerPath)) + val mySchema = + """ + |{"name": "address", + | "type": "record", + | "fields": [ + | { "name": "name", "type": "string"}, + | { "name": "age", "type": "int"}, + | { "name": "address", "type": { + | "type" : "record", "name" : "my_address", + | "fields" : [ + | {"name": "street", "type": "string"}, + | {"name": "city", "type": "string"}]}} + |]} + """.stripMargin + + val json = """ {"name":"bob", "age":10, "address" : {"street":"abc", "city":"bang"}} """ + + + val fields = new Array[Field](3) + fields(0) = new Field("name", DataTypes.STRING) + fields(1) = new Field("age", DataTypes.INT) + val fld = new util.ArrayList[StructField] + fld.add(new StructField("street", DataTypes.STRING)) + fld.add(new StructField("city", DataTypes.STRING)) + fields(2) = new Field("address", "struct", fld) + + WriteFilesWithAvroWriter(rows, mySchema, json, fields) + } + + def buildAvroTestDataStructType(): Any = { + FileUtils.deleteDirectory(new File(writerPath)) + buildAvroTestDataStruct(3, null) + } + + // array type test + def buildAvroTestDataArrayType(rows: Int, options: util.Map[String, String]): Any = { + FileUtils.deleteDirectory(new File(writerPath)) + + val mySchema = """ { + | "name": "address", + | "type": "record", + | "fields": [ + | { + | "name": "name", + | "type": "string" + | }, + | { + | "name": "age", + | "type": "int" + | }, + | { + | "name": "address", + | "type": { + | "type": "array", + | "items": { + | "name": "street", + | "type": "string" + | } + | } + | } + | ] + | } + """.stripMargin + + val json: String = """ {"name": "bob","age": 10,"address": ["abc", "defg"]} """ + + + val fields = new Array[Field](3) + fields(0) = new Field("name", DataTypes.STRING) + fields(1) = new Field("age", DataTypes.INT) + // fields[1] = new Field("age", DataTypes.INT); + val fld = new util.ArrayList[StructField] + fld.add(new StructField("street", DataTypes.STRING)) + fields(2) = new Field("address", "array", fld) + + 
WriteFilesWithAvroWriter(rows, mySchema, json, fields) + } + + def buildAvroTestDataSingleFileArrayType(): Any = { + FileUtils.deleteDirectory(new File(writerPath)) + buildAvroTestDataArrayType(3, null) + } + + // struct with array type test + def buildAvroTestDataStructWithArrayType(rows: Int, options: util.Map[String, String]): Any = { + FileUtils.deleteDirectory(new File(writerPath)) + + val mySchema = """ + { + | "name": "address", + | "type": "record", + | "fields": [ + | { "name": "name", "type": "string"}, + | { "name": "age", "type": "int"}, + | { + | "name": "address", + | "type": { + | "type" : "record", + | "name" : "my_address", + | "fields" : [ + | {"name": "street", "type": "string"}, + | {"name": "city", "type": "string"} + | ]} + | }, + | {"name" :"doorNum", + | "type" : { + | "type" :"array", + | "items":{ + | "name" :"EachdoorNums", + | "type" : "int", + | "default":-1 + | }} + | }]} + """.stripMargin + + val json = + """ {"name":"bob", "age":10, + |"address" : {"street":"abc", "city":"bang"}, + |"doorNum" : [1,2,3,4]}""".stripMargin + + val fields = new Array[Field](4) + fields(0) = new Field("name", DataTypes.STRING) + fields(1) = new Field("age", DataTypes.INT) + val fld = new util.ArrayList[StructField] + fld.add(new StructField("street", DataTypes.STRING)) + fld.add(new StructField("city", DataTypes.STRING)) + fields(2) = new Field("address", "struct", fld) + val fld1 = new util.ArrayList[StructField] + fld1.add(new StructField("eachDoorNum", DataTypes.INT)) + fields(3) = new Field("doorNum", "array", fld1) + WriteFilesWithAvroWriter(rows, mySchema, json, fields) + } + + def buildAvroTestDataBothStructArrayType(): Any = { + FileUtils.deleteDirectory(new File(writerPath)) + buildAvroTestDataStructWithArrayType(3, null) + } + + + // ArrayOfStruct test + def buildAvroTestDataArrayOfStruct(rows: Int, options: util.Map[String, String]): Any = { + FileUtils.deleteDirectory(new File(writerPath)) + + val mySchema = """ { + | "name": "address", + | 
"type": "record", + | "fields": [ + | { + | "name": "name", + | "type": "string" + | }, + | { + | "name": "age", + | "type": "int" + | }, + | { + | "name": "doorNum", + | "type": { + | "type": "array", + | "items": { + | "type": "record", + | "name": "my_address", + | "fields": [ + | { + | "name": "street", + | "type": "string" + | }, + | { + | "name": "city", + | "type": "string" + | } + | ] + | } + | } + | } + | ] + |} """.stripMargin + val json = + """ {"name":"bob","age":10,"doorNum" : + |[{"street":"abc","city":"city1"}, + |{"street":"def","city":"city2"}, + |{"street":"ghi","city":"city3"}, + |{"street":"jkl","city":"city4"}]} """.stripMargin + + + + + val fields = new Array[Field](3) + fields(0) = new Field("name", DataTypes.STRING) + fields(1) = new Field("age", DataTypes.INT) + + val fld = new util.ArrayList[StructField] + fld.add(new StructField("street", DataTypes.STRING)) + fld.add(new StructField("city", DataTypes.STRING)) + + val fld2 = new util.ArrayList[StructField] + fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld)) + fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2) + + WriteFilesWithAvroWriter(rows, mySchema, json, fields) + } + + def buildAvroTestDataArrayOfStructType(): Any = { + FileUtils.deleteDirectory(new File(writerPath)) + buildAvroTestDataArrayOfStruct(3, null) + } + + + // StructOfArray test + def buildAvroTestDataStructOfArray(rows: Int, options: util.Map[String, String]): Any = { + FileUtils.deleteDirectory(new File(writerPath)) + + val mySchema = """ { + | "name": "address", + | "type": "record", + | "fields": [ + | { + | "name": "name", + | "type": "string" + | }, + | { + | "name": "age", + | "type": "int" + | }, + | { + | "name": "address", + | "type": { + | "type": "record", + | "name": "my_address", + | "fields": [ + | { + | "name": "street", + | "type": "string" + | }, + | { + | "name": "city", + | "type": "string" + | }, + | { + | "name": "doorNum", + | 
"type": { + | "type": "array", + | "items": { + | "name": "EachdoorNums", + | "type": "int", + | "default": -1 + | } + | } + | } + | ] + | } + | } + | ] + |} """.stripMargin + + val json = """ { + | "name": "bob", + | "age": 10, + | "address": { + | "street": "abc", + | "city": "bang", + | "doorNum": [ + | 1, + | 2, + | 3, + | 4 + | ] + | } + |} """.stripMargin + + + + + + val fields = new Array[Field](3) + fields(0) = new Field("name", DataTypes.STRING) + fields(1) = new Field("age", DataTypes.INT) + + val fld1 = new util.ArrayList[StructField] + fld1.add(new StructField("eachDoorNum", DataTypes.INT)) + + val fld2 = new util.ArrayList[StructField] + fld2.add(new StructField("street", DataTypes.STRING)) + fld2.add(new StructField("city", DataTypes.STRING)) + fld2.add(new StructField("doorNum", DataTypes.createArrayType(DataTypes.INT), fld1)) + + fields(2) = new Field("address","struct",fld2) + WriteFilesWithAvroWriter(rows, mySchema, json, fields) + } + + def buildAvroTestDataStructOfArrayType(): Any = { FileUtils.deleteDirectory(new File(writerPath)) - buildAvroTestData(3, null) + buildAvroTestDataStructOfArray(3, null) } - test("Read sdk writer Avro output ") { - buildAvroTestDataSingleFile() + + test("Read sdk writer Avro output Record Type") { + buildAvroTestDataStructType() assert(new File(writerPath).exists()) sql("DROP TABLE IF EXISTS sdkOutputTable") sql( s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION |'$writerPath' """.stripMargin) - sql("select * from sdkOutputTable").show(false) checkAnswer(sql("select * from sdkOutputTable"), Seq( - Row("bob", "10", Row("abc","bang")), - Row("bob", "10", Row("abc","bang")), - Row("bob", "10", Row("abc","bang")))) + Row("bob", 10, Row("abc","bang")), + Row("bob", 10, Row("abc","bang")), + Row("bob", 10, Row("abc","bang")))) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).listFiles().length > 0) + } + + test("Read sdk writer Avro 
output Array Type") { + buildAvroTestDataSingleFileArrayType() + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION + |'$writerPath' """.stripMargin) + + sql("select * from sdkOutputTable").show(200,false) + + checkAnswer(sql("select * from sdkOutputTable"), Seq( + Row("bob", 10, new mutable.WrappedArray.ofRef[String](Array("abc", "defg"))), + Row("bob", 10, new mutable.WrappedArray.ofRef[String](Array("abc", "defg"))), + Row("bob", 10, new mutable.WrappedArray.ofRef[String](Array("abc", "defg"))))) sql("DROP TABLE sdkOutputTable") // drop table should not delete the files assert(new File(writerPath).exists()) } + + test("Read sdk writer Avro output with both Array and Struct Type") { + buildAvroTestDataBothStructArrayType() + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION + |'$writerPath' """.stripMargin) + + /* + *-+----+---+----------+------------+ + |name|age|address |doorNum | + +----+---+----------+------------+ + |bob |10 |[abc,bang]|[1, 2, 3, 4]| + |bob |10 |[abc,bang]|[1, 2, 3, 4]| + |bob |10 |[abc,bang]|[1, 2, 3, 4]| + +----+---+----------+------------+ + * */ + + checkAnswer(sql("select * from sdkOutputTable"), Seq( + Row("bob", 10, Row("abc","bang"), mutable.WrappedArray.newBuilder[Int].+=(1,2,3,4)), + Row("bob", 10, Row("abc","bang"), mutable.WrappedArray.newBuilder[Int].+=(1,2,3,4)), + Row("bob", 10, Row("abc","bang"), mutable.WrappedArray.newBuilder[Int].+=(1,2,3,4)))) + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + } + + + test("Read sdk writer Avro output with Array of struct") { + buildAvroTestDataArrayOfStructType() + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION + 
|'$writerPath' """.stripMargin) + + sql("select * from sdkOutputTable").show(false) + + // TODO: Add a validation + /* + +----+---+----------------------------------------------------+ + |name|age|doorNum | + +----+---+----------------------------------------------------+ + |bob |10 |[[abc,city1], [def,city2], [ghi,city3], [jkl,city4]]| + |bob |10 |[[abc,city1], [def,city2], [ghi,city3], [jkl,city4]]| + |bob |10 |[[abc,city1], [def,city2], [ghi,city3], [jkl,city4]]| + +----+---+----------------------------------------------------+ */ + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + } + + + // Struct of array + test("Read sdk writer Avro output with struct of Array") { + buildAvroTestDataStructOfArrayType() + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION + |'$writerPath' """.stripMargin) + + sql("select * from sdkOutputTable").show(false) + + // TODO: Add a validation + /* + +----+---+-------------------------------------------------------+ + |name|age|address | + +----+---+-------------------------------------------------------+ + |bob |10 |[abc,bang,WrappedArray(1, 2, 3, 4)] | + |bob |10 |[abc,bang,WrappedArray(1, 2, 3, 4)] | + |bob |10 |[abc,bang,WrappedArray(1, 2, 3, 4)] | + +----+---+-------------------------------------------------------+*/ + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala b/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala index fe11b98..65210b8 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala @@ -97,32 +97,16 @@ private[spark] object SparkTypeConverter { def getStructChildren(table: CarbonTable, dimName: String): String = { table.getChildren(dimName).asScala.map(childDim => { childDim.getDataType.getName.toLowerCase match { - case "array" => if (table.isTransactionalTable) {s"${ + case "array" => s"${ childDim.getColName.substring(dimName.length + 1) - }:array<${ getArrayChildren(table, childDim.getColName) }>" } else { - // For non Transactional table the Childrends of Struct Columns - // are not appended with their parent. - s"${ - childDim.getColName - }:array<${ getArrayChildren(table, childDim.getColName) }>" - } - case "struct" => if (table.isTransactionalTable) { s"${ + }:array<${ getArrayChildren(table, childDim.getColName) }>" + case "struct" => s"${ childDim.getColName.substring(dimName.length + 1) }:struct<${ table.getChildren(childDim.getColName) .asScala.map(f => s"${ recursiveMethod(table, childDim.getColName, f) }").mkString(",") - }>"} else { - s"${ - childDim.getColName - }:struct<${ table.getChildren(childDim.getColName) - .asScala.map(f => s"${ recursiveMethod(table, childDim.getColName, f) }").mkString(",") - }>" - } - case dType => if (table.isTransactionalTable) { - s"${ childDim.getColName + }>" + case dType => s"${ childDim.getColName .substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }" - } else { - s"${ childDim.getColName}:${ addDecimalScaleAndPrecision(childDim, dType) }" - } } }).mkString(",") } @@ -139,31 +123,13 @@ private[spark] object SparkTypeConverter { private def recursiveMethod( table: CarbonTable, dimName: String, childDim: CarbonDimension) = { childDim.getDataType.getName.toLowerCase match { - case "array" => if (table.isTransactionalTable) { - s"${ - childDim.getColName.substring(dimName.length + 1) - }:array<${ 
getArrayChildren(table, childDim.getColName) }>" - } else { - // For non Transactional table the Childrends of Struct Columns - // are not appended with their parent. - s"${ - childDim.getColName - }:array<${ getArrayChildren(table, childDim.getColName) }>" - } - case "struct" => if (table.isTransactionalTable) { - s"${ - childDim.getColName.substring(dimName.length + 1) - }:struct<${ getStructChildren(table, childDim.getColName) }>" - } else { - s"${ - childDim.getColName - }:struct<${ getStructChildren(table, childDim.getColName) }>" - } - case dType => if (table.isTransactionalTable) { - s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }" - } else { - s"${ childDim.getColName }:${ dType }" - } + case "array" => s"${ + childDim.getColName.substring(dimName.length + 1) + }:array<${ getArrayChildren(table, childDim.getColName) }>" + case "struct" => s"${ + childDim.getColName.substring(dimName.length + 1) + }:struct<${ getStructChildren(table, childDim.getColName) }>" + case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }" } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java index 8f1994a..bc2e9db 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java @@ -18,6 +18,7 @@ package org.apache.carbondata.sdk.file; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.UUID; @@ -69,16 +70,16 @@ class AvroCarbonWriter extends CarbonWriter { avroSchema = avroRecord.getSchema(); } List<Schema.Field> fields = 
avroSchema.getFields(); - Object [] csvField = new String[fields.size()]; + Object [] csvField = new Object[fields.size()]; for (int i = 0; i < fields.size(); i++) { - csvField[i] = avroFieldToString(fields.get(i), avroRecord.get(i)); + csvField[i] = avroFieldToObject(fields.get(i), avroRecord.get(i), 0); } return csvField; } - private String avroFieldToString(Schema.Field fieldType, Object fieldValue) { + private String avroFieldToObject(Schema.Field avroField, Object fieldValue, int delimiterLevel) { StringBuilder out = new StringBuilder(); - Schema.Type type = fieldType.schema().getType(); + Schema.Type type = avroField.schema().getType(); switch (type) { case BOOLEAN: case INT: @@ -89,22 +90,47 @@ class AvroCarbonWriter extends CarbonWriter { out.append(fieldValue.toString()); break; case RECORD: - List<Schema.Field> fields = fieldType.schema().getFields(); + List<Schema.Field> fields = avroField.schema().getFields(); String delimiter = null; - for (int i = 0; i < fields.size(); i ++) { - if (i == 0) { + delimiterLevel ++; + for (int i = 0; i < fields.size(); i++) { + if (delimiterLevel == 1) { delimiter = "$"; - } else { + } else if (delimiterLevel > 1) { delimiter = ":"; } if (i != (fields.size() - 1)) { - out.append(avroFieldToString(fields.get(i), ((GenericData.Record) fieldValue).get(i))) - .append(delimiter); + out.append(avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i), + delimiterLevel)).append(delimiter); + } else { + out.append(avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i), + delimiterLevel)); + } + } + break; + case ARRAY: + int size = ((ArrayList) fieldValue).size(); + String delimiterArray = null; + delimiterLevel ++; + if (delimiterLevel == 1) { + delimiterArray = "$"; + } else if (delimiterLevel > 1) { + delimiterArray = ":"; + } + + for (int i = 0; i < size; i++) { + if (i != size - 1) { + out.append(avroFieldToObject( + new Schema.Field(avroField.name(), avroField.schema().getElementType(), 
null, true), + ((ArrayList) fieldValue).get(i), delimiterLevel)).append(delimiterArray); } else { - out.append(avroFieldToString(fields.get(i), ((GenericData.Record) fieldValue).get(i))); + out.append(avroFieldToObject( + new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true), + ((ArrayList) fieldValue).get(i), delimiterLevel)); } } break; + default: throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index 68bc3ab..397f151 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -333,6 +333,23 @@ public class CarbonWriterBuilder { return new AvroCarbonWriter(loadModel); } + private void setCsvHeader(CarbonLoadModel model) { + Field[] fields = schema.getFields(); + StringBuilder builder = new StringBuilder(); + String[] columns = new String[fields.length]; + int i = 0; + for (Field field : fields) { + if (null != field) { + builder.append(field.getFieldName()); + builder.append(","); + columns[i++] = field.getFieldName(); + } + } + String header = builder.toString(); + model.setCsvHeader(header.substring(0, header.length() - 1)); + model.setCsvHeaderColumns(columns); + } + private CarbonLoadModel createLoadModel() throws IOException, InvalidLoadOptionException { // build CarbonTable using schema CarbonTable table = buildCarbonTable(); @@ -368,7 +385,7 @@ public class CarbonWriterBuilder { for (Field field : schema.getFields()) { if (null != field) { if (field.getDataType() == DataTypes.STRING || - field.getDataType() == DataTypes.DATE || 
+ field.getDataType() == DataTypes.DATE || field.getDataType() == DataTypes.TIMESTAMP) { sortColumnsList.add(field.getFieldName()); } @@ -380,30 +397,9 @@ public class CarbonWriterBuilder { sortColumnsList = Arrays.asList(sortColumns); } ColumnSchema[] sortColumnsSchemaList = new ColumnSchema[sortColumnsList.size()]; - for (Field field : schema.getFields()) { - if (null != field) { - if (field.getChildren() != null && field.getChildren().size() > 0) { - // Loop through the inner columns and for a StructData - List<StructField> structFieldsArray = - new ArrayList<StructField>(field.getChildren().size()); - String parentName = field.getFieldName(); - for (StructField childFld : field.getChildren()) { - structFieldsArray.add(new StructField(childFld.getFieldName(), childFld.getDataType())); - } - DataType complexType = DataTypes.createStructType(structFieldsArray); - tableSchemaBuilder.addColumn(new StructField(field.getFieldName(), complexType), false); - } else { - int isSortColumn = sortColumnsList.indexOf(field.getFieldName()); - ColumnSchema columnSchema = tableSchemaBuilder - .addColumn(new StructField(field.getFieldName(), field.getDataType()), - isSortColumn > -1); - if (isSortColumn > -1) { - columnSchema.setSortColumn(true); - sortColumnsSchemaList[isSortColumn] = columnSchema; - } - } - } - } + Field[] fields = schema.getFields(); + buildTableSchema(fields, tableSchemaBuilder, sortColumnsList, sortColumnsSchemaList); + tableSchemaBuilder.setSortColumns(Arrays.asList(sortColumnsSchemaList)); String tableName; String dbName; @@ -416,16 +412,56 @@ public class CarbonWriterBuilder { } TableSchema schema = tableSchemaBuilder.build(); schema.setTableName(tableName); - CarbonTable table = CarbonTable.builder() - .tableName(schema.getTableName()) - .databaseName(dbName) - .tablePath(path) - .tableSchema(schema) - .isTransactionalTable(isTransactionalTable) - .build(); + CarbonTable table = + 
CarbonTable.builder().tableName(schema.getTableName()).databaseName(dbName).tablePath(path) + .tableSchema(schema).isTransactionalTable(isTransactionalTable).build(); return table; } + private void buildTableSchema(Field[] fields, TableSchemaBuilder tableSchemaBuilder, + List<String> sortColumnsList, ColumnSchema[] sortColumnsSchemaList) { + for (Field field : fields) { + if (null != field) { + int isSortColumn = sortColumnsList.indexOf(field.getFieldName()); + if (isSortColumn > -1) { + // unsupported types for ("array", "struct", "double", "float", "decimal") + if (field.getDataType() == DataTypes.DOUBLE || field.getDataType() == DataTypes.FLOAT + || DataTypes.isDecimal(field.getDataType()) || field.getDataType().isComplexType()) { + throw new RuntimeException( + " sort columns not supported for " + "array, struct, double, float, decimal "); + } + } + + if (field.getChildren() != null && field.getChildren().size() > 0) { + if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) { + // Loop through the inner columns and for a StructData + DataType complexType = + DataTypes.createArrayType(field.getChildren().get(0).getDataType()); + tableSchemaBuilder.addColumn(new StructField(field.getFieldName(), complexType), false); + } else if (field.getDataType().getName().equalsIgnoreCase("STRUCT")) { + // Loop through the inner columns and for a StructData + List<StructField> structFieldsArray = + new ArrayList<StructField>(field.getChildren().size()); + for (StructField childFld : field.getChildren()) { + structFieldsArray + .add(new StructField(childFld.getFieldName(), childFld.getDataType())); + } + DataType complexType = DataTypes.createStructType(structFieldsArray); + tableSchemaBuilder.addColumn(new StructField(field.getFieldName(), complexType), false); + } + } else { + ColumnSchema columnSchema = tableSchemaBuilder + .addColumn(new StructField(field.getFieldName(), field.getDataType()), + isSortColumn > -1); + columnSchema.setSortColumn(true); + if 
(isSortColumn > -1) { + sortColumnsSchemaList[isSortColumn] = columnSchema; + } + } + } + } + } + /** * Save the schema of the {@param table} to {@param persistFilePath} * @param table table object containing schema @@ -465,6 +501,8 @@ public class CarbonWriterBuilder { options = new HashMap<>(); } CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table); - return builder.build(options, UUID, taskNo); + CarbonLoadModel build = builder.build(options, UUID, taskNo); + setCsvHeader(build); + return build; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java index 677047b..0db3bc5 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java @@ -104,7 +104,7 @@ public class Field { } else if (type.equalsIgnoreCase("double")) { this.type = DataTypes.DOUBLE; } else if (type.equalsIgnoreCase("array")) { - this.type = DataTypes.createStructType(fields); + this.type = DataTypes.createArrayType(fields.get(0).getDataType()); } else if (type.equalsIgnoreCase("struct")) { this.type = DataTypes.createStructType(fields); } @@ -113,6 +113,8 @@ public class Field { } } + + public Field(String name, DataType type, List<StructField> fields) { this.name = name; this.type = type; http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java 
index ed3f2f1..105fb6d 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java @@ -20,10 +20,13 @@ package org.apache.carbondata.sdk.file; import java.io.File; import java.io.FileFilter; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.List; +import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.datatype.ArrayType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.datatype.StructField; import org.apache.carbondata.core.metadata.datatype.StructType; @@ -37,6 +40,7 @@ import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Test; +import scala.Array; import tech.allegro.schema.json2avro.converter.JsonAvroConverter; import org.apache.avro.Schema; @@ -285,4 +289,214 @@ public class AvroCarbonWriterTest { FileUtils.deleteDirectory(new File(path)); } + + @Test + public void testWriteNestedRecordWithMeasure() throws IOException { + FileUtils.deleteDirectory(new File(path)); + + String mySchema = + "{" + + " \"name\": \"address\", " + + " \"type\": \"record\", " + + " \"fields\": [ " + + " { \"name\": \"name\", \"type\": \"string\"}, " + + " { \"name\": \"age\", \"type\": \"int\"}, " + + " { " + + " \"name\": \"address\", " + + " \"type\": { " + + " \"type\" : \"record\", " + + " \"name\" : \"my_address\", " + + " \"fields\" : [ " + + " {\"name\": \"street\", \"type\": \"string\"}, " + + " {\"name\": \"city\", \"type\": \"string\"} " + + " ]} " + + " } " + + "] " + + "}"; + + String json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", \"city\":\"bang\"}}"; + + + // conversion to GenericData.Record + Schema nn = new 
Schema.Parser().parse(mySchema); + JsonAvroConverter converter = new JsonAvroConverter(); + GenericData.Record record = converter.convertToGenericDataRecord( + json.getBytes(CharEncoding.UTF_8), nn); + + Field[] fields = new Field[3]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("name1", DataTypes.STRING); + // fields[1] = new Field("age", DataTypes.INT); + List fld = new ArrayList<StructField>(); + fld.add(new StructField("street", DataTypes.STRING)); + fld.add(new StructField("city", DataTypes.STRING)); + fields[2] = new Field("address", "struct", fld); + + try { + CarbonWriter writer = CarbonWriter.builder() + .withSchema(new org.apache.carbondata.sdk.file.Schema(fields)) + .outputPath(path) + .isTransactionalTable(true) + .buildWriterForAvroInput(); + + for (int i = 0; i < 100; i++) { + writer.write(record); + } + writer.close(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); + Assert.assertTrue(segmentFolder.exists()); + + File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + @Override public boolean accept(File pathname) { + return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); + } + }); + Assert.assertNotNull(dataFiles); + Assert.assertEquals(1, dataFiles.length); + + FileUtils.deleteDirectory(new File(path)); + } + + + private void WriteAvroComplexData(String mySchema, String json, String[] sortColumns) + throws UnsupportedEncodingException, IOException, InvalidLoadOptionException { + Field[] fields = new Field[4]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("name1", DataTypes.STRING); + // fields[1] = new Field("age", DataTypes.INT); + List fld = new ArrayList<StructField>(); + fld.add(new StructField("street", DataTypes.STRING)); + fld.add(new StructField("city", DataTypes.STRING)); + fields[2] = new Field("address", "struct", fld); + List fld1 = new 
ArrayList<StructField>(); + fld1.add(new StructField("eachDoorNum", DataTypes.INT)); + fields[3] = new Field("doorNum","array",fld1); + + // conversion to GenericData.Record + Schema nn = new Schema.Parser().parse(mySchema); + JsonAvroConverter converter = new JsonAvroConverter(); + GenericData.Record record = converter.convertToGenericDataRecord( + json.getBytes(CharEncoding.UTF_8), nn); + + try { + CarbonWriter writer = CarbonWriter.builder() + .withSchema(new org.apache.carbondata.sdk.file.Schema(fields)) + .outputPath(path) + .isTransactionalTable(true).sortBy(sortColumns) + .buildWriterForAvroInput(); + + for (int i = 0; i < 100; i++) { + writer.write(record); + } + writer.close(); + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + } + + + @Test + public void testWriteComplexRecord() throws IOException, InvalidLoadOptionException { + FileUtils.deleteDirectory(new File(path)); + + String mySchema = + "{" + + " \"name\": \"address\", " + + " \"type\": \"record\", " + + " \"fields\": [ " + + " { \"name\": \"name\", \"type\": \"string\"}, " + + " { \"name\": \"age\", \"type\": \"int\"}, " + + " { " + + " \"name\": \"address\", " + + " \"type\": { " + + " \"type\" : \"record\", " + + " \"name\" : \"my_address\", " + + " \"fields\" : [ " + + " {\"name\": \"street\", \"type\": \"string\"}, " + + " {\"name\": \"city\", \"type\": \"string\"} " + + " ]} " + + " }, " + + " {\"name\" :\"doorNum\", " + + " \"type\" : { " + + " \"type\" :\"array\", " + + " \"items\":{ " + + " \"name\" :\"EachdoorNums\", " + + " \"type\" : \"int\", " + + " \"default\":-1} " + + " } " + + " }] " + + "}"; + + String json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", \"city\":\"bang\"}, " + + " \"doorNum\" : [1,2,3,4]}"; + + WriteAvroComplexData(mySchema, json, null); + + File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); + Assert.assertTrue(segmentFolder.exists()); + + File[] dataFiles = segmentFolder.listFiles(new FileFilter() { 
+ @Override public boolean accept(File pathname) { + return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); + } + }); + Assert.assertNotNull(dataFiles); + Assert.assertEquals(1, dataFiles.length); + + FileUtils.deleteDirectory(new File(path)); + } + + + @Test + public void testWriteComplexRecordWithSortColumns() throws IOException { + FileUtils.deleteDirectory(new File(path)); + + String mySchema = + "{" + + " \"name\": \"address\", " + + " \"type\": \"record\", " + + " \"fields\": [ " + + " { \"name\": \"name\", \"type\": \"string\"}, " + + " { \"name\": \"age\", \"type\": \"int\"}, " + + " { " + + " \"name\": \"address\", " + + " \"type\": { " + + " \"type\" : \"record\", " + + " \"name\" : \"my_address\", " + + " \"fields\" : [ " + + " {\"name\": \"street\", \"type\": \"string\"}, " + + " {\"name\": \"city\", \"type\": \"string\"} " + + " ]} " + + " }, " + + " {\"name\" :\"doorNum\", " + + " \"type\" : { " + + " \"type\" :\"array\", " + + " \"items\":{ " + + " \"name\" :\"EachdoorNums\", " + + " \"type\" : \"int\", " + + " \"default\":-1} " + + " } " + + " }] " + + "}"; + + String json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", \"city\":\"bang\"}, " + + " \"doorNum\" : [1,2,3,4]}"; + + try { + WriteAvroComplexData(mySchema, json, new String[] { "doorNum" }); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(true); + } + FileUtils.deleteDirectory(new File(path)); + } + + + }