Repository: incubator-carbondata Updated Branches: refs/heads/12-dev 914d61e32 -> f60f6b62c
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fcf74017/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/NoSortProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/NoSortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/NoSortProcessorStepImpl.java new file mode 100644 index 0000000..bde89ed --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/NoSortProcessorStepImpl.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.processing.newflow.steps; + +import java.io.IOException; +import java.math.BigDecimal; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep; +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration; +import org.apache.carbondata.processing.newflow.DataField; +import org.apache.carbondata.processing.newflow.row.CarbonRow; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; +import org.apache.carbondata.processing.util.NonDictionaryUtil; + +/** + * if the table doesn't have sort_columns, just convert row format. + */ +public class NoSortProcessorStepImpl extends AbstractDataLoadProcessorStep { + + private int dimensionCount; + + private int dimensionWithComplexCount; + + private int noDictCount; + + private int measureCount; + + private boolean[] isNoDictionaryDimensionColumn; + + private char[] aggType; + + public NoSortProcessorStepImpl(CarbonDataLoadConfiguration configuration, + AbstractDataLoadProcessorStep child) { + super(configuration, child); + this.dimensionWithComplexCount = configuration.getDimensionCount(); + this.noDictCount = + configuration.getNoDictionaryCount() + configuration.getComplexDimensionCount(); + this.dimensionCount = configuration.getDimensionCount() - this.noDictCount; + this.measureCount = configuration.getMeasureCount(); + this.isNoDictionaryDimensionColumn = + CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()); + this.aggType = CarbonDataProcessorUtil + .getAggType(configuration.getMeasureCount(), configuration.getMeasureFields()); + } + + @Override public DataField[] getOutput() { + return child.getOutput(); + } + + @Override public void initialize() throws IOException { + child.initialize(); + } + + /** + * convert input CarbonRow to output CarbonRow + * e.g. There is a table as following, + * the number of dictionary dimensions is a, + * the number of no-dictionary dimensions is b, + * the number of complex dimensions is c, + * the number of measures is d. + * input CarbonRow format: the length of Object[] data is a+b+c+d, the number of all columns. + * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * | Part | Object item | describe | + * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * | Object[0 ~ a+b-1] | Integer, byte[], Integer, ... | dict + no dict dimensions| + * ---------------------------------------------------------------------------------------- + * | Object[a+b ~ a+b+c-1] | byte[], byte[], ... | complex dimensions | + * ---------------------------------------------------------------------------------------- + * | Object[a+b+c ~ a+b+c+d-1]| int, byte[], ... | measures | + * ---------------------------------------------------------------------------------------- + * output CarbonRow format: the length of object[] data is 3. + * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * | Part | Object item | describe | + * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * | Object[0] | int[a] | dict dimension array | + * ---------------------------------------------------------------------------------------- + * | Object[1] | byte[b+c][] | no dict + complex dim | + * ---------------------------------------------------------------------------------------- + * | Object[2] | Object[d] | measures | + * ---------------------------------------------------------------------------------------- + * @param row + * @return + */ + @Override protected CarbonRow processRow(CarbonRow row) { + int dictIndex = 0; + int nonDicIndex = 0; + int[] dim = new int[this.dimensionCount]; + byte[][] nonDicArray = new byte[this.noDictCount][]; + Object[] measures = new Object[this.measureCount]; + // read dimension values + int dimCount = 0; + for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) { + if (isNoDictionaryDimensionColumn[dimCount]) { + nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount); + } else { + dim[dictIndex++] = (int) row.getObject(dimCount); + } + } + + for (; dimCount < this.dimensionWithComplexCount; dimCount++) { + nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount); + } + + // measure values + for (int mesCount = 0; mesCount < this.measureCount; mesCount++) { + Object value = row.getObject(mesCount + this.dimensionWithComplexCount); + if (null != value) { + if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) { + measures[mesCount] = value; + } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) { + measures[mesCount] = value; + } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) { + BigDecimal val = (BigDecimal) value; + measures[mesCount] = DataTypeUtil.bigDecimalToByte(val); + } + } else { + measures[mesCount] = null; + } + } + // create new row of size 3 (1 for dims , 1 for high card , 1 for measures) + Object[] holder = new Object[3]; + NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures); + //return out row + return new CarbonRow(holder); + } + + @Override + public void close() { + if (!closed) { + super.close(); + } + } + + @Override protected String getStepName() { + return "No Sort Processor"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fcf74017/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java index 5487593..a9e762d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java @@ -251,7 +251,8 @@ public class IntermediateFileMerger implements Callable<Void> { new SortTempFileChunkHolder(tempFile, mergerParameters.getDimColCount(), mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(), mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(), - mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn()); + mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn(), + mergerParameters.getNoDictionarySortColumn()); // initialize sortTempFileChunkHolder.initialize(); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fcf74017/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java index dd9358c..247251e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java @@ -24,15 +24,15 @@ import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; public class NewRowComparator implements Comparator<Object[]> { /** - * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions. + * mapping of dictionary dimensions and no dictionary of sort_column. */ - private boolean[] noDictionaryColMaping; + private boolean[] noDictionarySortColumnMaping; /** - * @param noDictionaryColMaping + * @param noDictionarySortColumnMaping */ - public NewRowComparator(boolean[] noDictionaryColMaping) { - this.noDictionaryColMaping = noDictionaryColMaping; + public NewRowComparator(boolean[] noDictionarySortColumnMaping) { + this.noDictionarySortColumnMaping = noDictionarySortColumnMaping; } /** @@ -43,7 +43,7 @@ public class NewRowComparator implements Comparator<Object[]> { int index = 0; - for (boolean isNoDictionary : noDictionaryColMaping) { + for (boolean isNoDictionary : noDictionarySortColumnMaping) { if (isNoDictionary) { byte[] byteArr1 = (byte[]) rowA[index]; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fcf74017/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java index d913b32..241882e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java @@ -26,15 +26,15 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> { /** * dimension count */ - private int dimensionCount; + private int numberOfSortColumns; /** * RowComparatorForNormalDims Constructor * - * @param dimensionCount + * @param numberOfSortColumns */ - public NewRowComparatorForNormalDims(int dimensionCount) { - this.dimensionCount = dimensionCount; + public NewRowComparatorForNormalDims(int numberOfSortColumns) { + this.numberOfSortColumns = numberOfSortColumns; } /** @@ -45,7 +45,7 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> { public int compare(Object[] rowA, Object[] rowB) { int diff = 0; - for (int i = 0; i < dimensionCount; i++) { + for (int i = 0; i < numberOfSortColumns; i++) { int dimFieldA = (int)rowA[i]; int dimFieldB = (int)rowB[i]; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fcf74017/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java index c282f52..2584048 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java @@ -33,15 +33,15 @@ public class RowComparator implements Comparator<Object[]> { /** * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions. */ - private boolean[] noDictionaryColMaping; + private boolean[] noDictionarySortColumnMaping; /** - * @param noDictionaryColMaping + * @param noDictionarySortColumnMaping * @param noDictionaryCount */ - public RowComparator(boolean[] noDictionaryColMaping, int noDictionaryCount) { + public RowComparator(boolean[] noDictionarySortColumnMaping, int noDictionaryCount) { this.noDictionaryCount = noDictionaryCount; - this.noDictionaryColMaping = noDictionaryColMaping; + this.noDictionarySortColumnMaping = noDictionarySortColumnMaping; } /** @@ -53,7 +53,7 @@ public class RowComparator implements Comparator<Object[]> { int normalIndex = 0; int noDictionaryindex = 0; - for (boolean isNoDictionary : noDictionaryColMaping) { + for (boolean isNoDictionary : noDictionarySortColumnMaping) { if (isNoDictionary) { byte[] byteArr1 = (byte[]) rowA[IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex()]; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fcf74017/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java index ceaf5c6..8d914ea 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java @@ -28,15 +28,15 @@ public class RowComparatorForNormalDims implements Comparator<Object[]> { /** * dimension count */ - private int dimensionCount; + private int numberOfSortColumns; /** * RowComparatorForNormalDims Constructor * - * @param dimensionCount + * @param numberOfSortColumns */ - public RowComparatorForNormalDims(int dimensionCount) { - this.dimensionCount = dimensionCount; + public RowComparatorForNormalDims(int numberOfSortColumns) { + this.numberOfSortColumns = numberOfSortColumns; } /** @@ -47,7 +47,7 @@ public class RowComparatorForNormalDims implements Comparator<Object[]> { public int compare(Object[] rowA, Object[] rowB) { int diff = 0; - for (int i = 0; i < dimensionCount; i++) { + for (int i = 0; i < numberOfSortColumns; i++) { int dimFieldA = NonDictionaryUtil.getDimension(i, rowA); int dimFieldB = NonDictionaryUtil.getDimension(i, rowB); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fcf74017/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java index 3a7a579..1a7a26d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java @@ -191,11 +191,10 @@ public class SortDataRows { Object[][] toSort; toSort = new Object[entryCount][]; System.arraycopy(recordHolderList, 0, toSort, 0, entryCount); - - if (parameters.getNoDictionaryCount() > 0) { - Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionaryDimnesionColumn())); + if (parameters.getNumberOfNoDictSortColumns() > 0) { + Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionarySortColumn())); } else { - Arrays.sort(toSort, new NewRowComparatorForNormalDims(parameters.getDimColCount())); + Arrays.sort(toSort, new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns())); } recordHolderList = toSort; @@ -385,12 +384,12 @@ public class SortDataRows { @Override public Void call() throws Exception { try { long startTime = System.currentTimeMillis(); - if (parameters.getNoDictionaryCount() > 0) { + if (parameters.getNumberOfNoDictSortColumns() > 0) { Arrays.sort(recordHolderArray, - new NewRowComparator(parameters.getNoDictionaryDimnesionColumn())); + new NewRowComparator(parameters.getNoDictionarySortColumn())); } else { Arrays.sort(recordHolderArray, - new NewRowComparatorForNormalDims(parameters.getDimColCount())); + new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns())); } // create a new file every time http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fcf74017/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java index 41e3018..934e063 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java @@ -112,6 +112,12 @@ public class SortParameters { */ private boolean[] noDictionaryDimnesionColumn; + private boolean[] noDictionarySortColumn; + + private int numberOfSortColumns; + + private int numberOfNoDictSortColumns; + private int numberOfCores; public SortParameters getCopy() { @@ -137,6 +143,9 @@ public class SortParameters { parameters.segmentId = segmentId; parameters.taskNo = taskNo; parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn; + parameters.noDictionarySortColumn = noDictionarySortColumn; + parameters.numberOfSortColumns = numberOfSortColumns; + parameters.numberOfNoDictSortColumns = numberOfNoDictSortColumns; parameters.numberOfCores = numberOfCores; return parameters; } @@ -317,6 +326,30 @@ public class SortParameters { this.numberOfCores = numberOfCores; } + public int getNumberOfSortColumns() { + return numberOfSortColumns; + } + + public void setNumberOfSortColumns(int numberOfSortColumns) { + this.numberOfSortColumns = Math.min(numberOfSortColumns, this.dimColCount); + } + + public boolean[] getNoDictionarySortColumn() { + return noDictionarySortColumn; + } + + public void setNoDictionarySortColumn(boolean[] noDictionarySortColumn) { + this.noDictionarySortColumn = noDictionarySortColumn; + } + + public int getNumberOfNoDictSortColumns() { + return numberOfNoDictSortColumns; + } + + public void setNumberOfNoDictSortColumns(int numberOfNoDictSortColumns) { + this.numberOfNoDictSortColumns = Math.min(numberOfNoDictSortColumns, noDictionaryCount); + } + public static SortParameters createSortParameters(CarbonDataLoadConfiguration configuration) { SortParameters parameters = new SortParameters(); CarbonTableIdentifier tableIdentifier = @@ -334,6 +367,16 @@ public class SortParameters { parameters.setComplexDimColCount(configuration.getComplexDimensionCount()); parameters.setNoDictionaryDimnesionColumn( CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields())); + parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns()); + parameters.setNumberOfNoDictSortColumns(configuration.getNumberOfNoDictSortColumns()); + if (parameters.getNumberOfSortColumns() == parameters.getNoDictionaryDimnesionColumn().length) { + parameters.setNoDictionarySortColumn(parameters.getNoDictionaryDimnesionColumn()); + } else { + boolean[] noDictionarySortColumnTemp = new boolean[parameters.getNumberOfSortColumns()]; + System.arraycopy(parameters.getNoDictionaryDimnesionColumn(), 0, + noDictionarySortColumnTemp, 0, parameters.getNumberOfSortColumns()); + parameters.setNoDictionarySortColumn(noDictionarySortColumnTemp); + } parameters.setObserver(new SortObserver()); // get sort buffer size parameters.setSortBufferSize(Integer.parseInt(carbonProperties http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fcf74017/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java index b4ccc6f..2b0b8ae 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java @@ -133,6 +133,11 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold private boolean[] isNoDictionaryDimensionColumn; /** + * to store whether sort column is of dictionary type or not + */ + private boolean[] isNoDictionarySortColumn; + + /** * Constructor to initialize * * @param tempFile @@ -146,7 +151,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold */ public SortTempFileChunkHolder(File tempFile, int dimensionCount, int complexDimensionCount, int measureCount, int fileBufferSize, int noDictionaryCount, char[] aggType, - boolean[] isNoDictionaryDimensionColumn) { + boolean[] isNoDictionaryDimensionColumn, boolean[] isNoDictionarySortColumn) { // set temp file this.tempFile = tempFile; @@ -160,7 +165,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold this.fileBufferSize = fileBufferSize; this.executorService = Executors.newFixedThreadPool(1); this.aggType = aggType; + this.isNoDictionaryDimensionColumn = isNoDictionaryDimensionColumn; + this.isNoDictionarySortColumn = isNoDictionarySortColumn; } /** @@ -409,7 +416,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold int[] rightMdkArray = (int[]) other.returnRow[0]; byte[][] leftNonDictArray = (byte[][]) returnRow[1]; byte[][] rightNonDictArray = (byte[][]) other.returnRow[1]; - for (boolean isNoDictionary : isNoDictionaryDimensionColumn) { + for (boolean isNoDictionary : isNoDictionarySortColumn) { if (isNoDictionary) { diff = UnsafeComparer.INSTANCE .compareTo(leftNonDictArray[noDictionaryIndex], rightNonDictArray[noDictionaryIndex]); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fcf74017/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index 0e6a49d..2313a0f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -778,13 +778,15 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { int i = 0; int dictionaryColumnCount = -1; int noDictionaryColumnCount = -1; + boolean isSortColumn = false; for (i = 0; i < dimensionType.length; i++) { + isSortColumn = i < segmentProperties.getNumberOfSortColumns(); if (dimensionType[i]) { dictionaryColumnCount++; if (colGrpModel.isColumnar(dictionaryColumnCount)) { submit.add(executorService.submit( - new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), true, - isUseInvertedIndex[i]))); + new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), isSortColumn, + isUseInvertedIndex[i] & isSortColumn))); } else { submit.add( executorService.submit(new ColGroupBlockStorage(dataHolders[dictionaryColumnCount]))); @@ -792,7 +794,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { } else { submit.add(executorService.submit( new BlockSortThread(i, noDictionaryColumnsData[++noDictionaryColumnCount], false, true, - true, isUseInvertedIndex[i]))); + isSortColumn, isUseInvertedIndex[i] & isSortColumn))); } } for (int k = 0; k < complexColCount; k++) { @@ -816,7 +818,42 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { } byte[] composedNonDictStartKey = null; byte[] composedNonDictEndKey = null; - if (noDictionaryStartKey != null) { + + int numberOfDictSortColumns = segmentProperties.getNumberOfDictSortColumns(); + // generate start/end key by sort_columns + if (numberOfDictSortColumns > 0) { + // if sort_columns contain dictionary columns + int[] keySize = columnarSplitter.getBlockKeySize(); + if (keySize.length > numberOfDictSortColumns) { + int newMdkLength = 0; + for (int index = 0; index < numberOfDictSortColumns; index++) { + newMdkLength += keySize[index]; + } + byte[] newStartKeyOfSortKey = new byte[newMdkLength]; + byte[] newEndKeyOfSortKey = new byte[newMdkLength]; + System.arraycopy(startkeyLocal, 0, newStartKeyOfSortKey, 0, newMdkLength); + System.arraycopy(endKeyLocal, 0, newEndKeyOfSortKey, 0, newMdkLength); + startkeyLocal = newStartKeyOfSortKey; + endKeyLocal = newEndKeyOfSortKey; + } + } else { + startkeyLocal = new byte[0]; + endKeyLocal = new byte[0]; + } + + int numberOfNoDictSortColumns = segmentProperties.getNumberOfNoDictSortColumns(); + if (numberOfNoDictSortColumns > 0) { + // if sort_columns contain no-dictionary columns + if (noDictionaryStartKey.length > numberOfNoDictSortColumns) { + byte[][] newNoDictionaryStartKey = new byte[numberOfNoDictSortColumns][]; + byte[][] newNoDictionaryEndKey = new byte[numberOfNoDictSortColumns][]; + System.arraycopy(noDictionaryStartKey, 0, newNoDictionaryStartKey, 0, + numberOfNoDictSortColumns); + System + .arraycopy(noDictionaryEndKey, 0, newNoDictionaryEndKey, 0, numberOfNoDictSortColumns); + noDictionaryStartKey = newNoDictionaryStartKey; + noDictionaryEndKey = newNoDictionaryEndKey; + } composedNonDictStartKey = NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictionaryStartKey); composedNonDictEndKey = http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fcf74017/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index e64caea..1c94bf1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -204,6 +204,13 @@ public class CarbonFactDataHandlerModel { CarbonDataProcessorUtil.getIsUseInvertedIndex(configuration.getDataFields()); int[] dimLensWithComplex = configuration.getCardinalityFinder().getCardinality(); + if (!configuration.isSortTable()) { + for (int i = 0; i < dimLensWithComplex.length; i++) { + if (dimLensWithComplex[i] != 0) { + dimLensWithComplex[i] = Integer.MAX_VALUE; + } + } + } List<Integer> dimsLenList = new ArrayList<Integer>(); for (int eachDimLen : dimLensWithComplex) { if (eachDimLen != 0) dimsLenList.add(eachDimLen); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fcf74017/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java index 68f9bd5..f8454f1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java @@ -101,9 +101,11 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { */ private boolean[] isNoDictionaryColumn; + private boolean[] isNoDictionarySortColumn; + public SingleThreadFinalSortFilesMerger(String tempFileLocation, String tableName, int dimensionCount, int complexDimensionCount, int measureCount, int noDictionaryCount, - char[] aggType, boolean[] isNoDictionaryColumn) { + char[] aggType, boolean[] isNoDictionaryColumn, boolean[] isNoDictionarySortColumn) { this.tempFileLocation = tempFileLocation; this.tableName = tableName; this.dimensionCount = dimensionCount; @@ -112,6 +114,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { this.aggType = aggType; this.noDictionaryCount = noDictionaryCount; this.isNoDictionaryColumn = isNoDictionaryColumn; + this.isNoDictionarySortColumn = isNoDictionarySortColumn; } /** @@ -180,7 +183,8 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { // create chunk holder SortTempFileChunkHolder sortTempFileChunkHolder = new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount, - measureCount, fileBufferSize, noDictionaryCount, aggType, isNoDictionaryColumn); + measureCount, fileBufferSize, noDictionaryCount, aggType, isNoDictionaryColumn, + isNoDictionarySortColumn); // initialize sortTempFileChunkHolder.initialize();