Repository: incubator-carbondata Updated Branches: refs/heads/12-dev 3b62d25cf -> 5763f8c60
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 3346743..30e03ba 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -52,6 +52,7 @@ case class TableModel( tableProperties: Map[String, String], dimCols: Seq[Field], msrCols: Seq[Field], + sortKeyDims: Option[Seq[String]], highcardinalitydims: Option[Seq[String]], noInvertedIdxCols: Option[Seq[String]], columnGroups: Seq[String], @@ -357,6 +358,7 @@ class TableNewProcessor(cm: TableModel) { columnSchema.setPrecision(precision) columnSchema.setScale(scale) columnSchema.setSchemaOrdinal(schemaOrdinal) + columnSchema.setSortColumn(false) // TODO: Need to fill RowGroupID, converted type // & Number of Children after DDL finalization columnSchema @@ -367,7 +369,11 @@ class TableNewProcessor(cm: TableModel) { val LOGGER = LogServiceFactory.getLogService(TableNewProcessor.getClass.getName) var allColumns = Seq[ColumnSchema]() var index = 0 - cm.dimCols.foreach(field => { + var measureCount = 0 + + // Sort columns should be at the begin of all columns + cm.sortKeyDims.get.foreach { keyDim => + val field = cm.dimCols.find(keyDim equals _.column).get val encoders = new java.util.ArrayList[Encoding]() encoders.add(Encoding.DICTIONARY) val columnSchema: ColumnSchema = getColumnSchema( @@ -381,11 +387,33 @@ class TableNewProcessor(cm: TableModel) { field.precision, field.scale, field.schemaOrdinal) - allColumns ++= Seq(columnSchema) + columnSchema.setSortColumn(true) + allColumns :+= columnSchema index = index + 1 - if (field.children.isDefined && field.children.get != null) { - columnSchema.setNumberOfChild(field.children.get.size) - allColumns ++= getAllChildren(field.children) + } + + cm.dimCols.foreach(field => { + val sortField = cm.sortKeyDims.get.find(field.column equals _) + if (sortField.isEmpty) { + val encoders = new java.util.ArrayList[Encoding]() + encoders.add(Encoding.DICTIONARY) + val columnSchema: ColumnSchema = getColumnSchema( + DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")), + field.name.getOrElse(field.column), + index, + isCol = true, + encoders, + isDimensionCol = true, + -1, + field.precision, + field.scale, + field.schemaOrdinal) + allColumns :+= columnSchema + index = index + 1 + if (field.children.isDefined && field.children.get != null) { + columnSchema.setNumberOfChild(field.children.get.size) + allColumns ++= getAllChildren(field.children) + } } }) @@ -402,10 +430,9 @@ class TableNewProcessor(cm: TableModel) { field.precision, field.scale, field.schemaOrdinal) - val measureCol = columnSchema - - allColumns ++= Seq(measureCol) + allColumns :+= columnSchema index = index + 1 + measureCount += 1 }) // Check if there is any duplicate measures or dimensions. @@ -426,22 +453,6 @@ class TableNewProcessor(cm: TableModel) { updateColumnGroupsInFields(cm.columnGroups, allColumns) - var newOrderedDims = scala.collection.mutable.ListBuffer[ColumnSchema]() - val complexDims = scala.collection.mutable.ListBuffer[ColumnSchema]() - val measures = scala.collection.mutable.ListBuffer[ColumnSchema]() - for (column <- allColumns) { - if (highCardinalityDims.contains(column.getColumnName)) { - newOrderedDims += column - } else if (column.isComplex) { - complexDims += column - } else if (column.isDimensionColumn) { - newOrderedDims += column - } else { - measures += column - } - - } - // Setting the boolean value of useInvertedIndex in column schema val noInvertedIndexCols = cm.noInvertedIdxCols.getOrElse(Seq()) for (column <- allColumns) { @@ -456,7 +467,7 @@ class TableNewProcessor(cm: TableModel) { } // Adding dummy measure if no measure is provided - if (measures.size < 1) { + if (measureCount == 0) { val encoders = new java.util.ArrayList[Encoding]() val columnSchema: ColumnSchema = getColumnSchema(DataType.DOUBLE, CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE, @@ -466,13 +477,10 @@ class TableNewProcessor(cm: TableModel) { false, -1, 0, 0, schemaOrdinal = -1) columnSchema.setInvisible(true) - val measureColumn = columnSchema - measures += measureColumn - allColumns = allColumns ++ measures + allColumns :+= columnSchema } val columnValidator = CarbonSparkFactory.getCarbonColumnValidator() columnValidator.validateColumns(allColumns) - newOrderedDims = newOrderedDims ++ complexDims ++ measures val tableInfo = new TableInfo() val tableSchema = new TableSchema() http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala b/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala index 8588868..b8f0a7c 100644 --- a/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala +++ b/integration/spark/src/test/scala/org/apache/spark/sql/TestCarbonSqlParser.scala @@ -32,23 +32,15 @@ private class TestCarbonSqlParserStub extends CarbonSqlParser { def updateColumnGroupsInFieldTest(fields: Seq[Field], tableProperties: Map[String, String]): Seq[String] = { - var (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields( - fields, tableProperties) - val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties) + var (dims, msrs, noDictionaryDims, sortkey) = extractDimAndMsrFields(fields, tableProperties) updateColumnGroupsInField(tableProperties, noDictionaryDims, msrs, dims) } - def extractDimColsAndNoDictionaryFieldsTest(fields: Seq[Field], tableProperties: Map[String, String]): (Seq[Field], - Seq[String]) = { - - extractDimColsAndNoDictionaryFields(fields, tableProperties) - } - - def extractMsrColsFromFieldsTest(fields: Seq[Field], tableProperties: Map[String, String]): (Seq[Field]) = { - - extractMsrColsFromFields(fields, tableProperties) + def extractDimAndMsrFieldsTest(fields: Seq[Field], + tableProperties: Map[String, String]): (Seq[Field], Seq[Field], Seq[String], Seq[String]) = { + extractDimAndMsrFields(fields, tableProperties) } @@ -199,7 +191,7 @@ class TestCarbonSqlParser extends QueryTest { val fields: Seq[Field] = loadAllFields val stub = new TestCarbonSqlParserStub() - val (dimCols, noDictionary) = stub.extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties) + val (dimCols, _, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties) // testing col @@ -219,9 +211,7 @@ class TestCarbonSqlParser extends QueryTest { val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col1") val fields: Seq[Field] = loadAllFields val stub = new TestCarbonSqlParserStub() - val (dimCols, noDictionary) = stub - .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties) - val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties) + val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties) //below fields should be available in dimensions list assert(dimCols.size == 7) @@ -242,9 +232,7 @@ class TestCarbonSqlParser extends QueryTest { val tableProperties = Map(CarbonCommonConstants.DICTIONARY_INCLUDE -> "col1") val fields: Seq[Field] = loadAllFields val stub = new TestCarbonSqlParserStub() - val (dimCols, noDictionary) = stub - .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties) - val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties) + val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties) //below dimension fields should be available in dimensions list assert(dimCols.size == 7) @@ -264,9 +252,8 @@ class TestCarbonSqlParser extends QueryTest { val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col1", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col4") val fields: Seq[Field] = loadAllFields val stub = new TestCarbonSqlParserStub() - val (dimCols, noDictionary) = stub - .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties) - val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties) + val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, + tableProperties) //below dimension fields should be available in dimensions list assert(dimCols.size == 8) @@ -287,9 +274,7 @@ class TestCarbonSqlParser extends QueryTest { val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col3", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col2") val fields: Seq[Field] = loadAllFields val stub = new TestCarbonSqlParserStub() - val (dimCols, noDictionary) = stub - .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties) - val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties) + val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties) //below dimension fields should be available in dimensions list assert(dimCols.size == 7) @@ -310,9 +295,7 @@ class TestCarbonSqlParser extends QueryTest { val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col1", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col2") val fields: Seq[Field] = loadAllFields val stub = new TestCarbonSqlParserStub() - val (dimCols, noDictionary) = stub - .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties) - val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties) + val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties) //below dimension fields should be available in dimensions list assert(dimCols.size == 7) @@ -333,9 +316,7 @@ class TestCarbonSqlParser extends QueryTest { val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col2", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col1") val fields: Seq[Field] = loadAllFields val stub = new TestCarbonSqlParserStub() - val (dimCols, noDictionary) = stub - .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties) - val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties) + val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties) //below dimension fields should be available in dimensions list assert(dimCols.size == 7) @@ -358,9 +339,7 @@ class TestCarbonSqlParser extends QueryTest { ) val fields: Seq[Field] = loadAllFields val stub = new TestCarbonSqlParserStub() - val (dimCols, noDictionary) = stub - .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties) - val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties) + val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties) //below dimension fields should be available in dimensions list assert(dimCols.size == 8) @@ -382,9 +361,7 @@ class TestCarbonSqlParser extends QueryTest { val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE-> "col2", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col3") val fields: Seq[Field] = loadAllFields val stub = new TestCarbonSqlParserStub() - val (dimCols, noDictionary) = stub - .extractDimColsAndNoDictionaryFieldsTest(fields, tableProperties) - val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties) + val (dimCols, msrCols, noDictionary, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties) //below dimension fields should be available in dimensions list assert(dimCols.size == 7) @@ -402,10 +379,11 @@ class TestCarbonSqlParser extends QueryTest { // Testing the extracting of measures test("Test-extractMsrColsFromFields") { - val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col2", CarbonCommonConstants.DICTIONARY_INCLUDE -> "col1") + val tableProperties = Map(CarbonCommonConstants.DICTIONARY_EXCLUDE -> "col2", + CarbonCommonConstants.DICTIONARY_INCLUDE -> "col1") val fields: Seq[Field] = loadAllFields val stub = new TestCarbonSqlParserStub() - val msrCols = stub.extractMsrColsFromFieldsTest(fields, tableProperties) + val (_, msrCols, _, _) = stub.extractDimAndMsrFieldsTest(fields, tableProperties) // testing col assert(msrCols.lift(0).get.column.equalsIgnoreCase("col4")) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java index b848543..b4b462a 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java @@ -28,18 +28,25 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; public class SparkRowReadSupportImpl extends DictionaryDecodeReadSupport<InternalRow> { + boolean[] isMeasure; + @Override public void initialize(CarbonColumn[] carbonColumns, AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException { - super.initialize(carbonColumns, absoluteTableIdentifier); //can initialize and generate schema here. + isMeasure = new boolean[carbonColumns.length]; + dataTypes = new DataType[carbonColumns.length]; + for (int i = 0; i < carbonColumns.length; i++) { + isMeasure[i] = !carbonColumns[i].isDimesion(); + dataTypes[i] = carbonColumns[i].getDataType(); + } } @Override public InternalRow readRow(Object[] data) { - for (int i = 0; i < dictionaries.length; i++) { + for (int i = 0; i < isMeasure.length; i++) { if (data[i] == null) { continue; } - if (dictionaries[i] == null) { + if (isMeasure[i]) { if (dataTypes[i].equals(DataType.INT)) { data[i] = ((Long)(data[i])).intValue(); } else if (dataTypes[i].equals(DataType.SHORT)) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java index 2a9c701..27bbc2a 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java @@ -20,6 +20,7 @@ package org.apache.carbondata.spark.vectorreader; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; import org.apache.spark.sql.execution.vectorized.ColumnVector; +import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; class ColumnarVectorWrapper implements CarbonColumnVector { @@ -30,6 +31,14 @@ class ColumnarVectorWrapper implements CarbonColumnVector { this.columnVector = columnVector; } + @Override public void putBoolean(int rowId, boolean value) { + columnVector.putBoolean(rowId, value); + } + + @Override public void putFloat(int rowId, float value) { + columnVector.putFloat(rowId, value); + } + @Override public void putShort(int rowId, short value) { columnVector.putShort(rowId, value); } @@ -112,4 +121,8 @@ class ColumnarVectorWrapper implements CarbonColumnVector { @Override public void reset() { // columnVector.reset(); } + + @Override public DataType getType() { + return columnVector.dataType(); + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala index f8bdcf8..9321706 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala @@ -51,18 +51,49 @@ object TableCreator { dimensionType.exists(x => x.equalsIgnoreCase(dimensionDatatype)) } - protected def extractDimColsAndNoDictionaryFields(fields: Seq[Field], - tableProperties: Map[String, String]): - (Seq[Field], Seq[String]) = { + protected def extractDimAndMsrFields(fields: Seq[Field], + tableProperties: Map[String, String]): (Seq[Field], Seq[Field], Seq[String], Seq[String]) = { var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]() + var msrFields: Seq[Field] = Seq[Field]() var dictExcludeCols: Array[String] = Array[String]() var noDictionaryDims: Seq[String] = Seq[String]() var dictIncludeCols: Seq[String] = Seq[String]() + // All columns in sortkey should be there in create table cols + val sortKeyOption = tableProperties.get(CarbonCommonConstants.SORT_COLUMNS) + var sortKeyDimsTmp: Seq[String] = Seq[String]() + if (sortKeyOption.isDefined) { + var sortKey = sortKeyOption.get.split(',').map(_.trim) + sortKey.foreach { column => + if (!fields.exists(x => x.column.equalsIgnoreCase(column))) { + val errormsg = "sort_columns: " + column + + " does not exist in table. Please check create table statement." + throw new MalformedCarbonCommandException(errormsg) + } else { + val dataType = fields.find(x => + x.column.equalsIgnoreCase(column)).get.dataType.get + if (isComplexDimDictionaryExclude(dataType)) { + val errormsg = "sort_columns is unsupported for complex datatype column: " + column + throw new MalformedCarbonCommandException(errormsg) + } + } + } + + sortKey.foreach { dimension => + if (!sortKeyDimsTmp.exists(dimension.equalsIgnoreCase(_))) { + fields.foreach { field => + if (field.column.equalsIgnoreCase(dimension)) { + sortKeyDimsTmp :+= field.column + } + } + } + } + } + // All excluded cols should be there in create table cols if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) { dictExcludeCols = - tableProperties(CarbonCommonConstants.DICTIONARY_EXCLUDE).split(',').map(_.trim) + tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim) dictExcludeCols .foreach { dictExcludeCol => if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) { @@ -97,7 +128,7 @@ object TableCreator { } } - // include cols should contain exclude cols + // include cols should not contain exclude cols dictExcludeCols.foreach { dicExcludeCol => if (dictIncludeCols.exists(x => x.equalsIgnoreCase(dicExcludeCol))) { val errormsg = "DICTIONARY_EXCLUDE can not contain the same column: " + dicExcludeCol + @@ -108,11 +139,10 @@ object TableCreator { // by default consider all String cols as dims and if any dictionary exclude is present then // add it to noDictionaryDims list. consider all dictionary excludes/include cols as dims - fields.foreach(field => { - + fields.foreach { field => if (dictExcludeCols.toSeq.exists(x => x.equalsIgnoreCase(field.column))) { - if (DataTypeUtil.getDataType(field.dataType.get.toUpperCase()) != DataType.TIMESTAMP && - DataTypeUtil.getDataType(field.dataType.get.toUpperCase()) != DataType.DATE) { + val dataType = DataTypeUtil.getDataType(field.dataType.get.toUpperCase()) + if (dataType != DataType.TIMESTAMP && dataType != DataType.DATE) { noDictionaryDims :+= field.column } dimFields += field @@ -120,49 +150,30 @@ object TableCreator { dimFields += field } else if (isDetectAsDimentionDatatype(field.dataType.get)) { dimFields += field + } else if (sortKeyDimsTmp.exists(x => x.equalsIgnoreCase(field.column))) { + noDictionaryDims :+= field.column + dimFields += field + } else { + msrFields :+= field } } - ) - - (dimFields.toSeq, noDictionaryDims) - } - - /** - * Extract the Measure Cols fields. By default all non string cols will be measures. - * - * @param fields - * @param tableProperties - * @return - */ - protected def extractMsrColsFromFields(fields: Seq[Field], - tableProperties: Map[String, String]): Seq[Field] = { - var msrFields: Seq[Field] = Seq[Field]() - var dictIncludedCols: Array[String] = Array[String]() - var dictExcludedCols: Array[String] = Array[String]() - - // get all included cols - if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) { - dictIncludedCols = - tableProperties(CarbonCommonConstants.DICTIONARY_INCLUDE).split(',').map(_.trim) - } - // get all excluded cols - if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) { - dictExcludedCols = - tableProperties(CarbonCommonConstants.DICTIONARY_EXCLUDE).split(',').map(_.trim) - } - - // by default consider all non string cols as msrs. consider all include/ exclude cols as dims - fields.foreach(field => { - if (!isDetectAsDimentionDatatype(field.dataType.get)) { - if (!dictIncludedCols.exists(x => x.equalsIgnoreCase(field.column)) && - !dictExcludedCols.exists(x => x.equalsIgnoreCase(field.column))) { - msrFields :+= field + var sortKeyDims = sortKeyDimsTmp + if (sortKeyOption.isEmpty) { + // if SORT_COLUMNS was not defined, add all dimension to SORT_COLUMNS. + dimFields.foreach { field => + if (!isComplexDimDictionaryExclude(field.dataType.get)) { + sortKeyDims :+= field.column } } - }) - - msrFields + } + if (sortKeyDims.isEmpty) { + // no SORT_COLUMNS + tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, "") + } else { + tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, sortKeyDims.mkString(",")) + } + (dimFields.toSeq, msrFields, noDictionaryDims, sortKeyDims) } def getKey(parentColumnName: Option[String], @@ -440,27 +451,24 @@ object TableCreator { } def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String] - , tableName: String, fields: Seq[Field], - partitionCols: Seq[PartitionerField], - bucketFields: Option[BucketFields], - tableProperties: Map[String, String]): TableModel + , tableName: String, fields: Seq[Field], + partitionCols: Seq[PartitionerField], + bucketFields: Option[BucketFields], + tableProperties: Map[String, String]): TableModel = { - val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields( + fields.zipWithIndex.foreach { x => + x._1.schemaOrdinal = x._2 + } + val (dims, msrs, noDictionaryDims, sortKeyDims) = extractDimAndMsrFields( fields, tableProperties) if (dims.isEmpty) { - throw new MalformedCarbonCommandException(s"Table ${ - dbName.getOrElse( - CarbonCommonConstants.DATABASE_DEFAULT_NAME) - }.$tableName" - + - " can not be created without key columns. Please " + - "use DICTIONARY_INCLUDE or " + - "DICTIONARY_EXCLUDE to set at least one key " + - "column " + + throw new MalformedCarbonCommandException( + s"Table ${dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME)}.$tableName " + + "can not be created without key columns. Please use DICTIONARY_INCLUDE or " + + "DICTIONARY_EXCLUDE to set at least one key column " + "if all specified columns are numeric types") } - val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties) // column properties val colProps = extractColumnProperties(fields, tableProperties) @@ -474,18 +482,20 @@ object TableCreator { // validate the tableBlockSize from table properties CommonUtil.validateTableBlockSize(tableProperties) - TableModel(ifNotExistPresent, + TableModel( + ifNotExistPresent, dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME), dbName, tableName, tableProperties, reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))), msrs.map(f => normalizeType(f)), + Option(sortKeyDims), Option(noDictionaryDims), Option(noInvertedIdxCols), groupCols, Some(colProps), - bucketFields) + bucketFields: Option[BucketFields]) } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java index 0bd3e45..407ac2f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java @@ -76,6 +76,10 @@ public class CarbonDataLoadConfiguration { private DictionaryCardinalityFinder cardinalityFinder; + private int numberOfSortColumns; + + private int numberOfNoDictSortColumns; + public CarbonDataLoadConfiguration() { } @@ -121,6 +125,22 @@ public class CarbonDataLoadConfiguration { return dimCount; } + public void setNumberOfSortColumns(int numberOfSortColumns) { + this.numberOfSortColumns = numberOfSortColumns; + } + + public int getNumberOfSortColumns() { + return this.numberOfSortColumns; + } + + public void setNumberOfNoDictSortColumns(int numberOfNoDictSortColumns) { + this.numberOfNoDictSortColumns = numberOfNoDictSortColumns; + } + + public int getNumberOfNoDictSortColumns() { + return this.numberOfNoDictSortColumns; + } + public int getComplexDimensionCount() { int dimCount = 0; for (int i = 0; i < dataFields.length; i++) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java index 4ebb2fb..1932888 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java @@ -195,6 +195,8 @@ public final class DataLoadProcessBuilder { configuration.setDictionaryServerHost(loadModel.getDictionaryServerHost()); configuration.setDictionaryServerPort(loadModel.getDictionaryServerPort()); configuration.setPreFetch(loadModel.isPreFetch()); + configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns()); + configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns()); return configuration; } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java index 9e4b50d..3accb0b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java @@ -16,8 +16,6 @@ */ package org.apache.carbondata.processing.newflow.converter.impl; -import java.nio.charset.Charset; - import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; @@ -48,23 +46,24 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter { this.isEmptyBadRecord = isEmptyBadRecord; } - @Override - public void convert(CarbonRow row, BadRecordLogHolder logHolder) { + @Override public void convert(CarbonRow row, BadRecordLogHolder logHolder) { String dimensionValue = row.getString(index); if (dimensionValue == null || dimensionValue.equals(nullformat)) { - dimensionValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL; - } - if (dataType != DataType.STRING) { - if (null == DataTypeUtil.normalizeIntAndLongValues(dimensionValue, dataType)) { - if ((dimensionValue.length() > 0) || (dimensionValue.length() == 0 && isEmptyBadRecord)) { + row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, index); + } else { + try { + row.update(DataTypeUtil + .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType), index); + } catch (Throwable ex) { + if (dimensionValue.length() != 0 || isEmptyBadRecord) { logHolder.setReason( "The value " + " \"" + dimensionValue + "\"" + " with column name " + column .getColName() + " and column data type " + dataType + " is not a valid " + dataType + " type."); + } else { + row.update(new byte[0], index); } } } - row.update(dimensionValue.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)), - index); } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java index ad96578..a0e4ef1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java @@ -85,7 +85,8 @@ public class ParallelReadMergeSorterImpl implements Sorter { sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(), sortParameters.getAggType(), - sortParameters.getNoDictionaryDimnesionColumn()); + sortParameters.getNoDictionaryDimnesionColumn(), + sortParameters.getNoDictionarySortColumn()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java index e3049d2..430bf1f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java @@ -139,7 +139,8 @@ public class ParallelReadMergeSorterWithBucketingImpl implements Sorter { new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(), sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(), - sortParameters.getAggType(), sortParameters.getNoDictionaryDimnesionColumn()); + sortParameters.getAggType(), sortParameters.getNoDictionaryDimnesionColumn(), + this.sortParameters.getNoDictionarySortColumn()); return finalMerger; } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java index e468028..61c6cca 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java @@ -34,6 +34,8 @@ public class UnsafeCarbonRowPage { private boolean[] noDictionaryDimensionMapping; + private boolean[] noDictionarySortColumnMapping; + private int dimensionSize; private int measureSize; @@ -52,9 +54,11 @@ public class UnsafeCarbonRowPage { private boolean saveToDisk; - public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping, int dimensionSize, - int measureSize, char[] aggType, MemoryBlock memoryBlock, boolean saveToDisk) { + public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping, + boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, char[] aggType, + MemoryBlock memoryBlock, boolean saveToDisk) { this.noDictionaryDimensionMapping = noDictionaryDimensionMapping; + this.noDictionarySortColumnMapping = noDictionarySortColumnMapping; this.dimensionSize = dimensionSize; this.measureSize = measureSize; this.aggType = aggType; @@ -324,4 +328,7 @@ public class UnsafeCarbonRowPage { return noDictionaryDimensionMapping; } + public boolean[] getNoDictionarySortColumnMapping() { + return noDictionarySortColumnMapping; + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java index 9907509..1dc980f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java @@ -108,6 +108,7 @@ public class UnsafeSortDataRows { public void initialize() throws CarbonSortKeyAndGroupByException { MemoryBlock baseBlock = getMemoryBlock(inMemoryChunkSizeInMB * 1024 * 1024); this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(), + parameters.getNoDictionarySortColumn(), parameters.getDimColCount() + parameters.getComplexDimColCount(), parameters.getMeasureColCount(), parameters.getAggType(), baseBlock, !UnsafeMemoryManager.INSTANCE.isMemoryAvailable()); @@ -171,6 +172,7 @@ public class UnsafeSortDataRows { MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSizeInMB * 1024 * 1024); boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable(); rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(), + parameters.getNoDictionarySortColumn(), parameters.getDimColCount() + parameters.getComplexDimColCount(), parameters.getMeasureColCount(), parameters.getAggType(), memoryBlock, saveToDisk); bytesAdded += rowPage.addRow(rowBatch[i]); @@ -198,12 +200,12 @@ public class UnsafeSortDataRows { if (this.rowPage.getUsedSize() > 0) { TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>( new UnsafeIntSortDataFormat(rowPage)); - if (parameters.getNoDictionaryCount() > 0) { + if (parameters.getNumberOfNoDictSortColumns() > 0) { timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(), new UnsafeRowComparator(rowPage)); } else { timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(), - new UnsafeRowComparatorForNormalDIms(parameters.getDimColCount(), rowPage)); + new UnsafeRowComparatorForNormalDIms(rowPage)); } unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage); } else { @@ -288,12 +290,13 @@ public class UnsafeSortDataRows { long startTime = System.currentTimeMillis(); TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>( new UnsafeIntSortDataFormat(page)); - if (parameters.getNoDictionaryCount() > 0) { + // if sort_columns is not none, sort by sort_columns + if (parameters.getNumberOfNoDictSortColumns() > 0) { timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(), new UnsafeRowComparator(page)); } else { timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(), - new UnsafeRowComparatorForNormalDIms(parameters.getDimColCount(), page)); + new UnsafeRowComparatorForNormalDIms(page)); } if (rowPage.isSaveToDisk()) { // create a new file every time http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java index e61a284..476b8ac 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparator.java @@ -27,14 +27,14 @@ import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonR public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> { /** - * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions. + * mapping of dictionary and no dictionary of sort_columns. */ - private boolean[] noDictionaryColMaping; + private boolean[] noDictionarySortColumnMaping; private Object baseObject; public UnsafeRowComparator(UnsafeCarbonRowPage rowPage) { - this.noDictionaryColMaping = rowPage.getNoDictionaryDimensionMapping(); + this.noDictionarySortColumnMaping = rowPage.getNoDictionarySortColumnMapping(); this.baseObject = rowPage.getDataBlock().getBaseObject(); } @@ -47,7 +47,7 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> { long rowB = rowR.address; int sizeA = 0; int sizeB = 0; - for (boolean isNoDictionary : noDictionaryColMaping) { + for (boolean isNoDictionary : noDictionarySortColumnMaping) { if (isNoDictionary) { short aShort1 = CarbonUnsafe.unsafe.getShort(baseObject, rowA + sizeA); sizeA += 2; @@ -89,7 +89,7 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> { long rowB = rowR.address; int sizeA = 0; int sizeB = 0; - for (boolean isNoDictionary : noDictionaryColMaping) { + for (boolean isNoDictionary : noDictionarySortColumnMaping) { if (isNoDictionary) { short aShort1 = CarbonUnsafe.unsafe.getShort(baseObjectL, rowA + sizeA); sizeA += 2; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java index 7448aee..4fd245f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java @@ -27,11 +27,11 @@ public class UnsafeRowComparatorForNormalDIms implements Comparator<UnsafeCarbon private Object baseObject; - private int dimCount; + private int numberOfSortColumns; - public UnsafeRowComparatorForNormalDIms(int dimCount, UnsafeCarbonRowPage rowPage) { + public UnsafeRowComparatorForNormalDIms(UnsafeCarbonRowPage rowPage) { this.baseObject = rowPage.getDataBlock().getBaseObject(); - this.dimCount = dimCount; + this.numberOfSortColumns = rowPage.getNoDictionarySortColumnMapping().length; } /** @@ -43,7 +43,7 @@ public class UnsafeRowComparatorForNormalDIms implements Comparator<UnsafeCarbon long rowB = rowR.address; int sizeA = 0; int sizeB = 0; - for (int i = 0; i < dimCount; i++) { + for (int i = 0; i < numberOfSortColumns; i++) { int dimFieldA = CarbonUnsafe.unsafe.getInt(baseObject, rowA + sizeA); sizeA += 4; int dimFieldB = CarbonUnsafe.unsafe.getInt(baseObject, rowB + sizeB); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java index ed9e0a6..397de63 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java @@ -45,13 +45,13 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder { private int columnSize; public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger merger, - boolean[] noDictMapping, int columnSize) { + boolean[] noDictSortColumnMapping, int columnSize) { this.actualSize = merger.getEntryCount(); this.mergedAddresses = merger.getMergedAddresses(); this.rowPageIndexes = merger.getRowPageIndexes(); this.rowPages = merger.getUnsafeCarbonRowPages(); LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize); - this.comparator = new NewRowComparator(noDictMapping); + this.comparator = new NewRowComparator(noDictSortColumnMapping); this.columnSize = columnSize; } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java index f491623..048f4f8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java @@ -41,11 +41,12 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder { private int columnSize; - public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize) { + public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize, + int numberOfSortColumns) { this.actualSize = rowPage.getBuffer().getActualSize(); this.rowPage = rowPage; LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize); - this.comparator = new NewRowComparator(rowPage.getNoDictionaryDimensionMapping()); + this.comparator = new NewRowComparator(rowPage.getNoDictionarySortColumnMapping()); this.columnSize = columnSize; } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java index 60f259e..6d04ebf 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java @@ -153,7 +153,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { this.aggType = parameters.getAggType(); this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn(); this.nullSetWordsLength = ((measureCount - 1) >> 6) + 1; - comparator = new NewRowComparator(isNoDictionaryDimensionColumn); + comparator = new NewRowComparator(parameters.getNoDictionarySortColumn()); initialize(); } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java index b98a072..ab59395 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java @@ -126,7 +126,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage, parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters - .getMeasureColCount()); + .getMeasureColCount(), parameters.getNumberOfSortColumns()); // initialize sortTempFileChunkHolder.readRow(); @@ -137,7 +137,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec for (final UnsafeInMemoryIntermediateDataMerger merger : merges) { SortTempChunkHolder sortTempFileChunkHolder = - new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionaryDimnesionColumn(), + new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionarySortColumn(), parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters .getMeasureColCount()); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java index 0ac2d5c..dbddc1d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java @@ -251,7 +251,8 @@ public class IntermediateFileMerger implements Callable<Void> { new SortTempFileChunkHolder(tempFile, mergerParameters.getDimColCount(), mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(), mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(), - mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn()); + mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn(), + mergerParameters.getNoDictionarySortColumn()); // initialize sortTempFileChunkHolder.initialize(); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java index dd9358c..247251e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java @@ -24,15 +24,15 @@ import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; public class NewRowComparator implements Comparator<Object[]> { /** - * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions. + * mapping of dictionary dimensions and no dictionary of sort_column. */ - private boolean[] noDictionaryColMaping; + private boolean[] noDictionarySortColumnMaping; /** - * @param noDictionaryColMaping + * @param noDictionarySortColumnMaping */ - public NewRowComparator(boolean[] noDictionaryColMaping) { - this.noDictionaryColMaping = noDictionaryColMaping; + public NewRowComparator(boolean[] noDictionarySortColumnMaping) { + this.noDictionarySortColumnMaping = noDictionarySortColumnMaping; } /** @@ -43,7 +43,7 @@ public class NewRowComparator implements Comparator<Object[]> { int index = 0; - for (boolean isNoDictionary : noDictionaryColMaping) { + for (boolean isNoDictionary : noDictionarySortColumnMaping) { if (isNoDictionary) { byte[] byteArr1 = (byte[]) rowA[index]; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java index d913b32..241882e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java @@ -26,15 +26,15 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> { /** * dimension count */ - private int dimensionCount; + private int numberOfSortColumns; /** * RowComparatorForNormalDims Constructor * - * @param dimensionCount + * @param numberOfSortColumns */ - public NewRowComparatorForNormalDims(int dimensionCount) { - this.dimensionCount = dimensionCount; + public NewRowComparatorForNormalDims(int numberOfSortColumns) { + this.numberOfSortColumns = numberOfSortColumns; } /** @@ -45,7 +45,7 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> { public int compare(Object[] rowA, Object[] rowB) { int diff = 0; - for (int i = 0; i < dimensionCount; i++) { + for (int i = 0; i < numberOfSortColumns; i++) { int dimFieldA = (int)rowA[i]; int dimFieldB = (int)rowB[i]; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java index c282f52..2584048 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java @@ -33,15 +33,15 @@ public class RowComparator implements Comparator<Object[]> { /** * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions. */ - private boolean[] noDictionaryColMaping; + private boolean[] noDictionarySortColumnMaping; /** - * @param noDictionaryColMaping + * @param noDictionarySortColumnMaping * @param noDictionaryCount */ - public RowComparator(boolean[] noDictionaryColMaping, int noDictionaryCount) { + public RowComparator(boolean[] noDictionarySortColumnMaping, int noDictionaryCount) { this.noDictionaryCount = noDictionaryCount; - this.noDictionaryColMaping = noDictionaryColMaping; + this.noDictionarySortColumnMaping = noDictionarySortColumnMaping; } /** @@ -53,7 +53,7 @@ public class RowComparator implements Comparator<Object[]> { int normalIndex = 0; int noDictionaryindex = 0; - for (boolean isNoDictionary : noDictionaryColMaping) { + for (boolean isNoDictionary : noDictionarySortColumnMaping) { if (isNoDictionary) { byte[] byteArr1 = (byte[]) rowA[IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex()]; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java index ceaf5c6..8d914ea 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java @@ -28,15 +28,15 @@ public class RowComparatorForNormalDims implements Comparator<Object[]> { /** * dimension count */ - private int dimensionCount; + private int numberOfSortColumns; /** * RowComparatorForNormalDims Constructor * - * @param dimensionCount + * @param numberOfSortColumns */ - public RowComparatorForNormalDims(int dimensionCount) { - this.dimensionCount = dimensionCount; + public RowComparatorForNormalDims(int numberOfSortColumns) { + this.numberOfSortColumns = numberOfSortColumns; } /** @@ -47,7 +47,7 @@ public class RowComparatorForNormalDims implements Comparator<Object[]> { public int compare(Object[] rowA, Object[] rowB) { int diff = 0; - for (int i = 0; i < dimensionCount; i++) { + for (int i = 0; i < numberOfSortColumns; i++) { int dimFieldA = NonDictionaryUtil.getDimension(i, rowA); int dimFieldB = NonDictionaryUtil.getDimension(i, rowB); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java index 9b5a850..949d8f9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java @@ -191,11 +191,10 @@ public class SortDataRows { Object[][] toSort; toSort = new Object[entryCount][]; System.arraycopy(recordHolderList, 0, toSort, 0, entryCount); - - if (parameters.getNoDictionaryCount() > 0) { - Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionaryDimnesionColumn())); + if (parameters.getNumberOfNoDictSortColumns() > 0) { + Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionarySortColumn())); } else { - Arrays.sort(toSort, new NewRowComparatorForNormalDims(parameters.getDimColCount())); + Arrays.sort(toSort, new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns())); } recordHolderList = toSort; @@ -385,12 +384,12 @@ public class SortDataRows { @Override public Void call() throws Exception { try { long startTime = System.currentTimeMillis(); - if (parameters.getNoDictionaryCount() > 0) { + if (parameters.getNumberOfNoDictSortColumns() > 0) { Arrays.sort(recordHolderArray, - new NewRowComparator(parameters.getNoDictionaryDimnesionColumn())); + new NewRowComparator(parameters.getNoDictionarySortColumn())); } else { Arrays.sort(recordHolderArray, - new NewRowComparatorForNormalDims(parameters.getDimColCount())); + new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns())); } // create a new file every time http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java index d42dc32..40e933d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java @@ -112,6 +112,12 @@ public class SortParameters { */ private boolean[] noDictionaryDimnesionColumn; + private boolean[] noDictionarySortColumn; + + private int numberOfSortColumns; + + private int numberOfNoDictSortColumns; + private int numberOfCores; public SortParameters getCopy() { @@ -137,6 +143,9 @@ public class SortParameters { parameters.segmentId = segmentId; parameters.taskNo = taskNo; parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn; + parameters.noDictionarySortColumn = noDictionarySortColumn; + parameters.numberOfSortColumns = numberOfSortColumns; + parameters.numberOfNoDictSortColumns = numberOfNoDictSortColumns; parameters.numberOfCores = numberOfCores; return parameters; } @@ -317,6 +326,30 @@ public class SortParameters { this.numberOfCores = numberOfCores; } + public int getNumberOfSortColumns() { + return numberOfSortColumns; + } + + public void setNumberOfSortColumns(int numberOfSortColumns) { + this.numberOfSortColumns = Math.min(numberOfSortColumns, this.dimColCount); + } + + public boolean[] getNoDictionarySortColumn() { + return noDictionarySortColumn; + } + + public void setNoDictionarySortColumn(boolean[] noDictionarySortColumn) { + this.noDictionarySortColumn = noDictionarySortColumn; + } + + public int getNumberOfNoDictSortColumns() { + return numberOfNoDictSortColumns; + } + + public void setNumberOfNoDictSortColumns(int numberOfNoDictSortColumns) { + this.numberOfNoDictSortColumns = Math.min(numberOfNoDictSortColumns, noDictionaryCount); + } + public static SortParameters createSortParameters(CarbonDataLoadConfiguration configuration) { SortParameters parameters = new SortParameters(); CarbonTableIdentifier tableIdentifier = @@ -334,6 +367,16 @@ public class SortParameters { parameters.setComplexDimColCount(configuration.getComplexDimensionCount()); parameters.setNoDictionaryDimnesionColumn( CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields())); + parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns()); + parameters.setNumberOfNoDictSortColumns(configuration.getNumberOfNoDictSortColumns()); + if (parameters.getNumberOfSortColumns() == parameters.getNoDictionaryDimnesionColumn().length) { + parameters.setNoDictionarySortColumn(parameters.getNoDictionaryDimnesionColumn()); + } else { + boolean[] noDictionarySortColumnTemp = new boolean[parameters.getNumberOfSortColumns()]; + System.arraycopy(parameters.getNoDictionaryDimnesionColumn(), 0, + noDictionarySortColumnTemp, 0, parameters.getNumberOfSortColumns()); + parameters.setNoDictionarySortColumn(noDictionarySortColumnTemp); + } parameters.setObserver(new SortObserver()); // get sort buffer size parameters.setSortBufferSize(Integer.parseInt(carbonProperties http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java index ae01404..2a4346d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java @@ -133,6 +133,11 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold private boolean[] isNoDictionaryDimensionColumn; /** + * to store whether sort column is of dictionary type or not + */ + private boolean[] isNoDictionarySortColumn; + + /** * Constructor to initialize * * @param tempFile @@ -146,7 +151,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold */ public SortTempFileChunkHolder(File tempFile, int dimensionCount, int complexDimensionCount, int measureCount, int fileBufferSize, int noDictionaryCount, char[] aggType, - boolean[] isNoDictionaryDimensionColumn) { + boolean[] isNoDictionaryDimensionColumn, boolean[] isNoDictionarySortColumn) { // set temp file this.tempFile = tempFile; @@ -160,7 +165,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold this.fileBufferSize = fileBufferSize; this.executorService = Executors.newFixedThreadPool(1); this.aggType = aggType; + this.isNoDictionaryDimensionColumn = isNoDictionaryDimensionColumn; + this.isNoDictionarySortColumn = isNoDictionarySortColumn; } /** @@ -409,7 +416,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold int[] rightMdkArray = (int[]) other.returnRow[0]; byte[][] leftNonDictArray = (byte[][]) returnRow[1]; byte[][] rightNonDictArray = (byte[][]) other.returnRow[1]; - for (boolean isNoDictionary : isNoDictionaryDimensionColumn) { + for (boolean isNoDictionary : isNoDictionarySortColumn) { if (isNoDictionary) { diff = UnsafeComparer.INSTANCE .compareTo(leftNonDictArray[noDictionaryIndex], rightNonDictArray[noDictionaryIndex]); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index 2affa03..fe3579b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -709,13 +709,15 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { int i = 0; int dictionaryColumnCount = -1; int noDictionaryColumnCount = -1; + boolean isSortColumn = false; for (i = 0; i < dimensionType.length; i++) { + isSortColumn = i < segmentProperties.getNumberOfSortColumns(); if (dimensionType[i]) { dictionaryColumnCount++; if (colGrpModel.isColumnar(dictionaryColumnCount)) { submit.add(executorService.submit( - new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), true, - isUseInvertedIndex[i]))); + new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), isSortColumn, + isUseInvertedIndex[i] & isSortColumn))); } else { submit.add( executorService.submit(new ColGroupBlockStorage(dataHolders[dictionaryColumnCount]))); @@ -723,7 +725,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { } else { submit.add(executorService.submit( new BlockSortThread(i, noDictionaryColumnsData[++noDictionaryColumnCount], false, true, - true, isUseInvertedIndex[i]))); + isSortColumn, isUseInvertedIndex[i] & isSortColumn))); } } for (int k = 0; k < complexColCount; k++) { @@ -747,7 +749,42 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { } byte[] composedNonDictStartKey = null; byte[] composedNonDictEndKey = null; - if (noDictionaryStartKey != null) { + + int numberOfDictSortColumns = segmentProperties.getNumberOfDictSortColumns(); + // generate start/end key by sort_columns + if (numberOfDictSortColumns > 0) { + // if sort_columns contain dictionary columns + int[] keySize = columnarSplitter.getBlockKeySize(); + if (keySize.length > numberOfDictSortColumns) { + int newMdkLength = 0; + for (int index = 0; index < numberOfDictSortColumns; index++) { + newMdkLength += keySize[index]; + } + byte[] newStartKeyOfSortKey = new byte[newMdkLength]; + byte[] newEndKeyOfSortKey = new byte[newMdkLength]; + System.arraycopy(startkeyLocal, 0, newStartKeyOfSortKey, 0, newMdkLength); + System.arraycopy(endKeyLocal, 0, newEndKeyOfSortKey, 0, newMdkLength); + startkeyLocal = newStartKeyOfSortKey; + endKeyLocal = newEndKeyOfSortKey; + } + } else { + startkeyLocal = new byte[0]; + endKeyLocal = new byte[0]; + } + + int numberOfNoDictSortColumns = segmentProperties.getNumberOfNoDictSortColumns(); + if (numberOfNoDictSortColumns > 0) { + // if sort_columns contain no-dictionary columns + if (noDictionaryStartKey.length > numberOfNoDictSortColumns) { + byte[][] newNoDictionaryStartKey = new byte[numberOfNoDictSortColumns][]; + byte[][] newNoDictionaryEndKey = new byte[numberOfNoDictSortColumns][]; + System.arraycopy(noDictionaryStartKey, 0, newNoDictionaryStartKey, 0, + numberOfNoDictSortColumns); + System + .arraycopy(noDictionaryEndKey, 0, newNoDictionaryEndKey, 0, numberOfNoDictSortColumns); + noDictionaryStartKey = newNoDictionaryStartKey; + noDictionaryEndKey = newNoDictionaryEndKey; + } composedNonDictStartKey = NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictionaryStartKey); composedNonDictEndKey = http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f993908d/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java index 68f9bd5..f8454f1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java @@ -101,9 +101,11 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { */ private boolean[] isNoDictionaryColumn; + private boolean[] isNoDictionarySortColumn; + public SingleThreadFinalSortFilesMerger(String tempFileLocation, String tableName, int dimensionCount, int complexDimensionCount, int measureCount, int noDictionaryCount, - char[] aggType, boolean[] isNoDictionaryColumn) { + char[] aggType, boolean[] isNoDictionaryColumn, boolean[] isNoDictionarySortColumn) { this.tempFileLocation = tempFileLocation; this.tableName = tableName; this.dimensionCount = dimensionCount; @@ -112,6 +114,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { this.aggType = aggType; this.noDictionaryCount = noDictionaryCount; this.isNoDictionaryColumn = isNoDictionaryColumn; + this.isNoDictionarySortColumn = isNoDictionarySortColumn; } /** @@ -180,7 +183,8 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { // create chunk holder SortTempFileChunkHolder sortTempFileChunkHolder = new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount, - measureCount, fileBufferSize, noDictionaryCount, aggType, isNoDictionaryColumn); + measureCount, fileBufferSize, noDictionaryCount, aggType, isNoDictionaryColumn, + isNoDictionarySortColumn); // initialize sortTempFileChunkHolder.initialize();