[CARBONDATA-2018][DataLoad] Optimization in reading/writing for sort temp row
Pick up the no-sort fields in the row and pack them as bytes array and skip parsing them during merge sort to reduce CPU consumption This closes #1792 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2b41f140 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2b41f140 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2b41f140 Branch: refs/heads/master Commit: 2b41f140229c1799178313b257f3779908e69010 Parents: 89cfd8e Author: xuchuanyin <xuchuan...@hust.edu.cn> Authored: Thu Feb 8 14:35:14 2018 +0800 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Mar 8 22:21:10 2018 +0530 ---------------------------------------------------------------------- .../carbondata/core/util/NonDictionaryUtil.java | 67 +-- .../presto/util/CarbonDataStoreCreator.scala | 1 - .../load/DataLoadProcessorStepOnSpark.scala | 6 +- .../loading/row/IntermediateSortTempRow.java | 117 +++++ .../loading/sort/SortStepRowHandler.java | 466 +++++++++++++++++++ .../loading/sort/SortStepRowUtil.java | 103 ---- .../sort/unsafe/UnsafeCarbonRowPage.java | 331 ++----------- .../loading/sort/unsafe/UnsafeSortDataRows.java | 57 +-- .../unsafe/comparator/UnsafeRowComparator.java | 95 ++-- .../UnsafeRowComparatorForNormalDIms.java | 59 --- .../UnsafeRowComparatorForNormalDims.java | 59 +++ .../sort/unsafe/holder/SortTempChunkHolder.java | 3 +- .../holder/UnsafeFinalMergePageHolder.java | 19 +- .../unsafe/holder/UnsafeInmemoryHolder.java | 21 +- .../holder/UnsafeSortTempFileChunkHolder.java | 138 ++---- .../merger/UnsafeIntermediateFileMerger.java | 118 +---- .../UnsafeSingleThreadFinalSortFilesMerger.java | 27 +- .../merger/CompactionResultSortProcessor.java | 1 - .../sort/sortdata/IntermediateFileMerger.java | 95 +--- .../IntermediateSortTempRowComparator.java | 73 +++ .../sort/sortdata/NewRowComparator.java | 5 +- .../sortdata/NewRowComparatorForNormalDims.java | 3 +- 
.../processing/sort/sortdata/RowComparator.java | 94 ---- .../sortdata/RowComparatorForNormalDims.java | 62 --- .../SingleThreadFinalSortFilesMerger.java | 25 +- .../processing/sort/sortdata/SortDataRows.java | 85 +--- .../sort/sortdata/SortTempFileChunkHolder.java | 174 ++----- .../sort/sortdata/TableFieldStat.java | 176 +++++++ 28 files changed, 1186 insertions(+), 1294 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/2b41f140/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java b/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java index d6ecfbc..fca1244 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java @@ -82,18 +82,26 @@ public class NonDictionaryUtil { } /** - * Method to get the required Dimension from obj [] + * Method to get the required dictionary Dimension from obj [] * * @param index * @param row * @return */ - public static Integer getDimension(int index, Object[] row) { - - Integer[] dimensions = (Integer[]) row[WriteStepRowUtil.DICTIONARY_DIMENSION]; - + public static int getDictDimension(int index, Object[] row) { + int[] dimensions = (int[]) row[WriteStepRowUtil.DICTIONARY_DIMENSION]; return dimensions[index]; + } + /** + * Method to get the required non-dictionary & complex from 3-parted row + * @param index + * @param row + * @return + */ + public static byte[] getNoDictOrComplex(int index, Object[] row) { + byte[][] nonDictArray = (byte[][]) row[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX]; + return nonDictArray[index]; } /** @@ -108,60 +116,11 @@ public class NonDictionaryUtil { return measures[index]; } - public static byte[] 
getByteArrayForNoDictionaryCols(Object[] row) { - - return (byte[]) row[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX]; - } - public static void prepareOutObj(Object[] out, int[] dimArray, byte[][] byteBufferArr, Object[] measureArray) { - out[WriteStepRowUtil.DICTIONARY_DIMENSION] = dimArray; out[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX] = byteBufferArr; out[WriteStepRowUtil.MEASURE] = measureArray; } - - /** - * This method will extract the single dimension from the complete high card dims byte[].+ * - * The format of the byte [] will be, Totallength,CompleteStartOffsets,Dat - * - * @param highCardArr - * @param index - * @param highCardinalityCount - * @param outBuffer - */ - public static void extractSingleHighCardDims(byte[] highCardArr, int index, - int highCardinalityCount, ByteBuffer outBuffer) { - ByteBuffer buff = null; - short secIndex = 0; - short firstIndex = 0; - int length; - // if the requested index is a last one then we need to calculate length - // based on byte[] length. - if (index == highCardinalityCount - 1) { - // need to read 2 bytes(1 short) to determine starting offset and - // length can be calculated by array length. - buff = ByteBuffer.wrap(highCardArr, (index * 2) + 2, 2); - } else { - // need to read 4 bytes(2 short) to determine starting offset and - // length. 
- buff = ByteBuffer.wrap(highCardArr, (index * 2) + 2, 4); - } - - firstIndex = buff.getShort(); - // if it is a last dimension in high card then this will be last - // offset.so calculate length from total length - if (index == highCardinalityCount - 1) { - secIndex = (short) highCardArr.length; - } else { - secIndex = buff.getShort(); - } - - length = secIndex - firstIndex; - - outBuffer.position(firstIndex); - outBuffer.limit(outBuffer.position() + length); - - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2b41f140/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala ---------------------------------------------------------------------- diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala index e768660..1bc9812 100644 --- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala +++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala @@ -383,7 +383,6 @@ object CarbonDataStoreCreator { .getInstance.createCache(CacheType.REVERSE_DICTIONARY) for (i <- set.indices) { - // val dim = getDimension(dims, i).get val columnIdentifier: ColumnIdentifier = new ColumnIdentifier(dims.get(i).getColumnId, null, null) http://git-wip-us.apache.org/repos/asf/carbondata/blob/2b41f140/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index 5124247..0422239 100644 --- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -35,7 +35,7 @@ import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl -import org.apache.carbondata.processing.loading.sort.SortStepRowUtil +import org.apache.carbondata.processing.loading.sort.SortStepRowHandler import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl import org.apache.carbondata.processing.sort.sortdata.SortParameters import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory} @@ -206,7 +206,7 @@ object DataLoadProcessorStepOnSpark { val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString) val conf = DataLoadProcessBuilder.createConfiguration(model) val sortParameters = SortParameters.createSortParameters(conf) - val sortStepRowUtil = new SortStepRowUtil(sortParameters) + val sortStepRowHandler = new SortStepRowHandler(sortParameters) TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) => wrapException(e, model) } @@ -216,7 +216,7 @@ object DataLoadProcessorStepOnSpark { override def next(): CarbonRow = { val row = - new CarbonRow(sortStepRowUtil.convertRow(rows.next().getData)) + new CarbonRow(sortStepRowHandler.convertRawRowTo3Parts(rows.next().getData)) rowCounter.add(1) row } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2b41f140/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java ---------------------------------------------------------------------- diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java new file mode 100644 index 0000000..8d351cf --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.processing.loading.row; + +import java.nio.ByteBuffer; + +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.util.DataTypeUtil; + +/** + * During sort procedure, each row will be written to sort temp file in this logic format. 
+ * an intermediate sort temp row consists 3 parts: + * dictSort, noDictSort, noSortDimsAndMeasures(dictNoSort, noDictNoSort, measure) + */ +public class IntermediateSortTempRow { + private int[] dictSortDims; + private byte[][] noDictSortDims; + private byte[] noSortDimsAndMeasures; + + public IntermediateSortTempRow(int[] dictSortDims, byte[][] noDictSortDims, + byte[] noSortDimsAndMeasures) { + this.dictSortDims = dictSortDims; + this.noDictSortDims = noDictSortDims; + this.noSortDimsAndMeasures = noSortDimsAndMeasures; + } + + public int[] getDictSortDims() { + return dictSortDims; + } + + public byte[][] getNoDictSortDims() { + return noDictSortDims; + } + + public byte[] getNoSortDimsAndMeasures() { + return noSortDimsAndMeasures; + } + + /** + * deserialize from bytes array to get the no sort fields + * @param outDictNoSort stores the dict & no-sort fields + * @param outNoDictNoSort stores the no-dict & no-sort fields, including complex + * @param outMeasures stores the measure fields + * @param dataTypes data type for the measure + */ + public void unpackNoSortFromBytes(int[] outDictNoSort, byte[][] outNoDictNoSort, + Object[] outMeasures, DataType[] dataTypes) { + ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures); + // read dict_no_sort + int dictNoSortCnt = outDictNoSort.length; + for (int i = 0; i < dictNoSortCnt; i++) { + outDictNoSort[i] = rowBuffer.getInt(); + } + + // read no_dict_no_sort (including complex) + int noDictNoSortCnt = outNoDictNoSort.length; + for (int i = 0; i < noDictNoSortCnt; i++) { + short len = rowBuffer.getShort(); + byte[] bytes = new byte[len]; + rowBuffer.get(bytes); + outNoDictNoSort[i] = bytes; + } + + // read measure + int measureCnt = outMeasures.length; + DataType tmpDataType; + Object tmpContent; + for (short idx = 0 ; idx < measureCnt; idx++) { + if ((byte) 0 == rowBuffer.get()) { + outMeasures[idx] = null; + continue; + } + + tmpDataType = dataTypes[idx]; + if (DataTypes.BOOLEAN == tmpDataType) { + if 
((byte) 1 == rowBuffer.get()) { + tmpContent = true; + } else { + tmpContent = false; + } + } else if (DataTypes.SHORT == tmpDataType) { + tmpContent = rowBuffer.getShort(); + } else if (DataTypes.INT == tmpDataType) { + tmpContent = rowBuffer.getInt(); + } else if (DataTypes.LONG == tmpDataType) { + tmpContent = rowBuffer.getLong(); + } else if (DataTypes.DOUBLE == tmpDataType) { + tmpContent = rowBuffer.getDouble(); + } else if (DataTypes.isDecimal(tmpDataType)) { + short len = rowBuffer.getShort(); + byte[] decimalBytes = new byte[len]; + rowBuffer.get(decimalBytes); + tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes); + } else { + throw new IllegalArgumentException("Unsupported data type: " + tmpDataType); + } + outMeasures[idx] = tmpContent; + } + } + + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2b41f140/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java new file mode 100644 index 0000000..f31a2b9 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.loading.sort; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.math.BigDecimal; +import java.nio.ByteBuffer; + +import org.apache.carbondata.core.memory.CarbonUnsafe; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.NonDictionaryUtil; +import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; +import org.apache.carbondata.processing.sort.sortdata.SortParameters; +import org.apache.carbondata.processing.sort.sortdata.TableFieldStat; + +/** + * This class is used to convert/write/read row in sort step in carbondata. + * It consists the following function: + * 1. convert raw row & intermediate sort temp row to 3-parted row + * 2. read/write intermediate sort temp row to sort temp file & unsafe memory + * 3. 
write raw row directly to sort temp file & unsafe memory as intermediate sort temp row + */ +public class SortStepRowHandler implements Serializable { + private static final long serialVersionUID = 1L; + private int dictSortDimCnt = 0; + private int dictNoSortDimCnt = 0; + private int noDictSortDimCnt = 0; + private int noDictNoSortDimCnt = 0; + private int measureCnt; + + // indices for dict & sort dimension columns + private int[] dictSortDimIdx; + // indices for dict & no-sort dimension columns + private int[] dictNoSortDimIdx; + // indices for no-dict & sort dimension columns + private int[] noDictSortDimIdx; + // indices for no-dict & no-sort dimension columns, including complex columns + private int[] noDictNoSortDimIdx; + // indices for measure columns + private int[] measureIdx; + + private DataType[] dataTypes; + + /** + * constructor + * @param tableFieldStat table field stat + */ + public SortStepRowHandler(TableFieldStat tableFieldStat) { + this.dictSortDimCnt = tableFieldStat.getDictSortDimCnt(); + this.dictNoSortDimCnt = tableFieldStat.getDictNoSortDimCnt(); + this.noDictSortDimCnt = tableFieldStat.getNoDictSortDimCnt(); + this.noDictNoSortDimCnt = tableFieldStat.getNoDictNoSortDimCnt(); + this.measureCnt = tableFieldStat.getMeasureCnt(); + this.dictSortDimIdx = tableFieldStat.getDictSortDimIdx(); + this.dictNoSortDimIdx = tableFieldStat.getDictNoSortDimIdx(); + this.noDictSortDimIdx = tableFieldStat.getNoDictSortDimIdx(); + this.noDictNoSortDimIdx = tableFieldStat.getNoDictNoSortDimIdx(); + this.measureIdx = tableFieldStat.getMeasureIdx(); + this.dataTypes = tableFieldStat.getMeasureDataType(); + } + + /** + * constructor + * @param sortParameters sort parameters + */ + public SortStepRowHandler(SortParameters sortParameters) { + this(new TableFieldStat(sortParameters)); + } + + /** + * Convert carbon row from raw format to 3-parted format. + * This method is used in global-sort. 
+ * + * @param row raw row whose length is the same as field number + * @return 3-parted row whose length is 3. (1 for dict dims, 1 for non-dict and complex, + * 1 for measures) + */ + public Object[] convertRawRowTo3Parts(Object[] row) { + Object[] holder = new Object[3]; + try { + int[] dictDims + = new int[this.dictSortDimCnt + this.dictNoSortDimCnt]; + byte[][] nonDictArray = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt][]; + Object[] measures = new Object[this.measureCnt]; + + // convert dict & sort + int idxAcc = 0; + for (int idx = 0; idx < this.dictSortDimCnt; idx++) { + dictDims[idxAcc++] = (int) row[this.dictSortDimIdx[idx]]; + } + + // convert dict & no-sort + for (int idx = 0; idx < this.dictNoSortDimCnt; idx++) { + dictDims[idxAcc++] = (int) row[this.dictNoSortDimIdx[idx]]; + } + // convert no-dict & sort + idxAcc = 0; + for (int idx = 0; idx < this.noDictSortDimCnt; idx++) { + nonDictArray[idxAcc++] = (byte[]) row[this.noDictSortDimIdx[idx]]; + } + // convert no-dict & no-sort + for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) { + nonDictArray[idxAcc++] = (byte[]) row[this.noDictNoSortDimIdx[idx]]; + } + + // convert measure data + for (int idx = 0; idx < this.measureCnt; idx++) { + measures[idx] = row[this.measureIdx[idx]]; + } + + NonDictionaryUtil.prepareOutObj(holder, dictDims, nonDictArray, measures); + } catch (Exception e) { + throw new RuntimeException("Problem while converting row to 3 parts", e); + } + return holder; + } + + /** + * Convert intermediate sort temp row to 3-parted row. + * This method is used in the final merge sort to feed rows to the next write step. 
+ * + * @param sortTempRow intermediate sort temp row + * @return 3-parted row + */ + public Object[] convertIntermediateSortTempRowTo3Parted(IntermediateSortTempRow sortTempRow) { + int[] dictDims + = new int[this.dictSortDimCnt + this.dictNoSortDimCnt]; + byte[][] noDictArray + = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt][]; + + int[] dictNoSortDims = new int[this.dictNoSortDimCnt]; + byte[][] noDictNoSortDims = new byte[this.noDictNoSortDimCnt][]; + Object[] measures = new Object[this.measureCnt]; + + sortTempRow.unpackNoSortFromBytes(dictNoSortDims, noDictNoSortDims, measures, this.dataTypes); + + // dict dims + System.arraycopy(sortTempRow.getDictSortDims(), 0 , dictDims, + 0, this.dictSortDimCnt); + System.arraycopy(dictNoSortDims, 0, dictDims, + this.dictSortDimCnt, this.dictNoSortDimCnt);; + + // no dict dims, including complex + System.arraycopy(sortTempRow.getNoDictSortDims(), 0, + noDictArray, 0, this.noDictSortDimCnt); + System.arraycopy(noDictNoSortDims, 0, noDictArray, + this.noDictSortDimCnt, this.noDictNoSortDimCnt); + + // measures are already here + + Object[] holder = new Object[3]; + NonDictionaryUtil.prepareOutObj(holder, dictDims, noDictArray, measures); + return holder; + } + + /** + * Read intermediate sort temp row from InputStream. + * This method is used during the merge sort phase to read row from sort temp file. 
+ * + * @param inputStream input stream + * @return a row that contains three parts + * @throws IOException if error occurs while reading from stream + */ + public IntermediateSortTempRow readIntermediateSortTempRowFromInputStream( + DataInputStream inputStream) throws IOException { + int[] dictSortDims = new int[this.dictSortDimCnt]; + byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][]; + + // read dict & sort dim data + for (int idx = 0; idx < this.dictSortDimCnt; idx++) { + dictSortDims[idx] = inputStream.readInt(); + } + + // read no-dict & sort data + for (int idx = 0; idx < this.noDictSortDimCnt; idx++) { + short len = inputStream.readShort(); + byte[] bytes = new byte[len]; + inputStream.readFully(bytes); + noDictSortDims[idx] = bytes; + } + + // read no-sort dims & measures + int len = inputStream.readInt(); + byte[] noSortDimsAndMeasures = new byte[len]; + inputStream.readFully(noSortDimsAndMeasures); + + return new IntermediateSortTempRow(dictSortDims, noDictSortDims, noSortDimsAndMeasures); + } + + /** + * Write intermediate sort temp row to OutputStream + * This method is used during the merge sort phase to write row to sort temp file. 
+ * + * @param sortTempRow intermediate sort temp row + * @param outputStream output stream + * @throws IOException if error occurs while writing to stream + */ + public void writeIntermediateSortTempRowToOutputStream(IntermediateSortTempRow sortTempRow, + DataOutputStream outputStream) throws IOException { + // write dict & sort dim + for (int idx = 0; idx < this.dictSortDimCnt; idx++) { + outputStream.writeInt(sortTempRow.getDictSortDims()[idx]); + } + + // write no-dict & sort dim + for (int idx = 0; idx < this.noDictSortDimCnt; idx++) { + byte[] bytes = sortTempRow.getNoDictSortDims()[idx]; + outputStream.writeShort(bytes.length); + outputStream.write(bytes); + } + + // write packed no-sort dim & measure + outputStream.writeInt(sortTempRow.getNoSortDimsAndMeasures().length); + outputStream.write(sortTempRow.getNoSortDimsAndMeasures()); + } + + /** + * Write raw row as an intermediate sort temp row to sort temp file. + * This method is used in the beginning of the sort phase. Comparing with converting raw row to + * intermediate sort temp row and then writing the converted one, Writing raw row directly will + * save the intermediate trivial loss. + * This method use an array backend buffer to save memory allocation. The buffer will be reused + * for all rows (per thread). 
+ * + * @param row raw row + * @param outputStream output stream + * @param rowBuffer array-backed buffer + * @throws IOException if error occurs while writing to stream + */ + public void writeRawRowAsIntermediateSortTempRowToOutputStream(Object[] row, + DataOutputStream outputStream, ByteBuffer rowBuffer) throws IOException { + // write dict & sort + for (int idx = 0; idx < this.dictSortDimCnt; idx++) { + outputStream.writeInt((int) row[this.dictSortDimIdx[idx]]); + } + + // write no-dict & sort + for (int idx = 0; idx < this.noDictSortDimCnt; idx++) { + byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]]; + outputStream.writeShort(bytes.length); + outputStream.write(bytes); + } + + // pack no-sort + rowBuffer.clear(); + packNoSortFieldsToBytes(row, rowBuffer); + rowBuffer.flip(); + int packSize = rowBuffer.limit(); + + // write no-sort + outputStream.writeInt(packSize); + outputStream.write(rowBuffer.array(), 0, packSize); + } + + /** + * Read intermediate sort temp row from unsafe memory. + * This method is used during the merge sort phase for off-heap sort. 
+ * + * @param baseObject base object of memory block + * @param address address of the row + * @return intermediate sort temp row + */ + public IntermediateSortTempRow readIntermediateSortTempRowFromUnsafeMemory(Object baseObject, + long address) { + int size = 0; + + int[] dictSortDims = new int[this.dictSortDimCnt]; + byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][]; + + // read dict & sort dim + for (int idx = 0; idx < dictSortDims.length; idx++) { + dictSortDims[idx] = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size); + size += 4; + } + + // read no-dict & sort dim + for (int idx = 0; idx < noDictSortDims.length; idx++) { + short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size); + size += 2; + byte[] bytes = new byte[length]; + CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, + bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length); + size += length; + noDictSortDims[idx] = bytes; + } + + // read no-sort dims & measures + int len = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size); + size += 4; + byte[] noSortDimsAndMeasures = new byte[len]; + CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, + noSortDimsAndMeasures, CarbonUnsafe.BYTE_ARRAY_OFFSET, len); + + return new IntermediateSortTempRow(dictSortDims, noDictSortDims, noSortDimsAndMeasures); + } + + /** + * Write intermediate sort temp row directly from unsafe memory to stream. + * This method is used at the late beginning of the sort phase to write in-memory pages + * to sort temp file. Comparing with reading intermediate sort temp row from memory and then + * writing it, Writing directly from memory to stream will save the intermediate trivial loss. 
+ * + * @param baseObject base object of the memory block + * @param address base address of the row + * @param outputStream output stream + * @throws IOException if error occurs while writing to stream + */ + public void writeIntermediateSortTempRowFromUnsafeMemoryToStream(Object baseObject, + long address, DataOutputStream outputStream) throws IOException { + int size = 0; + + // dict & sort + for (int idx = 0; idx < dictSortDimCnt; idx++) { + outputStream.writeInt(CarbonUnsafe.getUnsafe().getInt(baseObject, address + size)); + size += 4; + } + + // no-dict & sort + for (int idx = 0; idx < noDictSortDimCnt; idx++) { + short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size); + size += 2; + byte[] bytes = new byte[length]; + CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, + bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length); + size += length; + + outputStream.writeShort(length); + outputStream.write(bytes); + } + + // packed no-sort & measure + int len = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size); + size += 4; + byte[] noSortDimsAndMeasures = new byte[len]; + CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, + noSortDimsAndMeasures, CarbonUnsafe.BYTE_ARRAY_OFFSET, len); + size += len; + + outputStream.writeInt(len); + outputStream.write(noSortDimsAndMeasures); + } + + /** + * Write raw row as an intermediate sort temp row to memory. + * This method is used in the beginning of the off-heap sort phase. Comparing with converting + * raw row to intermediate sort temp row and then writing the converted one, + * Writing raw row directly will save the intermediate trivial loss. + * This method use an array backend buffer to save memory allocation. The buffer will be reused + * for all rows (per thread). 
+ * + * @param row raw row + * @param baseObject base object of the memory block + * @param address base address for the row + * @param rowBuffer array backend buffer + * @return number of bytes written to memory + */ + public int writeRawRowAsIntermediateSortTempRowToUnsafeMemory(Object[] row, + Object baseObject, long address, ByteBuffer rowBuffer) { + int size = 0; + // write dict & sort + for (int idx = 0; idx < this.dictSortDimCnt; idx++) { + CarbonUnsafe.getUnsafe() + .putInt(baseObject, address + size, (int) row[this.dictSortDimIdx[idx]]); + size += 4; + } + + // write no-dict & sort + for (int idx = 0; idx < this.noDictSortDimCnt; idx++) { + byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]]; + CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) bytes.length); + size += 2; + CarbonUnsafe.getUnsafe() + .copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size, + bytes.length); + size += bytes.length; + } + + // convert pack no-sort + rowBuffer.clear(); + packNoSortFieldsToBytes(row, rowBuffer); + rowBuffer.flip(); + int packSize = rowBuffer.limit(); + + // write no-sort + CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, packSize); + size += 4; + CarbonUnsafe.getUnsafe() + .copyMemory(rowBuffer.array(), CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size, + packSize); + size += packSize; + return size; + } + + /** + * Pack to no-sort fields to byte array + * + * @param row raw row + * @param rowBuffer byte array backend buffer + */ + private void packNoSortFieldsToBytes(Object[] row, ByteBuffer rowBuffer) { + // convert dict & no-sort + for (int idx = 0; idx < this.dictNoSortDimCnt; idx++) { + rowBuffer.putInt((int) row[this.dictNoSortDimIdx[idx]]); + } + // convert no-dict & no-sort + for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) { + byte[] bytes = (byte[]) row[this.noDictNoSortDimIdx[idx]]; + rowBuffer.putShort((short) bytes.length); + rowBuffer.put(bytes); + } + + // convert measure + 
Object tmpValue; + DataType tmpDataType; + for (int idx = 0; idx < this.measureCnt; idx++) { + tmpValue = row[this.measureIdx[idx]]; + tmpDataType = this.dataTypes[idx]; + if (null == tmpValue) { + rowBuffer.put((byte) 0); + continue; + } + rowBuffer.put((byte) 1); + if (DataTypes.BOOLEAN == tmpDataType) { + if ((boolean) tmpValue) { + rowBuffer.put((byte) 1); + } else { + rowBuffer.put((byte) 0); + } + } else if (DataTypes.SHORT == tmpDataType) { + rowBuffer.putShort((Short) tmpValue); + } else if (DataTypes.INT == tmpDataType) { + rowBuffer.putInt((Integer) tmpValue); + } else if (DataTypes.LONG == tmpDataType) { + rowBuffer.putLong((Long) tmpValue); + } else if (DataTypes.DOUBLE == tmpDataType) { + rowBuffer.putDouble((Double) tmpValue); + } else if (DataTypes.isDecimal(tmpDataType)) { + byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) tmpValue); + rowBuffer.putShort((short) decimalBytes.length); + rowBuffer.put(decimalBytes); + } else { + throw new IllegalArgumentException("Unsupported data type: " + tmpDataType); + } + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2b41f140/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java deleted file mode 100644 index c4e4756..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.loading.sort; - -import org.apache.carbondata.core.util.NonDictionaryUtil; -import org.apache.carbondata.processing.sort.sortdata.SortParameters; - -public class SortStepRowUtil { - private int measureCount; - private int dimensionCount; - private int complexDimensionCount; - private int noDictionaryCount; - private int[] dictDimIdx; - private int[] nonDictIdx; - private int[] measureIdx; - - public SortStepRowUtil(SortParameters parameters) { - this.measureCount = parameters.getMeasureColCount(); - this.dimensionCount = parameters.getDimColCount(); - this.complexDimensionCount = parameters.getComplexDimColCount(); - this.noDictionaryCount = parameters.getNoDictionaryCount(); - boolean[] isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn(); - - int index = 0; - int nonDicIndex = 0; - int allCount = 0; - - // be careful that the default value is 0 - this.dictDimIdx = new int[dimensionCount - noDictionaryCount]; - this.nonDictIdx = new int[noDictionaryCount + complexDimensionCount]; - this.measureIdx = new int[measureCount]; - - // indices for dict dim columns - for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) { - if (isNoDictionaryDimensionColumn[i]) { - nonDictIdx[nonDicIndex++] = i; - } else { - dictDimIdx[index++] = allCount; - } - allCount++; - } - - // indices for non dict dim/complex columns - for 
(int i = 0; i < complexDimensionCount; i++) { - nonDictIdx[nonDicIndex++] = allCount; - allCount++; - } - - // indices for measure columns - for (int i = 0; i < measureCount; i++) { - measureIdx[i] = allCount; - allCount++; - } - } - - public Object[] convertRow(Object[] data) { - // create new row of size 3 (1 for dims , 1 for high card , 1 for measures) - Object[] holder = new Object[3]; - try { - - int[] dictDims = new int[dimensionCount - noDictionaryCount]; - byte[][] nonDictArray = new byte[noDictionaryCount + complexDimensionCount][]; - Object[] measures = new Object[measureCount]; - - // write dict dim data - for (int idx = 0; idx < dictDimIdx.length; idx++) { - dictDims[idx] = (int) data[dictDimIdx[idx]]; - } - - // write non dict dim data - for (int idx = 0; idx < nonDictIdx.length; idx++) { - nonDictArray[idx] = (byte[]) data[nonDictIdx[idx]]; - } - - // write measure data - for (int idx = 0; idx < measureIdx.length; idx++) { - measures[idx] = data[measureIdx[idx]]; - } - NonDictionaryUtil.prepareOutObj(holder, dictDims, nonDictArray, measures); - - // increment number if record read - } catch (Exception e) { - throw new RuntimeException("Problem while converting row ", e); - } - //return out row - return holder; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2b41f140/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java index e5583c2..7ea5cb3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java @@ -19,35 +19,20 @@ package 
org.apache.carbondata.processing.loading.sort.unsafe; import java.io.DataOutputStream; import java.io.IOException; -import java.math.BigDecimal; -import java.util.Arrays; +import java.nio.ByteBuffer; -import org.apache.carbondata.core.memory.CarbonUnsafe; import org.apache.carbondata.core.memory.IntPointerBuffer; import org.apache.carbondata.core.memory.MemoryBlock; import org.apache.carbondata.core.memory.UnsafeMemoryManager; import org.apache.carbondata.core.memory.UnsafeSortMemoryManager; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; +import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; +import org.apache.carbondata.processing.sort.sortdata.TableFieldStat; /** * It can keep the data of prescribed size data in offheap/onheap memory and returns it when needed */ public class UnsafeCarbonRowPage { - - private boolean[] noDictionaryDimensionMapping; - - private boolean[] noDictionarySortColumnMapping; - - private int dimensionSize; - - private int measureSize; - - private DataType[] measureDataType; - - private long[] nullSetWords; - private IntPointerBuffer buffer; private int lastSize; @@ -62,16 +47,14 @@ public class UnsafeCarbonRowPage { private long taskId; - public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping, - boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, DataType[] type, - MemoryBlock memoryBlock, boolean saveToDisk, long taskId) { - this.noDictionaryDimensionMapping = noDictionaryDimensionMapping; - this.noDictionarySortColumnMapping = noDictionarySortColumnMapping; - this.dimensionSize = dimensionSize; - this.measureSize = measureSize; - this.measureDataType = type; + private TableFieldStat tableFieldStat; + private SortStepRowHandler sortStepRowHandler; + + public 
UnsafeCarbonRowPage(TableFieldStat tableFieldStat, MemoryBlock memoryBlock, + boolean saveToDisk, long taskId) { + this.tableFieldStat = tableFieldStat; + this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat); this.saveToDisk = saveToDisk; - this.nullSetWords = new long[((measureSize - 1) >> 6) + 1]; this.taskId = taskId; buffer = new IntPointerBuffer(this.taskId); this.dataBlock = memoryBlock; @@ -80,255 +63,44 @@ public class UnsafeCarbonRowPage { this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER; } - public int addRow(Object[] row) { - int size = addRow(row, dataBlock.getBaseOffset() + lastSize); + public int addRow(Object[] row, ByteBuffer rowBuffer) { + int size = addRow(row, dataBlock.getBaseOffset() + lastSize, rowBuffer); buffer.set(lastSize); lastSize = lastSize + size; return size; } - private int addRow(Object[] row, long address) { - if (row == null) { - throw new RuntimeException("Row is null ??"); - } - int dimCount = 0; - int size = 0; - Object baseObject = dataBlock.getBaseObject(); - for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) { - if (noDictionaryDimensionMapping[dimCount]) { - byte[] col = (byte[]) row[dimCount]; - CarbonUnsafe.getUnsafe() - .putShort(baseObject, address + size, (short) col.length); - size += 2; - CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, - address + size, col.length); - size += col.length; - } else { - int value = (int) row[dimCount]; - CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, value); - size += 4; - } - } - - // write complex dimensions here. 
- for (; dimCount < dimensionSize; dimCount++) { - byte[] col = (byte[]) row[dimCount]; - CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) col.length); - size += 2; - CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, - address + size, col.length); - size += col.length; - } - Arrays.fill(nullSetWords, 0); - int nullSetSize = nullSetWords.length * 8; - int nullWordLoc = size; - size += nullSetSize; - for (int mesCount = 0; mesCount < measureSize; mesCount++) { - Object value = row[mesCount + dimensionSize]; - if (null != value) { - DataType dataType = measureDataType[mesCount]; - if (dataType == DataTypes.BOOLEAN) { - Boolean bval = (Boolean) value; - CarbonUnsafe.getUnsafe().putBoolean(baseObject, address + size, bval); - size += 1; - } else if (dataType == DataTypes.SHORT) { - Short sval = (Short) value; - CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, sval); - size += 2; - } else if (dataType == DataTypes.INT) { - Integer ival = (Integer) value; - CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, ival); - size += 4; - } else if (dataType == DataTypes.LONG) { - Long val = (Long) value; - CarbonUnsafe.getUnsafe().putLong(baseObject, address + size, val); - size += 8; - } else if (dataType == DataTypes.DOUBLE) { - Double doubleVal = (Double) value; - CarbonUnsafe.getUnsafe().putDouble(baseObject, address + size, doubleVal); - size += 8; - } else if (DataTypes.isDecimal(dataType)) { - BigDecimal decimalVal = (BigDecimal) value; - byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(decimalVal); - CarbonUnsafe.getUnsafe() - .putShort(baseObject, address + size, (short) bigDecimalInBytes.length); - size += 2; - CarbonUnsafe.getUnsafe() - .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, - address + size, bigDecimalInBytes.length); - size += bigDecimalInBytes.length; - } else { - throw new IllegalArgumentException("unsupported data type:" + 
measureDataType[mesCount]); - } - set(nullSetWords, mesCount); - } else { - unset(nullSetWords, mesCount); - } - } - CarbonUnsafe.getUnsafe().copyMemory(nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET, baseObject, - address + nullWordLoc, nullSetSize); - return size; + /** + * add raw row as intermediate sort temp row to page + * + * @param row + * @param address + * @return + */ + private int addRow(Object[] row, long address, ByteBuffer rowBuffer) { + return sortStepRowHandler.writeRawRowAsIntermediateSortTempRowToUnsafeMemory(row, + dataBlock.getBaseObject(), address, rowBuffer); } - public Object[] getRow(long address, Object[] rowToFill) { - int dimCount = 0; - int size = 0; - - Object baseObject = dataBlock.getBaseObject(); - for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) { - if (noDictionaryDimensionMapping[dimCount]) { - short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size); - byte[] col = new byte[aShort]; - size += 2; - CarbonUnsafe.getUnsafe() - .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, - col.length); - size += col.length; - rowToFill[dimCount] = col; - } else { - int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size); - size += 4; - rowToFill[dimCount] = anInt; - } - } - - // write complex dimensions here. 
- for (; dimCount < dimensionSize; dimCount++) { - short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size); - byte[] col = new byte[aShort]; - size += 2; - CarbonUnsafe.getUnsafe() - .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length); - size += col.length; - rowToFill[dimCount] = col; - } - - int nullSetSize = nullSetWords.length * 8; - Arrays.fill(nullSetWords, 0); - CarbonUnsafe.getUnsafe() - .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET, - nullSetSize); - size += nullSetSize; - - for (int mesCount = 0; mesCount < measureSize; mesCount++) { - if (isSet(nullSetWords, mesCount)) { - DataType dataType = measureDataType[mesCount]; - if (dataType == DataTypes.BOOLEAN) { - Boolean bval = CarbonUnsafe.getUnsafe().getBoolean(baseObject, address + size); - size += 1; - rowToFill[dimensionSize + mesCount] = bval; - } else if (dataType == DataTypes.SHORT) { - Short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size); - size += 2; - rowToFill[dimensionSize + mesCount] = sval; - } else if (dataType == DataTypes.INT) { - Integer ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size); - size += 4; - rowToFill[dimensionSize + mesCount] = ival; - } else if (dataType == DataTypes.LONG) { - Long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size); - size += 8; - rowToFill[dimensionSize + mesCount] = val; - } else if (dataType == DataTypes.DOUBLE) { - Double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size); - size += 8; - rowToFill[dimensionSize + mesCount] = doubleVal; - } else if (DataTypes.isDecimal(dataType)) { - short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size); - byte[] bigDecimalInBytes = new byte[aShort]; - size += 2; - CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes, - CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length); - size += 
bigDecimalInBytes.length; - rowToFill[dimensionSize + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes); - } else { - throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]); - } - } else { - rowToFill[dimensionSize + mesCount] = null; - } - } - return rowToFill; + /** + * get one row from memory address + * @param address address + * @return one row + */ + public IntermediateSortTempRow getRow(long address) { + return sortStepRowHandler.readIntermediateSortTempRowFromUnsafeMemory( + dataBlock.getBaseObject(), address); } - public void fillRow(long address, DataOutputStream stream) throws IOException { - int dimCount = 0; - int size = 0; - - Object baseObject = dataBlock.getBaseObject(); - for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) { - if (noDictionaryDimensionMapping[dimCount]) { - short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size); - byte[] col = new byte[aShort]; - size += 2; - CarbonUnsafe.getUnsafe() - .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, - col.length); - size += col.length; - stream.writeShort(aShort); - stream.write(col); - } else { - int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size); - size += 4; - stream.writeInt(anInt); - } - } - - // write complex dimensions here. 
- for (; dimCount < dimensionSize; dimCount++) { - short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size); - byte[] col = new byte[aShort]; - size += 2; - CarbonUnsafe.getUnsafe() - .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length); - size += col.length; - stream.writeShort(aShort); - stream.write(col); - } - - int nullSetSize = nullSetWords.length * 8; - Arrays.fill(nullSetWords, 0); - CarbonUnsafe.getUnsafe() - .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET, - nullSetSize); - size += nullSetSize; - for (int i = 0; i < nullSetWords.length; i++) { - stream.writeLong(nullSetWords[i]); - } - - for (int mesCount = 0; mesCount < measureSize; mesCount++) { - if (isSet(nullSetWords, mesCount)) { - DataType dataType = measureDataType[mesCount]; - if (dataType == DataTypes.SHORT) { - short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size); - size += 2; - stream.writeShort(sval); - } else if (dataType == DataTypes.INT) { - int ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size); - size += 4; - stream.writeInt(ival); - } else if (dataType == DataTypes.LONG) { - long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size); - size += 8; - stream.writeLong(val); - } else if (dataType == DataTypes.DOUBLE) { - double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size); - size += 8; - stream.writeDouble(doubleVal); - } else if (DataTypes.isDecimal(dataType)) { - short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size); - byte[] bigDecimalInBytes = new byte[aShort]; - size += 2; - CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes, - CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length); - size += bigDecimalInBytes.length; - stream.writeShort(aShort); - stream.write(bigDecimalInBytes); - } else { - throw new IllegalArgumentException("unsupported data type:" + 
measureDataType[mesCount]); - } - } - } + /** + * write a row to stream + * @param address address of a row + * @param stream stream + * @throws IOException + */ + public void writeRow(long address, DataOutputStream stream) throws IOException { + sortStepRowHandler.writeIntermediateSortTempRowFromUnsafeMemoryToStream( + dataBlock.getBaseObject(), address, stream); } public void freeMemory() { @@ -362,27 +134,8 @@ public class UnsafeCarbonRowPage { return dataBlock; } - public static void set(long[] words, int index) { - int wordOffset = (index >> 6); - words[wordOffset] |= (1L << index); - } - - public static void unset(long[] words, int index) { - int wordOffset = (index >> 6); - words[wordOffset] &= ~(1L << index); - } - - public static boolean isSet(long[] words, int index) { - int wordOffset = (index >> 6); - return ((words[wordOffset] & (1L << index)) != 0); - } - - public boolean[] getNoDictionaryDimensionMapping() { - return noDictionaryDimensionMapping; - } - - public boolean[] getNoDictionarySortColumnMapping() { - return noDictionarySortColumnMapping; + public TableFieldStat getTableFieldStat() { + return tableFieldStat; } public void setNewDataBlock(MemoryBlock newMemoryBlock) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/2b41f140/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java index 4dd5e44..5d038d3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java @@ -20,6 +20,7 @@ package org.apache.carbondata.processing.loading.sort.unsafe; import 
java.io.DataOutputStream; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -41,13 +42,14 @@ import org.apache.carbondata.core.util.CarbonThreadFactory; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.ThreadLocalTaskInfo; import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparator; -import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDIms; +import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDims; import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow; import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger; import org.apache.carbondata.processing.loading.sort.unsafe.sort.TimSort; import org.apache.carbondata.processing.loading.sort.unsafe.sort.UnsafeIntSortDataFormat; import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; import org.apache.carbondata.processing.sort.sortdata.SortParameters; +import org.apache.carbondata.processing.sort.sortdata.TableFieldStat; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; public class UnsafeSortDataRows { @@ -69,7 +71,8 @@ public class UnsafeSortDataRows { */ private SortParameters parameters; - + private TableFieldStat tableFieldStat; + private ThreadLocal<ByteBuffer> rowBuffer; private UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger; private UnsafeCarbonRowPage rowPage; @@ -94,7 +97,13 @@ public class UnsafeSortDataRows { public UnsafeSortDataRows(SortParameters parameters, UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) { this.parameters = parameters; - + this.tableFieldStat = new TableFieldStat(parameters); + this.rowBuffer = new ThreadLocal<ByteBuffer>() 
{ + @Override protected ByteBuffer initialValue() { + byte[] backedArray = new byte[2 * 1024 * 1024]; + return ByteBuffer.wrap(backedArray); + } + }; this.unsafeInMemoryIntermediateFileMerger = unsafeInMemoryIntermediateFileMerger; // observer of writing file in thread @@ -127,11 +136,7 @@ public class UnsafeSortDataRows { if (isMemoryAvailable) { UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size()); } - this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(), - parameters.getNoDictionarySortColumn(), - parameters.getDimColCount() + parameters.getComplexDimColCount(), - parameters.getMeasureColCount(), parameters.getMeasureDataType(), baseBlock, - !isMemoryAvailable, taskId); + this.rowPage = new UnsafeCarbonRowPage(tableFieldStat, baseBlock, !isMemoryAvailable, taskId); // Delete if any older file exists in sort temp folder deleteSortLocationIfExists(); @@ -178,7 +183,7 @@ public class UnsafeSortDataRows { private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { for (int i = 0; i < size; i++) { if (rowPage.canAdd()) { - bytesAdded += rowPage.addRow(rowBatch[i]); + bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get()); } else { try { if (enableInMemoryIntermediateMerge) { @@ -194,15 +199,8 @@ public class UnsafeSortDataRows { if (!saveToDisk) { UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size()); } - rowPage = new UnsafeCarbonRowPage( - parameters.getNoDictionaryDimnesionColumn(), - parameters.getNoDictionarySortColumn(), - parameters.getDimColCount() + parameters.getComplexDimColCount(), - parameters.getMeasureColCount(), - parameters.getMeasureDataType(), - memoryBlock, - saveToDisk, taskId); - bytesAdded += rowPage.addRow(rowBatch[i]); + rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, saveToDisk, taskId); + bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get()); } catch (Exception e) { LOGGER.error( "exception occurred while trying to 
acquire a semaphore lock: " + e.getMessage()); @@ -220,7 +218,7 @@ public class UnsafeSortDataRows { // if record holder list size is equal to sort buffer size then it will // sort the list and then write current list data to file if (rowPage.canAdd()) { - rowPage.addRow(row); + rowPage.addRow(row, rowBuffer.get()); } else { try { if (enableInMemoryIntermediateMerge) { @@ -235,13 +233,8 @@ public class UnsafeSortDataRows { if (!saveToDisk) { UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size()); } - rowPage = new UnsafeCarbonRowPage( - parameters.getNoDictionaryDimnesionColumn(), - parameters.getNoDictionarySortColumn(), - parameters.getDimColCount(), parameters.getMeasureColCount(), - parameters.getMeasureDataType(), memoryBlock, - saveToDisk, taskId); - rowPage.addRow(row); + rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, saveToDisk, taskId); + rowPage.addRow(row, rowBuffer.get()); } catch (Exception e) { LOGGER.error( "exception occurred while trying to acquire a semaphore lock: " + e.getMessage()); @@ -269,7 +262,7 @@ public class UnsafeSortDataRows { new UnsafeRowComparator(rowPage)); } else { timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(), - new UnsafeRowComparatorForNormalDIms(rowPage)); + new UnsafeRowComparatorForNormalDims(rowPage)); } unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage); } else { @@ -295,10 +288,9 @@ public class UnsafeSortDataRows { // write number of entries to the file stream.writeInt(actualSize); for (int i = 0; i < actualSize; i++) { - rowPage.fillRow(rowPage.getBuffer().get(i) + rowPage.getDataBlock().getBaseOffset(), - stream); + rowPage.writeRow( + rowPage.getBuffer().get(i) + rowPage.getDataBlock().getBaseOffset(), stream); } - } catch (IOException e) { throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e); } finally { @@ -367,7 +359,7 @@ public class UnsafeSortDataRows { new UnsafeRowComparator(page)); } else { 
timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(), - new UnsafeRowComparatorForNormalDIms(page)); + new UnsafeRowComparatorForNormalDims(page)); } if (page.isSaveToDisk()) { // create a new file every time @@ -380,7 +372,8 @@ public class UnsafeSortDataRows { writeDataToFile(page, sortTempFile); LOGGER.info("Time taken to sort row page with size" + page.getBuffer().getActualSize() + " and write is: " + (System.currentTimeMillis() - startTime) + ": location:" - + sortTempFile); + + sortTempFile + ", sort temp file size in MB is " + + sortTempFile.length() * 0.1 * 10 / 1024 / 1024); page.freeMemory(); // add sort temp filename to and arrayList. When the list size reaches 20 then // intermediate merging of sort temp files will be triggered http://git-wip-us.apache.org/repos/asf/carbondata/blob/2b41f140/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java index d02be9b..33342dc 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java @@ -23,63 +23,25 @@ import org.apache.carbondata.core.memory.CarbonUnsafe; import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage; import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow; +import org.apache.carbondata.processing.sort.sortdata.TableFieldStat; public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> { - - /** - * mapping of 
dictionary and no dictionary of sort_columns. - */ - private boolean[] noDictionarySortColumnMaping; - private Object baseObject; + private TableFieldStat tableFieldStat; + private int dictSizeInMemory; public UnsafeRowComparator(UnsafeCarbonRowPage rowPage) { - this.noDictionarySortColumnMaping = rowPage.getNoDictionarySortColumnMapping(); this.baseObject = rowPage.getDataBlock().getBaseObject(); + this.tableFieldStat = rowPage.getTableFieldStat(); + this.dictSizeInMemory = (tableFieldStat.getDictSortDimCnt() + + tableFieldStat.getDictNoSortDimCnt()) * 4; } /** * Below method will be used to compare two mdkey */ public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) { - int diff = 0; - long rowA = rowL.address; - long rowB = rowR.address; - int sizeA = 0; - int sizeB = 0; - for (boolean isNoDictionary : noDictionarySortColumnMaping) { - if (isNoDictionary) { - short aShort1 = CarbonUnsafe.getUnsafe().getShort(baseObject, rowA + sizeA); - byte[] byteArr1 = new byte[aShort1]; - sizeA += 2; - CarbonUnsafe.getUnsafe().copyMemory(baseObject, rowA + sizeA, byteArr1, - CarbonUnsafe.BYTE_ARRAY_OFFSET, aShort1); - sizeA += aShort1; - - short aShort2 = CarbonUnsafe.getUnsafe().getShort(baseObject, rowB + sizeB); - byte[] byteArr2 = new byte[aShort2]; - sizeB += 2; - CarbonUnsafe.getUnsafe().copyMemory(baseObject, rowB + sizeB, byteArr2, - CarbonUnsafe.BYTE_ARRAY_OFFSET, aShort2); - sizeB += aShort2; - - int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2); - if (difference != 0) { - return difference; - } - } else { - int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + sizeA); - sizeA += 4; - int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + sizeB); - sizeB += 4; - diff = dimFieldA - dimFieldB; - if (diff != 0) { - return diff; - } - } - } - - return diff; + return compare(rowL, baseObject, rowR, baseObject); } /** @@ -90,35 +52,40 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> { int diff 
= 0; long rowA = rowL.address; long rowB = rowR.address; - int sizeA = 0; - int sizeB = 0; - for (boolean isNoDictionary : noDictionarySortColumnMaping) { + int sizeInDictPartA = 0; + + int sizeInNonDictPartA = 0; + int sizeInDictPartB = 0; + int sizeInNonDictPartB = 0; + for (boolean isNoDictionary : tableFieldStat.getIsSortColNoDictFlags()) { if (isNoDictionary) { - short aShort1 = CarbonUnsafe.getUnsafe().getShort(baseObjectL, rowA + sizeA); - byte[] byteArr1 = new byte[aShort1]; - sizeA += 2; + short lengthA = CarbonUnsafe.getUnsafe().getShort(baseObjectL, + rowA + dictSizeInMemory + sizeInNonDictPartA); + byte[] byteArr1 = new byte[lengthA]; + sizeInNonDictPartA += 2; CarbonUnsafe.getUnsafe() - .copyMemory(baseObjectL, rowA + sizeA, byteArr1, CarbonUnsafe.BYTE_ARRAY_OFFSET, - aShort1); - sizeA += aShort1; + .copyMemory(baseObjectL, rowA + dictSizeInMemory + sizeInNonDictPartA, + byteArr1, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthA); + sizeInNonDictPartA += lengthA; - short aShort2 = CarbonUnsafe.getUnsafe().getShort(baseObjectR, rowB + sizeB); - byte[] byteArr2 = new byte[aShort2]; - sizeB += 2; + short lengthB = CarbonUnsafe.getUnsafe().getShort(baseObjectR, + rowB + dictSizeInMemory + sizeInNonDictPartB); + byte[] byteArr2 = new byte[lengthB]; + sizeInNonDictPartB += 2; CarbonUnsafe.getUnsafe() - .copyMemory(baseObjectR, rowB + sizeB, byteArr2, CarbonUnsafe.BYTE_ARRAY_OFFSET, - aShort2); - sizeB += aShort2; + .copyMemory(baseObjectR, rowB + dictSizeInMemory + sizeInNonDictPartB, + byteArr2, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthB); + sizeInNonDictPartB += lengthB; int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2); if (difference != 0) { return difference; } } else { - int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + sizeA); - sizeA += 4; - int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObjectR, rowB + sizeB); - sizeB += 4; + int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + sizeInDictPartA); + 
sizeInDictPartA += 4; + int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObjectR, rowB + sizeInDictPartB); + sizeInDictPartB += 4; diff = dimFieldA - dimFieldB; if (diff != 0) { return diff; http://git-wip-us.apache.org/repos/asf/carbondata/blob/2b41f140/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java deleted file mode 100644 index 483dcb2..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.processing.loading.sort.unsafe.comparator; - -import java.util.Comparator; - -import org.apache.carbondata.core.memory.CarbonUnsafe; -import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage; -import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow; - -public class UnsafeRowComparatorForNormalDIms implements Comparator<UnsafeCarbonRow> { - - private Object baseObject; - - private int numberOfSortColumns; - - public UnsafeRowComparatorForNormalDIms(UnsafeCarbonRowPage rowPage) { - this.baseObject = rowPage.getDataBlock().getBaseObject(); - this.numberOfSortColumns = rowPage.getNoDictionarySortColumnMapping().length; - } - - /** - * Below method will be used to compare two mdkey - */ - public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) { - int diff = 0; - long rowA = rowL.address; - long rowB = rowR.address; - int sizeA = 0; - int sizeB = 0; - for (int i = 0; i < numberOfSortColumns; i++) { - int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + sizeA); - sizeA += 4; - int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + sizeB); - sizeB += 4; - diff = dimFieldA - dimFieldB; - if (diff != 0) { - return diff; - } - } - - return diff; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2b41f140/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java new file mode 100644 index 0000000..e9cfb1c --- /dev/null +++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.loading.sort.unsafe.comparator; + +import java.util.Comparator; + +import org.apache.carbondata.core.memory.CarbonUnsafe; +import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage; +import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow; + +public class UnsafeRowComparatorForNormalDims implements Comparator<UnsafeCarbonRow> { + + private Object baseObject; + + private int numberOfSortColumns; + + public UnsafeRowComparatorForNormalDims(UnsafeCarbonRowPage rowPage) { + this.baseObject = rowPage.getDataBlock().getBaseObject(); + this.numberOfSortColumns = rowPage.getTableFieldStat().getIsSortColNoDictFlags().length; + } + + /** + * Below method will be used to compare two mdkey + */ + public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) { + int diff = 0; + long rowA = rowL.address; + long rowB = rowR.address; + int sizeA = 0; + int sizeB = 0; + for (int i = 0; i < numberOfSortColumns; i++) { + int dimFieldA = 
CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + sizeA); + sizeA += 4; + int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + sizeB); + sizeB += 4; + diff = dimFieldA - dimFieldB; + if (diff != 0) { + return diff; + } + } + + return diff; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/2b41f140/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java index 686e855..d790c41 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java @@ -17,6 +17,7 @@ package org.apache.carbondata.processing.loading.sort.unsafe.holder; +import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; /** @@ -28,7 +29,7 @@ public interface SortTempChunkHolder extends Comparable<SortTempChunkHolder> { void readRow() throws CarbonSortKeyAndGroupByException; - Object[] getRow(); + IntermediateSortTempRow getRow(); int numberOfRows(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2b41f140/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java index 6b0cfa6..a776db1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java @@ -19,9 +19,10 @@ package org.apache.carbondata.processing.loading.sort.unsafe.holder; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage; import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeInMemoryIntermediateDataMerger; -import org.apache.carbondata.processing.sort.sortdata.NewRowComparator; +import org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator; public class UnsafeFinalMergePageHolder implements SortTempChunkHolder { @@ -38,21 +39,18 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder { private UnsafeCarbonRowPage[] rowPages; - private NewRowComparator comparator; + private IntermediateSortTempRowComparator comparator; - private Object[] currentRow; - - private int columnSize; + private IntermediateSortTempRow currentRow; public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger merger, - boolean[] noDictSortColumnMapping, int columnSize) { + boolean[] noDictSortColumnMapping) { this.actualSize = merger.getEntryCount(); this.mergedAddresses = merger.getMergedAddresses(); this.rowPageIndexes = merger.getRowPageIndexes(); this.rowPages = merger.getUnsafeCarbonRowPages(); LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize); - this.comparator = new NewRowComparator(noDictSortColumnMapping); - this.columnSize = columnSize; + this.comparator 
= new IntermediateSortTempRowComparator(noDictSortColumnMapping); } public boolean hasNext() { @@ -63,12 +61,11 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder { } public void readRow() { - currentRow = new Object[columnSize]; - rowPages[rowPageIndexes[counter]].getRow(mergedAddresses[counter], currentRow); + currentRow = rowPages[rowPageIndexes[counter]].getRow(mergedAddresses[counter]); counter++; } - public Object[] getRow() { + public IntermediateSortTempRow getRow() { return currentRow; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2b41f140/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java index 6f05088..cbcbbae 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java @@ -19,8 +19,9 @@ package org.apache.carbondata.processing.loading.sort.unsafe.holder; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage; -import org.apache.carbondata.processing.sort.sortdata.NewRowComparator; +import org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator; public class UnsafeInmemoryHolder implements SortTempChunkHolder { @@ -33,21 +34,18 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder { private UnsafeCarbonRowPage rowPage; - 
private Object[] currentRow; + private IntermediateSortTempRow currentRow; private long address; - private NewRowComparator comparator; + private IntermediateSortTempRowComparator comparator; - private int columnSize; - - public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize, - int numberOfSortColumns) { + public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage) { this.actualSize = rowPage.getBuffer().getActualSize(); this.rowPage = rowPage; LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize); - this.comparator = new NewRowComparator(rowPage.getNoDictionarySortColumnMapping()); - this.columnSize = columnSize; + this.comparator = new IntermediateSortTempRowComparator( + rowPage.getTableFieldStat().getIsSortColNoDictFlags()); } public boolean hasNext() { @@ -58,13 +56,12 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder { } public void readRow() { - currentRow = new Object[columnSize]; address = rowPage.getBuffer().get(counter); - rowPage.getRow(address + rowPage.getDataBlock().getBaseOffset(), currentRow); + currentRow = rowPage.getRow(address + rowPage.getDataBlock().getBaseOffset()); counter++; } - public Object[] getRow() { + public IntermediateSortTempRow getRow() { return currentRow; }