http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala new file mode 100644 index 0000000..419b306 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala @@ -0,0 +1,279 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
*/

package org.apache.carbondata.spark.testsuite.longstring

import java.io.{File, PrintWriter}

import org.apache.commons.lang3.RandomStringUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

/**
 * Basic load/query tests for columns declared in LONG_STRING_COLUMNS, run under every
 * combination of safe/unsafe sort and safe/unsafe column page.
 */
class VarcharDataTypesBasicTestCase extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
  private val longStringTable = "long_string_table"
  private val inputDir = s"$resourcesPath${File.separator}varchartype${File.separator}"
  private val fileName = "longStringData.csv"
  private val inputFile = s"$inputDir$fileName"
  private val fileName_2g_column_page = "longStringData_exceed_2gb_column_page.csv"
  private val inputFile_2g_column_page = s"$inputDir$fileName_2g_column_page"
  private val lineNum = 1000
  private var content: Content = _
  // value of UNSAFE_WORKING_MEMORY_IN_MB before this suite ran; restored in afterAll
  private val originMemorySize = CarbonProperties.getInstance().getProperty(
    CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
    CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT)

  /** Expected long string values of the first (head), middle (mid) and last (tail) rows. */
  case class Content(head: Int, desc_line_head: String, note_line_head: String,
      mid: Int, desc_line_mid: String, note_line_mid: String,
      tail: Int, desc_line_tail: String, note_line_tail: String)

  override def beforeAll(): Unit = {
    // a 32000 rows * 32000 characters column page uses about 1GB of memory;
    // here we only have 1000 rows, so the default working memory is enough
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
      CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT)
    deleteFile(inputFile)
    ensureInputDir()
    content = createFile(inputFile, line = lineNum)
  }

  override def afterAll(): Unit = {
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
      originMemorySize)
    deleteFile(inputFile)
    deleteFile(inputFile_2g_column_page)
    if (new File(inputDir).exists()) {
      new File(inputDir).delete()
    }
  }

  override def beforeEach(): Unit = {
    sql(s"drop table if exists $longStringTable")
  }

  override def afterEach(): Unit = {
    sql(s"drop table if exists $longStringTable")
  }

  /** Creates the directory holding the generated CSV files if it does not exist yet. */
  private def ensureInputDir(): Unit = {
    val dir = new File(inputDir)
    if (!dir.exists()) {
      dir.mkdirs()
    }
  }

  /**
   * Runs `body` with the given sort / column page settings. The defaults are always
   * restored afterwards, even when `body` fails, so a failing test cannot leak unsafe
   * settings into later tests or suites.
   *
   * @param unsafeSort       value for both ENABLE_UNSAFE_SORT and ENABLE_OFFHEAP_SORT
   * @param unsafeColumnPage value for ENABLE_UNSAFE_COLUMN_PAGE
   */
  private def withMemorySettings(unsafeSort: Boolean, unsafeColumnPage: Boolean)
      (body: => Unit): Unit = {
    val properties = CarbonProperties.getInstance()
    properties.addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, unsafeSort.toString)
    properties.addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, unsafeSort.toString)
    properties.addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
      unsafeColumnPage.toString)
    try {
      body
    } finally {
      properties.addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
      properties.addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
        CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
      properties.addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
        CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
    }
  }

  /** Creates the test table with two long string columns and loads the generated CSV. */
  private def prepareTable(): Unit = {
    sql(
      s"""
         | CREATE TABLE if not exists $longStringTable(
         | id INT, name STRING, description STRING, address STRING, note STRING
         | ) STORED BY 'carbondata'
         | TBLPROPERTIES('LONG_STRING_COLUMNS'='description, note', 'SORT_COLUMNS'='name')
         |""".stripMargin)
    sql(
      s"""
         | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $longStringTable
         | OPTIONS('header'='false')
       """.stripMargin)
  }

  /** Checks projections and filters on, and around, the long string columns. */
  private def checkQuery(): Unit = {
    // query without long_string_column
    checkAnswer(sql(s"SELECT id, name, address FROM $longStringTable where id = ${content.tail}"),
      Row(content.tail, s"name_${content.tail}", s"address_${content.tail}"))
    // query return long_string_column in the middle position
    checkAnswer(sql(s"SELECT id, name, description, address FROM $longStringTable where id = ${content.head}"),
      Row(content.head, s"name_${content.head}", content.desc_line_head, s"address_${content.head}"))
    // query return long_string_column at last position
    checkAnswer(sql(s"SELECT id, name, address, description FROM $longStringTable where id = ${content.mid}"),
      Row(content.mid, s"name_${content.mid}", s"address_${content.mid}", content.desc_line_mid))
    // query return 2 long_string_columns
    checkAnswer(sql(s"SELECT id, name, note, address, description FROM $longStringTable where id = ${content.mid}"),
      Row(content.mid, s"name_${content.mid}", content.note_line_mid, s"address_${content.mid}", content.desc_line_mid))
    // query by simple string column
    checkAnswer(sql(s"SELECT id, note, address, description FROM $longStringTable where name = 'name_${content.tail}'"),
      Row(content.tail, content.note_line_tail, s"address_${content.tail}", content.desc_line_tail))
    // query by long string column
    checkAnswer(sql(s"SELECT id, name, address, description FROM $longStringTable where note = '${content.note_line_tail}'"),
      Row(content.tail, s"name_${content.tail}", s"address_${content.tail}", content.desc_line_tail))
  }

  test("Load and query with long string datatype: safe sort & safe columnpage") {
    withMemorySettings(unsafeSort = false, unsafeColumnPage = false) {
      prepareTable()
      checkQuery()
    }
  }

  test("Load and query with long string datatype: safe sort & unsafe column page") {
    withMemorySettings(unsafeSort = false, unsafeColumnPage = true) {
      prepareTable()
      checkQuery()
    }
  }

  test("Load and query with long string datatype: unsafe sort & safe column page") {
    withMemorySettings(unsafeSort = true, unsafeColumnPage = false) {
      prepareTable()
      checkQuery()
    }
  }

  test("Load and query with long string datatype: unsafe sort & unsafe column page") {
    withMemorySettings(unsafeSort = true, unsafeColumnPage = true) {
      prepareTable()
      checkQuery()
    }
  }

  // ignore this test in CI, because it will need at least 4GB memory to run successfully
  ignore("Exceed 2GB per column page for varchar datatype") {
    deleteFile(inputFile_2g_column_page)
    ensureInputDir()
    // 700000 characters * 3200 rows (about 2.24 * 10^9 characters) exceeds the 2GB limit of
    // one column page. The result is not assigned to `content`, so the shared fixture used
    // by checkQuery() is not clobbered.
    createFile2(inputFile_2g_column_page, line = 3200, varcharLen = 700000)

    sql(
      s"""
         | CREATE TABLE if not exists $longStringTable(
         | id INT, name STRING, description STRING, address STRING
         | ) STORED BY 'carbondata'
         | TBLPROPERTIES('LONG_STRING_COLUMNS'='description', 'SORT_COLUMNS'='name')
         |""".stripMargin)
    // the load is expected to fail; since the exception gets wrapped we cannot check the
    // root cause directly, so only the failure itself is asserted
    intercept[Exception] {
      sql(
        s"""
           | LOAD DATA LOCAL INPATH '$inputFile_2g_column_page' INTO TABLE $longStringTable
           | OPTIONS('header'='false')
         """.stripMargin)
    }
  }

  // will create 2 long string columns: description and note
  private def createFile(filePath: String, line: Int = 10000, start: Int = 0,
      varcharLen: Int = Short.MaxValue + 1000): Content = {
    writeCsv(filePath, line, start, varcharLen, withNote = true)
  }

  // will only create 1 long string column: description
  private def createFile2(filePath: String, line: Int = 10000, start: Int = 0,
      varcharLen: Int = Short.MaxValue + 1000): Content = {
    writeCsv(filePath, line, start, varcharLen, withNote = false)
  }

  /**
   * Writes `line` CSV rows with ids `start until start + line`. Each row has the form
   * `id,name_id,description,address_id[,note]`, where description and note are random
   * alphabetic strings of `varcharLen` characters.
   *
   * @param withNote whether to append the second long string column (note)
   * @return the long string values of the first, middle and last rows; note values are
   *         empty strings when `withNote` is false
   */
  private def writeCsv(filePath: String, line: Int, start: Int, varcharLen: Int,
      withNote: Boolean): Content = {
    // head/mid/tail are row ids, so they must be offset by `start`
    val head = start
    val mid = start + line / 2
    val tail = start + line - 1
    var desc_line_head = ""
    var desc_line_mid = ""
    var desc_line_tail = ""
    var note_line_head = ""
    var note_line_mid = ""
    var note_line_tail = ""
    deleteFile(filePath)
    val writer = new PrintWriter(new File(filePath))
    try {
      for (i <- start until (start + line)) {
        val description = RandomStringUtils.randomAlphabetic(varcharLen)
        val note = if (withNote) RandomStringUtils.randomAlphabetic(varcharLen) else ""
        val row = if (withNote) {
          s"$i,name_$i,$description,address_$i,$note"
        } else {
          s"$i,name_$i,$description,address_$i"
        }
        // independent checks (not else-if): for tiny files head, mid and tail can coincide
        if (i == head) {
          desc_line_head = description
          note_line_head = note
        }
        if (i == mid) {
          desc_line_mid = description
          note_line_mid = note
        }
        if (i == tail) {
          desc_line_tail = description
          note_line_tail = note
        }
        writer.println(row)
      }
    } finally {
      writer.close()
    }
    Content(head, desc_line_head, note_line_head,
      mid, desc_line_mid, note_line_mid,
      tail, desc_line_tail, note_line_tail)
  }

  /** Deletes the file at `filePath` if it exists. */
  private def deleteFile(filePath: String): Unit = {
    val file = new File(filePath)
    if (file.exists()) {
      file.delete()
    }
  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala index 1ccbf6a..6227655 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala @@ -115,6 +115,7 @@ object CarbonScalaUtil { case CarbonDataTypes.BOOLEAN => BooleanType case CarbonDataTypes.TIMESTAMP => TimestampType case CarbonDataTypes.DATE => DateType + case CarbonDataTypes.VARCHAR => StringType } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala index 6673e18..6cd28c0 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala @@ -126,6 +126,7 @@ object DataTypeConverterUtil { case "timestamp" => ThriftDataType.TIMESTAMP case "array" => ThriftDataType.ARRAY case "struct" => ThriftDataType.STRUCT + case "varchar" => ThriftDataType.VARCHAR case _ => ThriftDataType.STRING } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala 
---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 9af8817..0d53a73 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -280,7 +280,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { fields.zipWithIndex.foreach { case (field, index) => field.schemaOrdinal = index } - val (dims, msrs, noDictionaryDims, sortKeyDims) = extractDimAndMsrFields( + val (dims, msrs, noDictionaryDims, sortKeyDims, varcharColumns) = extractDimAndMsrFields( fields, tableProperties) // column properties @@ -391,6 +391,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))), msrs.map(f => normalizeType(f)), Option(sortKeyDims), + Option(varcharColumns), Option(noDictionaryDims), Option(noInvertedIdxCols), groupCols, @@ -691,12 +692,31 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { * @return */ protected def extractDimAndMsrFields(fields: Seq[Field], - tableProperties: Map[String, String]): (Seq[Field], Seq[Field], Seq[String], Seq[String]) = { + tableProperties: Map[String, String]): + (Seq[Field], Seq[Field], Seq[String], Seq[String], Seq[String]) = { var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]() var msrFields: Seq[Field] = Seq[Field]() var dictExcludeCols: Array[String] = Array[String]() var noDictionaryDims: Seq[String] = Seq[String]() var dictIncludeCols: Seq[String] = Seq[String]() + var varcharCols: Seq[String] = Seq[String]() + + // All long_string cols should be there in create table cols and should be of string data type + if 
(tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS).isDefined) { + varcharCols = + tableProperties(CarbonCommonConstants.LONG_STRING_COLUMNS).split(",").map(_.trim) + varcharCols.foreach { varcharCol => + val exists = fields.exists(f => f.column.equalsIgnoreCase(varcharCol) && + DataTypes.STRING.getName.equalsIgnoreCase(f.dataType.get)) + if (!exists) { + throw new MalformedCarbonCommandException( + s""" + |${CarbonCommonConstants.LONG_STRING_COLUMNS}: $varcharCol does not exist in table + | or its data type is not string. Please check create table statement. + """.stripMargin) + } + } + } // All columns in sortkey should be there in create table cols val sortKeyOption = tableProperties.get(CarbonCommonConstants.SORT_COLUMNS) @@ -727,6 +747,10 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { val errormsg = s"sort_columns is unsupported for $dataType datatype column: " + column throw new MalformedCarbonCommandException(errormsg) } + if (varcharCols.exists(x => x.equalsIgnoreCase(column))) { + throw new MalformedCarbonCommandException( + s"sort_columns is unsupported for long string datatype column $column") + } } } @@ -824,9 +848,11 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { var sortKeyDims = sortKeyDimsTmp if (sortKeyOption.isEmpty) { - // if SORT_COLUMNS was not defined, add all dimension to SORT_COLUMNS. + // if SORT_COLUMNS was not defined, + // add all dimension(except long string columns) to SORT_COLUMNS. 
dimFields.foreach { field => - if (!isComplexDimDictionaryExclude(field.dataType.get)) { + if (!isComplexDimDictionaryExclude(field.dataType.get) && + !varcharCols.contains(field.column)) { sortKeyDims :+= field.column } } @@ -837,7 +863,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { } else { tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, sortKeyDims.mkString(",")) } - (dimFields.toSeq, msrFields, noDictionaryDims, sortKeyDims) + (dimFields.toSeq, msrFields, noDictionaryDims, sortKeyDims, varcharCols) } def isDefaultMeasure(dataType: Option[String]): Boolean = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index d48db21..c77d0df 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -54,6 +54,7 @@ case class TableModel( dimCols: Seq[Field], msrCols: Seq[Field], sortKeyDims: Option[Seq[String]], + varcharCols: Option[Seq[String]], highcardinalitydims: Option[Seq[String]], noInvertedIdxCols: Option[Seq[String]], columnGroups: Seq[String], @@ -212,9 +213,9 @@ class AlterTableColumnSchemaGenerator( tableIdentifier: AbsoluteTableIdentifier, sc: SparkContext) { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - def isSortColumn(columnName: String): Boolean = { + private def isSortColumn(columnName: String): Boolean = { val 
sortColumns = alterTableModel.tableProperties.get("sort_columns") if(sortColumns.isDefined) { sortColumns.get.contains(columnName) @@ -222,6 +223,16 @@ class AlterTableColumnSchemaGenerator( true } } + + private def isVarcharColumn(columnName: String): Boolean = { + val varcharColumns = alterTableModel.tableProperties.get("long_string_columns") + if (varcharColumns.isDefined) { + varcharColumns.get.contains(columnName) + } else { + false + } + } + def process: Seq[ColumnSchema] = { val tableSchema = tableInfo.getFactTable val tableCols = tableSchema.getListOfColumns.asScala @@ -241,7 +252,8 @@ class AlterTableColumnSchemaGenerator( field.schemaOrdinal + existingColsSize, alterTableModel.highCardinalityDims, alterTableModel.databaseName.getOrElse(dbName), - isSortColumn(field.name.getOrElse(field.column))) + isSortColumn(field.name.getOrElse(field.column)), + isVarcharColumn(field.name.getOrElse(field.column))) allColumns ++= Seq(columnSchema) newCols ++= Seq(columnSchema) }) @@ -351,14 +363,19 @@ object TableNewProcessor { schemaOrdinal: Int, highCardinalityDims: Seq[String], databaseName: String, - isSortColumn: Boolean = false): ColumnSchema = { + isSortColumn: Boolean = false, + isVarcharColumn: Boolean = false): ColumnSchema = { val dataType = DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")) if (DataTypes.isDecimal(dataType)) { dataType.asInstanceOf[DecimalType].setPrecision(field.precision) dataType.asInstanceOf[DecimalType].setScale(field.scale) } val columnSchema = new ColumnSchema() - columnSchema.setDataType(dataType) + if (isVarcharColumn) { + columnSchema.setDataType(DataTypes.VARCHAR) + } else { + columnSchema.setDataType(dataType) + } val colName = field.name.getOrElse(field.column) columnSchema.setColumnName(colName) if (highCardinalityDims.contains(colName)) { @@ -415,6 +432,11 @@ class TableNewProcessor(cm: TableModel) { allColumns } + // varchar column is a string column that in long_string_columns + private def 
isVarcharColumn(colName : String): Boolean = { + cm.varcharCols.get.contains(colName) + } + def getColumnSchema( dataType: DataType, colName: String, @@ -450,6 +472,9 @@ class TableNewProcessor(cm: TableModel) { columnSchema.setScale(field.scale) columnSchema.setSchemaOrdinal(field.schemaOrdinal) columnSchema.setSortColumn(false) + if (isVarcharColumn(colName)) { + columnSchema.setDataType(DataTypes.VARCHAR) + } if(isParentColumnRelation) { val dataMapField = map.get.get(field).get columnSchema.setFunction(dataMapField.aggregateFunction) @@ -517,7 +542,7 @@ class TableNewProcessor(cm: TableModel) { val dictionaryIncludeCols = cm.tableProperties .getOrElse(CarbonCommonConstants.DICTIONARY_INCLUDE, "") - cm.dimCols.foreach { field => + def addDimensionCol(field: Field): Unit = { val sortField = cm.sortKeyDims.get.find(field.column equals _) if (sortField.isEmpty) { val encoders = if (getEncoderFromParent(field)) { @@ -549,6 +574,12 @@ class TableNewProcessor(cm: TableModel) { } } } + // dimensions that are not varchar + cm.dimCols.filter(field => !cm.varcharCols.get.contains(field.column)) + .foreach(addDimensionCol(_)) + // dimensions that are varchar + cm.dimCols.filter(field => cm.varcharCols.get.contains(field.column)) + .foreach(addDimensionCol(_)) // check whether the column is a local dictionary column and set in column schema if (null != cm.tableProperties) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala index 5739d3e..2f2048d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala @@ -201,6 
+201,7 @@ case class CarbonRelation( object CarbonMetastoreTypes extends RegexParsers { protected lazy val primitiveType: Parser[DataType] = "string" ^^^ StringType | + "varchar" ^^^ StringType | "float" ^^^ FloatType | "int" ^^^ IntegerType | "tinyint" ^^^ ShortType | http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java index 9cf7fe4..3018e49 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java @@ -73,8 +73,10 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter { .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat); if (dataType == DataTypes.STRING && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) { - throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed " - + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes"); + throw new CarbonDataLoadingException(String.format( + "Dataload failed, String size cannot exceed %d bytes," + + " please consider long string data type", + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT)); } row.update(value, index); } else { @@ -82,8 +84,10 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter { .getDataDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat); if (dataType == DataTypes.STRING && value.toString().length() > 
CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) { - throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed " - + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes"); + throw new CarbonDataLoadingException(String.format( + "Dataload failed, String size cannot exceed %d bytes," + + " please consider long string data type", + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT)); } row.update(value, index); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java index 2e3479c..86c71a6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java @@ -205,7 +205,9 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri parserSettings.setSkipEmptyLines( Boolean.valueOf(job.get(SKIP_EMPTY_LINE, CarbonCommonConstants.CARBON_SKIP_EMPTY_LINE_DEFAULT))); - parserSettings.setMaxCharsPerColumn(CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT); + // todo: will verify whether there is a performance degrade using -1 here + // parserSettings.setMaxCharsPerColumn(CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT); + parserSettings.setMaxCharsPerColumn(CarbonCommonConstants.MAX_CHARS_PER_COLUMN_INFINITY); String maxColumns = job.get(MAX_COLUMNS, "" + DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING); parserSettings.setMaxColumns(Integer.parseInt(maxColumns)); parserSettings.getFormat().setQuote(job.get(QUOTE, QUOTE_DEFAULT).charAt(0)); 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java index 8d351cf..8bec099 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java @@ -54,12 +54,13 @@ public class IntermediateSortTempRow { /** * deserialize from bytes array to get the no sort fields * @param outDictNoSort stores the dict & no-sort fields - * @param outNoDictNoSort stores the no-dict & no-sort fields, including complex + * @param outNoDictNoSortAndVarcharDims stores the no-dict & no-sort fields, + * including complex and varchar fields * @param outMeasures stores the measure fields * @param dataTypes data type for the measure */ - public void unpackNoSortFromBytes(int[] outDictNoSort, byte[][] outNoDictNoSort, - Object[] outMeasures, DataType[] dataTypes) { + public void unpackNoSortFromBytes(int[] outDictNoSort, byte[][] outNoDictNoSortAndVarcharDims, + Object[] outMeasures, DataType[] dataTypes, int varcharDimCnt) { ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures); // read dict_no_sort int dictNoSortCnt = outDictNoSort.length; @@ -68,12 +69,20 @@ public class IntermediateSortTempRow { } // read no_dict_no_sort (including complex) - int noDictNoSortCnt = outNoDictNoSort.length; + int noDictNoSortCnt = outNoDictNoSortAndVarcharDims.length - varcharDimCnt; for (int i = 0; i < noDictNoSortCnt; i++) { short len = rowBuffer.getShort(); byte[] bytes = new byte[len]; rowBuffer.get(bytes); - outNoDictNoSort[i] = bytes; + 
outNoDictNoSortAndVarcharDims[i] = bytes; + } + + // read varchar dims + for (int i = 0; i < varcharDimCnt; i++) { + int len = rowBuffer.getInt(); + byte[] bytes = new byte[len]; + rowBuffer.get(bytes); + outNoDictNoSortAndVarcharDims[i + noDictNoSortCnt] = bytes; } // read measure http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java index f31a2b9..bcf8a39 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java @@ -46,6 +46,7 @@ public class SortStepRowHandler implements Serializable { private int dictNoSortDimCnt = 0; private int noDictSortDimCnt = 0; private int noDictNoSortDimCnt = 0; + private int varcharDimCnt = 0; private int measureCnt; // indices for dict & sort dimension columns @@ -56,6 +57,7 @@ public class SortStepRowHandler implements Serializable { private int[] noDictSortDimIdx; // indices for no-dict & no-sort dimension columns, including complex columns private int[] noDictNoSortDimIdx; + private int[] varcharDimIdx; // indices for measure columns private int[] measureIdx; @@ -70,11 +72,13 @@ public class SortStepRowHandler implements Serializable { this.dictNoSortDimCnt = tableFieldStat.getDictNoSortDimCnt(); this.noDictSortDimCnt = tableFieldStat.getNoDictSortDimCnt(); this.noDictNoSortDimCnt = tableFieldStat.getNoDictNoSortDimCnt(); + this.varcharDimCnt = tableFieldStat.getVarcharDimCnt(); this.measureCnt = tableFieldStat.getMeasureCnt(); this.dictSortDimIdx = tableFieldStat.getDictSortDimIdx(); this.dictNoSortDimIdx = 
tableFieldStat.getDictNoSortDimIdx(); this.noDictSortDimIdx = tableFieldStat.getNoDictSortDimIdx(); this.noDictNoSortDimIdx = tableFieldStat.getNoDictNoSortDimIdx(); + this.varcharDimIdx = tableFieldStat.getVarcharDimIdx(); this.measureIdx = tableFieldStat.getMeasureIdx(); this.dataTypes = tableFieldStat.getMeasureDataType(); } @@ -122,6 +126,10 @@ public class SortStepRowHandler implements Serializable { for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) { nonDictArray[idxAcc++] = (byte[]) row[this.noDictNoSortDimIdx[idx]]; } + // convert varchar dims + for (int idx = 0; idx < this.varcharDimCnt; idx++) { + nonDictArray[idxAcc++] = (byte[]) row[this.varcharDimIdx[idx]]; + } // convert measure data for (int idx = 0; idx < this.measureCnt; idx++) { @@ -146,13 +154,15 @@ public class SortStepRowHandler implements Serializable { int[] dictDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt]; byte[][] noDictArray - = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt][]; + = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt][]; int[] dictNoSortDims = new int[this.dictNoSortDimCnt]; - byte[][] noDictNoSortDims = new byte[this.noDictNoSortDimCnt][]; + byte[][] noDictNoSortAndVarcharDims + = new byte[this.noDictNoSortDimCnt + this.varcharDimCnt][]; Object[] measures = new Object[this.measureCnt]; - sortTempRow.unpackNoSortFromBytes(dictNoSortDims, noDictNoSortDims, measures, this.dataTypes); + sortTempRow.unpackNoSortFromBytes(dictNoSortDims, noDictNoSortAndVarcharDims, measures, + this.dataTypes, this.varcharDimCnt); // dict dims System.arraycopy(sortTempRow.getDictSortDims(), 0 , dictDims, @@ -163,8 +173,8 @@ public class SortStepRowHandler implements Serializable { // no dict dims, including complex System.arraycopy(sortTempRow.getNoDictSortDims(), 0, noDictArray, 0, this.noDictSortDimCnt); - System.arraycopy(noDictNoSortDims, 0, noDictArray, - this.noDictSortDimCnt, this.noDictNoSortDimCnt); + 
System.arraycopy(noDictNoSortAndVarcharDims, 0, noDictArray, + this.noDictSortDimCnt, this.noDictNoSortDimCnt + this.varcharDimCnt); // measures are already here @@ -428,6 +438,12 @@ public class SortStepRowHandler implements Serializable { rowBuffer.putShort((short) bytes.length); rowBuffer.put(bytes); } + // convert varchar dims + for (int idx = 0; idx < this.varcharDimCnt; idx++) { + byte[] bytes = (byte[]) row[this.varcharDimIdx[idx]]; + rowBuffer.putInt(bytes.length); + rowBuffer.put(bytes); + } // convert measure Object tmpValue; http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index dde18a9..9dab181 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -92,6 +92,10 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { */ private boolean[] noDictionaryColMapping; /** + * boolean mapping for long string dimension + */ + private boolean[] isVarcharDimMapping; + /** * agg type defined for measures */ private DataType[] dataTypes; @@ -353,13 +357,18 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { measureCount = carbonTable.getMeasureByTableName(tableName).size(); List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName); noDictionaryColMapping = new boolean[dimensions.size()]; + isVarcharDimMapping = new boolean[dimensions.size()]; int i = 0; + int j = 0; for (CarbonDimension dimension : dimensions) { if 
(CarbonUtil.hasEncoding(dimension.getEncoder(), Encoding.DICTIONARY)) { i++; continue; } noDictionaryColMapping[i++] = true; + if (dimension.getColumnSchema().getDataType() == DataTypes.VARCHAR) { + isVarcharDimMapping[j++] = true; + } noDictionaryCount++; } dimensionColumnCount = dimensions.size(); @@ -387,7 +396,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { .createSortParameters(carbonTable, carbonLoadModel.getDatabaseName(), tableName, dimensionColumnCount, segmentProperties.getComplexDimensions().size(), measureCount, noDictionaryCount, segmentId, - carbonLoadModel.getTaskNo(), noDictionaryColMapping, true); + carbonLoadModel.getTaskNo(), noDictionaryColMapping, isVarcharDimMapping, true); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java index 705350c..502fa05 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java @@ -111,7 +111,11 @@ public class SortParameters implements Serializable { private boolean[] noDictionaryDimnesionColumn; private boolean[] noDictionarySortColumn; - + /** + * whether dimension is varchar data type. 
+ * since all dimensions are string, we use an array of boolean instead of datatypes + */ + private boolean[] isVarcharDimensionColumn; private int numberOfSortColumns; private int numberOfNoDictSortColumns; @@ -143,6 +147,7 @@ public class SortParameters implements Serializable { parameters.segmentId = segmentId; parameters.taskNo = taskNo; parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn; + parameters.isVarcharDimensionColumn = isVarcharDimensionColumn; parameters.noDictionarySortColumn = noDictionarySortColumn; parameters.numberOfSortColumns = numberOfSortColumns; parameters.numberOfNoDictSortColumns = numberOfNoDictSortColumns; @@ -312,6 +317,14 @@ public class SortParameters implements Serializable { this.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn; } + public boolean[] getIsVarcharDimensionColumn() { + return isVarcharDimensionColumn; + } + + public void setIsVarcharDimensionColumn(boolean[] isVarcharDimensionColumn) { + this.isVarcharDimensionColumn = isVarcharDimensionColumn; + } + public int getNumberOfCores() { return numberOfCores; } @@ -371,6 +384,8 @@ public class SortParameters implements Serializable { .getComplexNonDictionaryColumnCount()); parameters.setNoDictionaryDimnesionColumn( CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields())); + parameters.setIsVarcharDimensionColumn( + CarbonDataProcessorUtil.getIsVarcharColumnMapping(configuration.getDataFields())); parameters.setBatchSortSizeinMb(CarbonDataProcessorUtil.getBatchSortSizeinMb(configuration)); parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns()); @@ -461,7 +476,8 @@ public class SortParameters implements Serializable { public static SortParameters createSortParameters(CarbonTable carbonTable, String databaseName, String tableName, int dimColCount, int complexDimColCount, int measureColCount, int noDictionaryCount, String segmentId, String taskNo, - boolean[] noDictionaryColMaping, boolean isCompactionFlow) { + 
boolean[] noDictionaryColMaping, boolean[] isVarcharDimensionColumn, + boolean isCompactionFlow) { SortParameters parameters = new SortParameters(); CarbonProperties carbonProperties = CarbonProperties.getInstance(); parameters.setDatabaseName(databaseName); @@ -476,6 +492,7 @@ public class SortParameters implements Serializable { parameters.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns()); parameters.setComplexDimColCount(complexDimColCount); parameters.setNoDictionaryDimnesionColumn(noDictionaryColMaping); + parameters.setIsVarcharDimensionColumn(isVarcharDimensionColumn); parameters.setObserver(new SortObserver()); // get sort buffer size parameters.setSortBufferSize(Integer.parseInt(carbonProperties http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java index 0d1303a..094bd83 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java @@ -33,9 +33,13 @@ public class TableFieldStat implements Serializable { private int dictSortDimCnt = 0; private int dictNoSortDimCnt = 0; private int noDictSortDimCnt = 0; + // for columns that are no_dict_dim and no_sort_dim and complex, except the varchar dims private int noDictNoSortDimCnt = 0; + // for columns that are varchar data type + private int varcharDimCnt = 0; // whether sort column is of dictionary type or not private boolean[] isSortColNoDictFlags; + private boolean[] isVarcharDimFlags; private int measureCnt; private DataType[] measureDataType; @@ -47,6 +51,8 @@ public class 
TableFieldStat implements Serializable { private int[] noDictSortDimIdx; // indices for no-dict & no-sort dimension columns, including complex columns private int[] noDictNoSortDimIdx; + // indices for varchar dimension columns + private int[] varcharDimIdx; // indices for measure columns private int[] measureIdx; @@ -55,6 +61,7 @@ public class TableFieldStat implements Serializable { int complexDimCnt = sortParameters.getComplexDimColCount(); int dictDimCnt = sortParameters.getDimColCount() - noDictDimCnt; this.isSortColNoDictFlags = sortParameters.getNoDictionarySortColumn(); + this.isVarcharDimFlags = sortParameters.getIsVarcharDimensionColumn(); int sortColCnt = isSortColNoDictFlags.length; for (boolean flag : isSortColNoDictFlags) { if (flag) { @@ -66,22 +73,33 @@ public class TableFieldStat implements Serializable { this.measureCnt = sortParameters.getMeasureColCount(); this.measureDataType = sortParameters.getMeasureDataType(); + for (boolean flag : isVarcharDimFlags) { + if (flag) { + varcharDimCnt++; + } + } + // be careful that the default value is 0 this.dictSortDimIdx = new int[dictSortDimCnt]; this.dictNoSortDimIdx = new int[dictDimCnt - dictSortDimCnt]; this.noDictSortDimIdx = new int[noDictSortDimCnt]; - this.noDictNoSortDimIdx = new int[noDictDimCnt + complexDimCnt - noDictSortDimCnt]; + this.noDictNoSortDimIdx = new int[noDictDimCnt + complexDimCnt - noDictSortDimCnt + - varcharDimCnt]; + this.varcharDimIdx = new int[varcharDimCnt]; this.measureIdx = new int[measureCnt]; int tmpNoDictSortCnt = 0; int tmpNoDictNoSortCnt = 0; int tmpDictSortCnt = 0; int tmpDictNoSortCnt = 0; + int tmpVarcharCnt = 0; boolean[] isDimNoDictFlags = sortParameters.getNoDictionaryDimnesionColumn(); for (int i = 0; i < isDimNoDictFlags.length; i++) { if (isDimNoDictFlags[i]) { - if (i < sortColCnt && isSortColNoDictFlags[i]) { + if (isVarcharDimFlags[i]) { + varcharDimIdx[tmpVarcharCnt++] = i; + } else if (i < sortColCnt && isSortColNoDictFlags[i]) { 
noDictSortDimIdx[tmpNoDictSortCnt++] = i; } else { noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = i; @@ -126,10 +144,18 @@ public class TableFieldStat implements Serializable { return noDictNoSortDimCnt; } + public int getVarcharDimCnt() { + return varcharDimCnt; + } + public boolean[] getIsSortColNoDictFlags() { return isSortColNoDictFlags; } + public boolean[] getIsVarcharDimFlags() { + return isVarcharDimFlags; + } + public int getMeasureCnt() { return measureCnt; } @@ -154,6 +180,10 @@ public class TableFieldStat implements Serializable { return noDictNoSortDimIdx; } + public int[] getVarcharDimIdx() { + return varcharDimIdx; + } + public int[] getMeasureIdx() { return measureIdx; } @@ -166,11 +196,12 @@ public class TableFieldStat implements Serializable { && dictNoSortDimCnt == that.dictNoSortDimCnt && noDictSortDimCnt == that.noDictSortDimCnt && noDictNoSortDimCnt == that.noDictNoSortDimCnt + && varcharDimCnt == that.varcharDimCnt && measureCnt == that.measureCnt; } @Override public int hashCode() { return Objects.hash(dictSortDimCnt, dictNoSortDimCnt, noDictSortDimCnt, - noDictNoSortDimCnt, measureCnt); + noDictNoSortDimCnt, varcharDimCnt, measureCnt); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java index 5408193..1a1c5d1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java @@ -39,7 +39,8 @@ import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage; import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory; import 
org.apache.carbondata.core.datastore.page.key.TablePageKey; import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector; -import org.apache.carbondata.core.datastore.page.statistics.LVStringStatsCollector; +import org.apache.carbondata.core.datastore.page.statistics.LVLongStringStatsCollector; +import org.apache.carbondata.core.datastore.page.statistics.LVShortStringStatsCollector; import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; @@ -98,8 +99,16 @@ public class TablePage { noDictDimensionPages = new ColumnPage[model.getNoDictionaryCount()]; for (int i = 0; i < noDictDimensionPages.length; i++) { TableSpec.DimensionSpec spec = tableSpec.getDimensionSpec(i + numDictDimension); - ColumnPage page = ColumnPage.newPage(spec, DataTypes.STRING, pageSize); - page.setStatsCollector(LVStringStatsCollector.newInstance()); + ColumnPage page; + if (DataTypes.VARCHAR == spec.getSchemaDataType()) { + page = ColumnPage.newPage(spec, DataTypes.VARCHAR, pageSize); + page.setStatsCollector(LVLongStringStatsCollector.newInstance()); + } else { + // In previous implementation, other data types such as string, date and timestamp + // will be encoded using string page + page = ColumnPage.newPage(spec, DataTypes.STRING, pageSize); + page.setStatsCollector(LVShortStringStatsCollector.newInstance()); + } noDictDimensionPages[i] = page; } complexDimensionPages = new ComplexColumnPage[model.getComplexColumnCount()]; @@ -155,16 +164,21 @@ public class TablePage { dictDimensionPages[i].putData(rowId, keys[i]); } - // 2. convert noDictionary columns and complex columns. + // 2. convert noDictionary columns and complex columns and varchar columns. 
int noDictionaryCount = noDictDimensionPages.length; int complexColumnCount = complexDimensionPages.length; if (noDictionaryCount > 0 || complexColumnCount > 0) { + TableSpec tableSpec = model.getTableSpec(); byte[][] noDictAndComplex = WriteStepRowUtil.getNoDictAndComplexDimension(row); for (int i = 0; i < noDictAndComplex.length; i++) { - if (i < noDictionaryCount) { + if (tableSpec.getDimensionSpec(dictDimensionPages.length + i).getSchemaDataType() + == DataTypes.VARCHAR) { + byte[] valueWithLength = addIntLengthToByteArray(noDictAndComplex[i]); + noDictDimensionPages[i].putData(rowId, valueWithLength); + } else if (i < noDictionaryCount) { // noDictionary columns, since it is variable length, we need to prepare each // element as LV result byte array (first two bytes are the length of the array) - byte[] valueWithLength = addLengthToByteArray(noDictAndComplex[i]); + byte[] valueWithLength = addShortLengthToByteArray(noDictAndComplex[i]); noDictDimensionPages[i].putData(rowId, valueWithLength); } else { // complex columns @@ -250,7 +264,7 @@ public class TablePage { } // Adds length as a short element (first 2 bytes) to the head of the input byte array - private byte[] addLengthToByteArray(byte[] input) { + private byte[] addShortLengthToByteArray(byte[] input) { if (input.length > Short.MAX_VALUE) { throw new RuntimeException("input data length " + input.length + " bytes too long, maximum length supported is " + Short.MAX_VALUE + " bytes"); @@ -262,6 +276,15 @@ public class TablePage { return output; } + // Adds length as a integer element (first 4 bytes) to the head of the input byte array + private byte[] addIntLengthToByteArray(byte[] input) { + byte[] output = new byte[input.length + 4]; + ByteBuffer buffer = ByteBuffer.wrap(output); + buffer.putInt(input.length); + buffer.put(input, 0, input.length); + return output; + } + void encode() throws KeyGenException, MemoryException, IOException { // encode dimensions and measure EncodedColumnPage[] dimensions = 
encodeAndCompressDimensions(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index f921fd5..12c95a9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -223,6 +223,26 @@ public final class CarbonDataProcessorUtil { .toPrimitive(noDictionaryMapping.toArray(new Boolean[noDictionaryMapping.size()])); } + /** + * Preparing the boolean [] to map whether the dimension is varchar data type or not. + */ + public static boolean[] getIsVarcharColumnMapping(DataField[] fields) { + List<Boolean> isVarcharColumnMapping = new ArrayList<Boolean>(); + for (DataField field : fields) { + // for complex type need to break the loop + if (field.getColumn().isComplex()) { + break; + } + + if (field.getColumn().isDimension()) { + isVarcharColumnMapping.add( + field.getColumn().getColumnSchema().getDataType() == DataTypes.VARCHAR); + } + } + return ArrayUtils.toPrimitive( + isVarcharColumnMapping.toArray(new Boolean[isVarcharColumnMapping.size()])); + } + public static boolean[] getNoDictionaryMapping(CarbonColumn[] carbonColumns) { List<Boolean> noDictionaryMapping = new ArrayList<Boolean>(); for (CarbonColumn column : carbonColumns) {