[CARBONDATA-2471] Added support for No Dictionary Complex type for Double, Decimal and Date types in SDK
Added support for No Dictionary Complex type for Double, Decimal, Date type in SDK This is closes #2297 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d85fb72e Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d85fb72e Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d85fb72e Branch: refs/heads/spark-2.3 Commit: d85fb72e2f24107769c7b5ce7d454d52cbaee49d Parents: 3d8b085 Author: kumarvishal09 <kumarvishal1...@gmail.com> Authored: Thu May 10 22:52:09 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Fri May 11 15:38:27 2018 +0530 ---------------------------------------------------------------------- .../scan/complextypes/PrimitiveQueryType.java | 18 +- .../apache/carbondata/core/util/ByteUtil.java | 8 + .../carbondata/core/util/DataTypeUtil.java | 18 ++ ...ransactionalCarbonTableWithComplexType.scala | 232 +++++++++++++++++++ .../command/carbonTableSchemaCommon.scala | 9 +- .../processing/datatypes/PrimitiveDataType.java | 29 ++- 6 files changed, 297 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/d85fb72e/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java index 2db590b..edae4da 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java @@ -22,13 +22,16 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.carbondata.core.cache.dictionary.Dictionary; +import 
org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; import org.apache.carbondata.core.keygenerator.mdkey.Bits; import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.scan.filter.GenericQueryType; import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks; +import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.DataTypeUtil; public class PrimitiveQueryType extends ComplexQueryType implements GenericQueryType { @@ -46,6 +49,8 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery private boolean isDictionary; + private DirectDictionaryGenerator directDictGenForDate; + public PrimitiveQueryType(String name, String parentname, int blockIndex, DataType dataType, int keySize, Dictionary dictionary, boolean isDirectDictionary) { @@ -57,6 +62,8 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery this.parentname = parentname; this.isDirectDictionary = isDirectDictionary; this.isDictionary = (dictionary != null && isDirectDictionary == false); + this.directDictGenForDate = + DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(DataTypes.DATE); } @Override public void addChildren(GenericQueryType children) { @@ -116,7 +123,16 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery int size = dataBuffer.getInt(); byte[] value = new byte[size]; dataBuffer.get(value, 0, size); - actualData = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(value, this.dataType); + if (dataType == DataTypes.DATE) { + if (value.length == 0) { + actualData = null; + } 
else { + actualData = this.directDictGenForDate.getValueFromSurrogate( + ByteUtil.toInt(value, 0, CarbonCommonConstants.INT_SIZE_IN_BYTE)); + } + } else { + actualData = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(value, this.dataType); + } } else { // Dictionary Column byte[] data = new byte[keySize]; http://git-wip-us.apache.org/repos/asf/carbondata/blob/d85fb72e/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java index 52fc3c3..661384c 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java @@ -545,6 +545,14 @@ public final class ByteUtil { return b; } + public static byte[] toBytes(double val) { + return toBytes(Double.doubleToLongBits(val)); + } + + public static double toDouble(byte[] value, int offset, int length) { + return Double.longBitsToDouble(toLong(value, offset, length)); + } + /** * byte[] => long */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/d85fb72e/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java index 6967102..9822167 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java @@ -331,6 +331,10 @@ public final class DataTypeUtil { return ByteUtil.toBytes(Integer.parseInt(dimensionValue)); } else if (actualDataType == DataTypes.LONG) { return ByteUtil.toBytes(Long.parseLong(dimensionValue)); + } else if (actualDataType == DataTypes.DOUBLE) { + return 
ByteUtil.toBytes(Double.parseDouble(dimensionValue)); + } else if (DataTypes.isDecimal(actualDataType)) { + return bigDecimalToByte(new BigDecimal(dimensionValue)); } else if (actualDataType == DataTypes.TIMESTAMP) { Date dateToStr = null; DateFormat dateFormatter = null; @@ -362,6 +366,10 @@ public final class DataTypeUtil { return Integer.parseInt(dimensionValue); } else if (actualDataType == DataTypes.LONG) { return Long.parseLong(dimensionValue); + } else if (actualDataType == DataTypes.DOUBLE) { + return Double.parseDouble(dimensionValue); + } else if (DataTypes.isDecimal(actualDataType)) { + return new BigDecimal(dimensionValue); } else if (actualDataType == DataTypes.TIMESTAMP) { Date dateToStr = null; DateFormat dateFormatter = null; @@ -449,6 +457,16 @@ public final class DataTypeUtil { return null; } return ByteUtil.toLong(dataInBytes, 0, dataInBytes.length) * 1000L; + } else if (actualDataType == DataTypes.DOUBLE) { + if (isEmptyByteArray(dataInBytes)) { + return null; + } + return ByteUtil.toDouble(dataInBytes, 0, dataInBytes.length); + } else if (DataTypes.isDecimal(actualDataType)) { + if (isEmptyByteArray(dataInBytes)) { + return null; + } + return converter.convertFromBigDecimalToDecimal(byteToBigDecimal(dataInBytes)); } else { return ByteUtil.toString(dataInBytes, 0, dataInBytes.length); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d85fb72e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala new file mode 100644 index 0000000..ccfb231 --- /dev/null +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.{File} +import java.util + + +import org.apache.avro +import org.apache.commons.io.FileUtils +import org.apache.commons.lang.CharEncoding +import org.apache.spark.sql.test.util.QueryTest +import org.junit.Assert +import org.scalatest.BeforeAndAfterAll +import tech.allegro.schema.json2avro.converter.JsonAvroConverter +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema} + +class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with BeforeAndAfterAll { + + var writerPath = new File(this.getClass.getResource("/").getPath + + + "../." + + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") + .getCanonicalPath + //getCanonicalPath gives path with \, so code expects /. 
Need to handle in code ? + writerPath = writerPath.replace("\\", "/") + + + def cleanTestData() = { + FileUtils.deleteDirectory(new File(writerPath)) + } + + override def beforeAll(): Unit = { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + private def WriteFilesWithAvroWriter(rows: Int, + mySchema: String, + json: String, + fields: Array[Field]) = { + // conversion to GenericData.Record + val nn = new avro.Schema.Parser().parse(mySchema) + val converter = new JsonAvroConverter + val record = converter + .convertToGenericDataRecord(json.getBytes(CharEncoding.UTF_8), nn) + + try { + val writer = CarbonWriter.builder.withSchema(new Schema(fields)) + .outputPath(writerPath).isTransactionalTable(false) + .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput + var i = 0 + while (i < rows) { + writer.write(record) + i = i + 1 + } + writer.close() + } + catch { + case e: Exception => { + e.printStackTrace() + Assert.fail(e.getMessage) + } + } + } + + // test multi level -- 4 levels [array of array of array of struct] + def buildAvroTestDataMultiLevel4(rows: Int, options: util.Map[String, String]): Any = { + FileUtils.deleteDirectory(new File(writerPath)) + + val mySchema = """ { + | "name": "address", + | "type": "record", + | "fields": [ + | { + | "name": "name", + | "type": "string" + | }, + | { + | "name": "age", + | "type": "int" + | }, + | { + | "name": "BuildNum", + | "type": { + | "type": "array", + | "items": { + | "name": "FloorNum", + | "type": "array", + | "items": { + | "name": "doorNum", + | "type": "array", + | "items": { + | "name": "my_address", + | "type": "record", + | "fields": [ + | { + | "name": "street", + | "type": "string" + | }, + | { + | "name": "city", + | "type": "string" + | }, + | { + | 
"name": "Temperature", + | "type": "double" + | }, + | { + | "name": "WindSpeed", + | "type": "string" + | }, + | { + | "name": "year", + | "type": "string" + | } + | ] + | } + | } + | } + | } + | } + | ] + |} """.stripMargin + + val json = + """ { + | "name": "bob", + | "age": 10, + | "BuildNum": [ + | [ + | [ + | {"street":"abc", "city":"city1", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"}, + | {"street":"def", "city":"city2", "Temperature":13.6, "WindSpeed":"1234.56", "year":"2018-05-10"}, + | {"street":"cfg", "city":"city3", "Temperature":14.6, "WindSpeed":"1234.56", "year":"2018-05-10"} + | ], + | [ + | {"street":"abc1", "city":"city3", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"}, + | {"street":"def1", "city":"city4", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"}, + | {"street":"cfg1", "city":"city5", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"} + | ] + | ], + | [ + | [ + | {"street":"abc2", "city":"cityx", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"}, + | {"street":"abc3", "city":"cityy", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"}, + | {"street":"abc4", "city":"cityz", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"} + | ], + | [ + | {"street":"a1bc", "city":"cityA", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"}, + | {"street":"a1bc", "city":"cityB", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"}, + | {"street":"a1bc", "city":"cityc", "Temperature":12.6, "WindSpeed":"1234.56", "year":"2018-05-10"} + | ] + | ] + | ] + |} """.stripMargin + + val fields = new Array[Field](3) + fields(0) = new Field("name", DataTypes.STRING) + fields(1) = new Field("age", DataTypes.INT) + + val subFld = new util.ArrayList[StructField] + subFld.add(new StructField("EachDoorNum", DataTypes.INT)) + + val address = new util.ArrayList[StructField] + address.add(new StructField("street", DataTypes.STRING)) + 
address.add(new StructField("city", DataTypes.STRING)) + address.add(new StructField("Temperature", DataTypes.DOUBLE)) + address.add(new StructField("WindSpeed", DataTypes.createDecimalType(6,2))) + address.add(new StructField("year", DataTypes.DATE)) + + val fld = new util.ArrayList[StructField] + fld.add(new StructField("DoorNum", + DataTypes.createArrayType(DataTypes.createStructType(address)), + subFld)) + // array of struct of struct + val doorNum = new util.ArrayList[StructField] + doorNum.add(new StructField("FloorNum", + DataTypes.createArrayType( + DataTypes.createArrayType(DataTypes.createStructType(address))), fld)) + fields(2) = new Field("BuildNum", "array", doorNum) + + WriteFilesWithAvroWriter(rows, mySchema, json, fields) + } + + def buildAvroTestDataMultiLevel4Type(): Any = { + FileUtils.deleteDirectory(new File(writerPath)) + buildAvroTestDataMultiLevel4(3, null) + } + + // test multi level -- 4 levels [array of array of array of struct] + test("test multi level support : array of array of array of struct") { + buildAvroTestDataMultiLevel4Type() + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION + |'$writerPath' """.stripMargin) + + sql("select * from sdkOutputTable").show(false) + + // TODO: Add a validation + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + cleanTestData() + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d85fb72e/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index 8a12970..a830185 100644 --- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -393,13 +393,6 @@ class TableNewProcessor(cm: TableModel) { var allColumns: Seq[ColumnSchema] = Seq[ColumnSchema]() fieldChildren.foreach(fields => { fields.foreach(field => { - if (!useDictionaryEncoding && - (field.dataType.get.equalsIgnoreCase("double") || - field.dataType.get.equalsIgnoreCase("date") || - field.dataType.get.equalsIgnoreCase("decimal"))) { - throw new MalformedCarbonCommandException(s"DICTIONARY_EXCLUDE is unsupported for ${ - field.dataType.get} data type column: ${ field.column }") - } val encoders = new java.util.ArrayList[Encoding]() if (useDictionaryEncoding) { encoders.add(Encoding.DICTIONARY) @@ -439,7 +432,7 @@ class TableNewProcessor(cm: TableModel) { if (highCardinalityDims.contains(colName)) { encoders.remove(Encoding.DICTIONARY) } - if (dataType == DataTypes.DATE) { + if (dataType == DataTypes.DATE && useDictionaryEncoding) { encoders.add(Encoding.DIRECT_DICTIONARY) } if (dataType == DataTypes.TIMESTAMP && http://git-wip-us.apache.org/repos/asf/carbondata/blob/d85fb72e/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java index fdfc3bb..481c811 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java @@ -46,6 +46,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.encoder.Encoding; import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder; @@ -108,6 +109,7 @@ public class PrimitiveDataType implements GenericDataType<Object> { private String nullformat; + private boolean isDirectDictionary; private PrimitiveDataType(int outputArrayIndex, int dataCounter) { this.outputArrayIndex = outputArrayIndex; @@ -162,9 +164,11 @@ public class PrimitiveDataType implements GenericDataType<Object> { new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, carbonDimension.getColumnIdentifier(), carbonDimension.getDataType()); try { - if (carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) { + if (carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) + || carbonColumn.getDataType() == DataTypes.DATE) { dictionaryGenerator = new DirectDictionary(DirectDictionaryKeyGeneratorFactory .getDirectDictionaryGenerator(carbonDimension.getDataType())); + isDirectDictionary = true; } else if (carbonDimension.hasEncoding(Encoding.DICTIONARY)) { CacheProvider cacheProvider = CacheProvider.getInstance(); Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache = @@ -307,15 +311,24 @@ public class PrimitiveDataType implements GenericDataType<Object> { } else if (this.carbonDimension.getDataType() == DataTypes.TIMESTAMP) { dateFormat = carbonDimension.getTimestampFormat(); } - try { if (!this.carbonDimension.getUseActualData()) { - byte[] value = DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(parsedValue, - this.carbonDimension.getDataType(), dateFormat); - if (this.carbonDimension.getDataType() == DataTypes.STRING - && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) { - throw new CarbonDataLoadingException("Dataload 
failed, String size cannot exceed " - + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes"); + byte[] value = null; + if (isDirectDictionary) { + int surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue); + if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) { + value = new byte[0]; + } else { + value = ByteUtil.toBytes(surrogateKey); + } + } else { + value = DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(parsedValue, + this.carbonDimension.getDataType(), dateFormat); + if (this.carbonDimension.getDataType() == DataTypes.STRING + && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) { + throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed " + + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes"); + } } updateValueToByteStream(dataOutputStream, value); } else {