[CARBONDATA-2979] select count fails when carbondata file is written through SDK and read through sparkfileformat for complex datatype map(struct->array->map)
Problem: A select query fails for the map type when data is loaded using the Avro SDK and an external table using the carbon file format is used to query the data. Analysis: When data is loaded through the Avro SDK with a schema of type struct<array<map>>, the field name of the nested map was hard-coded to "val". As a result, during query, the schema written in the file footer and the schema inferred for the external table did not match, which led to the failure. Solution: Instead of hard-coding the field name as "val", use the field name given in the schema. This closes #2774 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/682160fa Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/682160fa Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/682160fa Branch: refs/heads/branch-1.5 Commit: 682160fa1bbde5f13c8a28e0114d3f18e5ffaf79 Parents: e9a198a Author: manishgupta88 <tomanishgupt...@gmail.com> Authored: Thu Sep 27 18:02:34 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Wed Oct 3 19:57:50 2018 +0530 ---------------------------------------------------------------------- .../datasource/SparkCarbonDataSourceTest.scala | 63 +++++++++++++++++++- .../sql/carbondata/datasource/TestUtil.scala | 56 ++++++++++++++++- .../carbondata/sdk/file/AvroCarbonWriter.java | 11 ++-- 3 files changed, 122 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/682160fa/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala index 
3be8cb3..37677d0 100644 --- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala +++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala @@ -1117,11 +1117,11 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll { } private def createParquetTable { - FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$warehouse1/../warehouse2")) + val path = FileFactory.getUpdatedFilePath(s"$warehouse1/../warehouse2") + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$path")) spark.sql(s"create table par_table(male boolean, age int, height double, name string, address " + s"string," + - s"salary long, floatField float, bytefield byte) using parquet location " + - s"'$warehouse1/../warehouse2'") + s"salary long, floatField float, bytefield byte) using parquet location '$path'") (0 to 10).foreach { i => spark.sql(s"insert into par_table select 'true','$i', ${i.toDouble / 2}, 'name$i', " + s"'address$i', ${i*100}, $i.$i, '$i'") @@ -1181,6 +1181,63 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll { } } + def buildStructSchemaWithNestedArrayOfMapTypeAsValue(writerPath: String, rows: Int): Unit = { + FileFactory.deleteAllFilesOfDir(new File(writerPath)) + val mySchema = + """ + |{ + | "name": "address", + | "type": "record", + | "fields": [ + | { + | "name": "name", + | "type": "string" + | }, + | { + | "name": "age", + | "type": "int" + | }, + | { + | "name": "structRecord", + | "type": { + | "type": "record", + | "name": "my_address", + | "fields": [ + | { + | "name": "street", + | "type": "string" + | }, + | { + | "name": "houseDetails", + | "type": { + | "type": "array", + | "items": { + | "name": "memberDetails", + | "type": "map", + | "values": "string" + | } + | } + | } + | ] + | } + | } + | ] + |} + """.stripMargin + val json = """ {"name":"bob", "age":10, 
"structRecord": {"street":"street1", "houseDetails": [{"101": "Rahul", "102": "Pawan"}]}} """.stripMargin + TestUtil.WriteFilesWithAvroWriter(writerPath, rows, mySchema, json) + } + + test("test external table with struct type with value as nested struct<array<map>> type") { + val writerPath: String = FileFactory.getUpdatedFilePath(warehouse1 + "/sdk1") + val rowCount = 3 + buildStructSchemaWithNestedArrayOfMapTypeAsValue(writerPath, rowCount) + spark.sql("drop table if exists carbon_external") + spark.sql(s"create table carbon_external using carbon location '$writerPath'") + assert(spark.sql("select * from carbon_external").count() == rowCount) + spark.sql("drop table if exists carbon_external") + } + test("test byte and float for multiple pages") { val path = new File(warehouse1+"/sdk1").getAbsolutePath FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1")) http://git-wip-us.apache.org/repos/asf/carbondata/blob/682160fa/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala index b9185aa..f2285d6 100644 --- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala +++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestUtil.scala @@ -16,17 +16,23 @@ */ package org.apache.spark.sql.carbondata.datasource -import java.io.File +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, File, InputStream} import scala.collection.JavaConverters._ +import org.apache.avro +import org.apache.avro.file.DataFileWriter +import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord} +import 
org.apache.avro.io.{DecoderFactory, Encoder} import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.util.sideBySide +import org.junit.Assert import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.sdk.file.CarbonWriter object TestUtil { @@ -134,4 +140,52 @@ object TestUtil { } } + def WriteFilesWithAvroWriter(writerPath: String, + rows: Int, + mySchema: String, + json: String) = { + // conversion to GenericData.Record + val nn = new avro.Schema.Parser().parse(mySchema) + val record = jsonToAvro(json, mySchema) + try { + val writer = CarbonWriter.builder + .outputPath(writerPath) + .uniqueIdentifier(System.currentTimeMillis()).withAvroInput(nn).build() + var i = 0 + while (i < rows) { + writer.write(record) + i = i + 1 + } + writer.close() + } + catch { + case e: Exception => { + e.printStackTrace() + Assert.fail(e.getMessage) + } + } + } + + private def jsonToAvro(json: String, avroSchema: String): GenericRecord = { + var input: InputStream = null + var writer: DataFileWriter[GenericRecord] = null + var encoder: Encoder = null + var output: ByteArrayOutputStream = null + try { + val schema = new org.apache.avro.Schema.Parser().parse(avroSchema) + val reader = new GenericDatumReader[GenericRecord](schema) + input = new ByteArrayInputStream(json.getBytes()) + output = new ByteArrayOutputStream() + val din = new DataInputStream(input) + writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord]()) + writer.create(schema, output) + val decoder = DecoderFactory.get().jsonDecoder(schema, din) + var datum: GenericRecord = reader.read(null, decoder) + return datum + } finally { + input.close() + writer.close() + } + } + } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/682160fa/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java index ab1e154..d19a96d 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java @@ -625,7 +625,8 @@ public class AvroCarbonWriter extends CarbonWriter { case ARRAY: // recursively get the sub fields // array will have only one sub field. - DataType subType = getMappingDataTypeForCollectionRecord(childSchema.getElementType()); + DataType subType = + getMappingDataTypeForCollectionRecord(fieldName, childSchema.getElementType()); if (subType != null) { return (new StructField(fieldName, DataTypes.createArrayType(subType))); } else { @@ -661,7 +662,8 @@ public class AvroCarbonWriter extends CarbonWriter { } } - private static DataType getMappingDataTypeForCollectionRecord(Schema childSchema) { + private static DataType getMappingDataTypeForCollectionRecord(String fieldName, + Schema childSchema) { LogicalType logicalType = childSchema.getLogicalType(); switch (childSchema.getType()) { case BOOLEAN: @@ -700,7 +702,7 @@ public class AvroCarbonWriter extends CarbonWriter { return DataTypes.FLOAT; case MAP: // recursively get the sub fields - StructField mapField = prepareSubFields("val", childSchema); + StructField mapField = prepareSubFields(fieldName, childSchema); if (mapField != null) { return mapField.getDataType(); } @@ -717,7 +719,8 @@ public class AvroCarbonWriter extends CarbonWriter { return DataTypes.createStructType(structSubFields); case ARRAY: // array will have only one sub field. 
- DataType subType = getMappingDataTypeForCollectionRecord(childSchema.getElementType()); + DataType subType = + getMappingDataTypeForCollectionRecord(fieldName, childSchema.getElementType()); if (subType != null) { return DataTypes.createArrayType(subType); } else {