http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java index 527452a..11b3d43 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java @@ -31,14 +31,15 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; -import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage; import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; -import org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator; +import org.apache.carbondata.processing.sort.sortdata.NewRowComparator; import org.apache.carbondata.processing.sort.sortdata.SortParameters; -import org.apache.carbondata.processing.sort.sortdata.TableFieldStat; public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { @@ -62,15 +63,21 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { * entry count */ private int entryCount; + /** * return row */ - private IntermediateSortTempRow returnRow; + private Object[] returnRow; + private int dimCnt; + private int complexCnt; + private int measureCnt; + private boolean[] isNoDictionaryDimensionColumn; + private DataType[] measureDataTypes; private int readBufferSize; private String compressorName; - private IntermediateSortTempRow[] currentBuffer; + private Object[][] currentBuffer; - private IntermediateSortTempRow[] backupBuffer; + private Object[][] backupBuffer; private boolean isBackupFilled; @@ -93,21 +100,27 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { private int numberOfObjectRead; - private TableFieldStat tableFieldStat; - private SortStepRowHandler sortStepRowHandler; - private Comparator<IntermediateSortTempRow> comparator; + private int nullSetWordsLength; + + private Comparator<Object[]> comparator; + /** * Constructor to initialize */ public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters parameters) { // set temp file this.tempFile = tempFile; + this.dimCnt = parameters.getDimColCount(); + this.complexCnt = parameters.getComplexDimColCount(); + this.measureCnt = parameters.getMeasureColCount(); + this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn(); + this.measureDataTypes = parameters.getMeasureDataType(); this.readBufferSize = parameters.getBufferSize(); this.compressorName = parameters.getSortTempCompressorName(); - this.tableFieldStat = new TableFieldStat(parameters); - this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat); + this.executorService = Executors.newFixedThreadPool(1); - comparator = new IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn()); + this.nullSetWordsLength = ((parameters.getMeasureColCount() - 1) >> 6) + 1; + comparator = new NewRowComparator(parameters.getNoDictionarySortColumn()); initialize(); } @@ -156,17 +169,11 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { * * @throws CarbonSortKeyAndGroupByException problem while reading */ - @Override public void readRow() throws CarbonSortKeyAndGroupByException { if (prefetch) { fillDataForPrefetch(); } else { - try { - this.returnRow = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream); - this.numberOfObjectRead++; - } catch (IOException e) { - throw new CarbonSortKeyAndGroupByException("Problems while reading row", e); - } + this.returnRow = getRowFromStream(); } } @@ -200,22 +207,63 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { } /** - * get a batch of row, this interface is used in reading compressed sort temp files - * - * @param expected expected number in a batch - * @return a batch of row - * @throws IOException if error occurs while reading from stream + * @return + * @throws CarbonSortKeyAndGroupByException */ - private IntermediateSortTempRow[] readBatchedRowFromStream(int expected) - throws IOException { - IntermediateSortTempRow[] holders = new IntermediateSortTempRow[expected]; - for (int i = 0; i < expected; i++) { - IntermediateSortTempRow holder - = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream); - holders[i] = holder; + private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException { + Object[] row = new Object[dimCnt + measureCnt]; + try { + int dimCount = 0; + for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) { + if (isNoDictionaryDimensionColumn[dimCount]) { + short aShort = stream.readShort(); + byte[] col = new byte[aShort]; + stream.readFully(col); + row[dimCount] = col; + } else { + int anInt = stream.readInt(); + row[dimCount] = anInt; + } + } + + // write complex dimensions here. + for (; dimCount < dimCnt; dimCount++) { + short aShort = stream.readShort(); + byte[] col = new byte[aShort]; + stream.readFully(col); + row[dimCount] = col; + } + + long[] words = new long[nullSetWordsLength]; + for (int i = 0; i < words.length; i++) { + words[i] = stream.readLong(); + } + + for (int mesCount = 0; mesCount < measureCnt; mesCount++) { + if (UnsafeCarbonRowPage.isSet(words, mesCount)) { + DataType dataType = measureDataTypes[mesCount]; + if (dataType == DataTypes.SHORT) { + row[dimCount + mesCount] = stream.readShort(); + } else if (dataType == DataTypes.INT) { + row[dimCount + mesCount] = stream.readInt(); + } else if (dataType == DataTypes.LONG) { + row[dimCount + mesCount] = stream.readLong(); + } else if (dataType == DataTypes.DOUBLE) { + row[dimCount + mesCount] = stream.readDouble(); + } else if (DataTypes.isDecimal(dataType)) { + short aShort = stream.readShort(); + byte[] bigDecimalInBytes = new byte[aShort]; + stream.readFully(bigDecimalInBytes); + row[dimCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes); + } else { + throw new IllegalArgumentException("unsupported data type:" + dataType); + } + } + } + return row; + } catch (IOException e) { + throw new CarbonSortKeyAndGroupByException(e); } - this.numberOfObjectRead += expected; - return holders; } /** @@ -223,7 +271,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { * * @return row */ - public IntermediateSortTempRow getRow() { + public Object[] getRow() { return this.returnRow; } @@ -278,7 +326,9 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { @Override public int hashCode() { int hash = 0; - hash += tableFieldStat.hashCode(); + hash += 31 * measureCnt; + hash += 31 * dimCnt; + hash += 31 * complexCnt; hash += tempFile.hashCode(); return hash; } @@ -318,12 +368,16 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { /** * This method will read the records from sort temp file and keep it in a buffer * - * @param numberOfRecords number of records to be read - * @return batch of intermediate sort temp row - * @throws IOException if error occurs reading records from file + * @param numberOfRecords + * @return + * @throws CarbonSortKeyAndGroupByException */ - private IntermediateSortTempRow[] prefetchRecordsFromFile(int numberOfRecords) - throws IOException { - return readBatchedRowFromStream(numberOfRecords); + private Object[][] prefetchRecordsFromFile(int numberOfRecords) + throws CarbonSortKeyAndGroupByException { + Object[][] records = new Object[numberOfRecords][]; + for (int i = 0; i < numberOfRecords; i++) { + records[i] = getRowFromStream(); + } + return records; } }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java index 22673ff..4bbf61b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java @@ -21,21 +21,25 @@ import java.io.DataOutputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.util.AbstractQueue; +import java.util.Arrays; import java.util.PriorityQueue; import java.util.concurrent.Callable; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; -import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage; import org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunkHolder; import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeSortTempFileChunkHolder; import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; import org.apache.carbondata.processing.sort.sortdata.SortParameters; -import org.apache.carbondata.processing.sort.sortdata.TableFieldStat; public class UnsafeIntermediateFileMerger implements Callable<Void> { /** @@ -65,13 +69,22 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> { private int totalNumberOfRecords; private SortParameters mergerParameters; - private TableFieldStat tableFieldStat; + private File[] intermediateFiles; + private File outPutFile; + private int dimCnt; + private int complexCnt; + private int measureCnt; + private boolean[] isNoDictionaryDimensionColumn; + private DataType[] measureDataTypes; private int writeBufferSize; private String compressorName; - private SortStepRowHandler sortStepRowHandler; + + private long[] nullSetWords; + + private ByteBuffer rowData; private Throwable throwable; @@ -84,10 +97,16 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> { this.fileCounter = intermediateFiles.length; this.intermediateFiles = intermediateFiles; this.outPutFile = outPutFile; + this.dimCnt = mergerParameters.getDimColCount(); + this.complexCnt = mergerParameters.getComplexDimColCount(); + this.measureCnt = mergerParameters.getMeasureColCount(); + this.isNoDictionaryDimensionColumn = mergerParameters.getNoDictionaryDimnesionColumn(); + this.measureDataTypes = mergerParameters.getMeasureDataType(); this.writeBufferSize = mergerParameters.getBufferSize(); this.compressorName = mergerParameters.getSortTempCompressorName(); - this.tableFieldStat = new TableFieldStat(mergerParameters); - this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat); + this.nullSetWords = new long[((measureCnt - 1) >> 6) + 1]; + // Take size of 2 MB for each row. I think it is high enough to use + rowData = ByteBuffer.allocate(2 * 1024 * 1024); } @Override public Void call() throws Exception { @@ -146,14 +165,13 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> { } /** - * This method will be used to get sorted sort temp row from the sort temp files + * This method will be used to get the sorted record from file * * @return sorted record sorted record * @throws CarbonSortKeyAndGroupByException */ - private IntermediateSortTempRow getSortedRecordFromFile() - throws CarbonSortKeyAndGroupByException { - IntermediateSortTempRow row = null; + private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException { + Object[] row = null; // poll the top object from heap // heap maintains binary tree which is based on heap condition that will @@ -217,7 +235,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> { this.recordHolderHeap.add(sortTempFileChunkHolder); } - LOGGER.info("Heap Size: " + this.recordHolderHeap.size()); + LOGGER.info("Heap Size" + this.recordHolderHeap.size()); } /** @@ -232,12 +250,12 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> { } /** - * This method will be used to get the sorted sort temp row + * This method will be used to get the sorted row * * @return sorted row * @throws CarbonSortKeyAndGroupByException */ - private IntermediateSortTempRow next() throws CarbonSortKeyAndGroupByException { + private Object[] next() throws CarbonSortKeyAndGroupByException { return getSortedRecordFromFile(); } @@ -254,16 +272,82 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> { /** * Below method will be used to write data to file * - * @throws IOException problem while writing + * @throws CarbonSortKeyAndGroupByException problem while writing */ - private void writeDataToFile(IntermediateSortTempRow row) throws IOException { - sortStepRowHandler.writeIntermediateSortTempRowToOutputStream(row, stream); + private void writeDataToFile(Object[] row) throws CarbonSortKeyAndGroupByException, IOException { + int dimCount = 0; + int size = 0; + for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) { + if (isNoDictionaryDimensionColumn[dimCount]) { + byte[] col = (byte[]) row[dimCount]; + rowData.putShort((short) col.length); + size += 2; + rowData.put(col); + size += col.length; + } else { + rowData.putInt((int) row[dimCount]); + size += 4; + } + } + + // write complex dimensions here. + int dimensionSize = dimCnt + complexCnt; + for (; dimCount < dimensionSize; dimCount++) { + byte[] col = (byte[]) row[dimCount]; + rowData.putShort((short)col.length); + size += 2; + rowData.put(col); + size += col.length; + } + Arrays.fill(nullSetWords, 0); + int nullSetSize = nullSetWords.length * 8; + int nullLoc = size; + size += nullSetSize; + for (int mesCount = 0; mesCount < measureCnt; mesCount++) { + Object value = row[mesCount + dimensionSize]; + if (null != value) { + DataType dataType = measureDataTypes[mesCount]; + if (dataType == DataTypes.SHORT) { + rowData.putShort(size, (Short) value); + size += 2; + } else if (dataType == DataTypes.INT) { + rowData.putInt(size, (Integer) value); + size += 4; + } else if (dataType == DataTypes.LONG) { + rowData.putLong(size, (Long) value); + size += 8; + } else if (dataType == DataTypes.DOUBLE) { + rowData.putDouble(size, (Double) value); + size += 8; + } else if (DataTypes.isDecimal(dataType)) { + byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(((BigDecimal) value)); + rowData.putShort(size, (short) bigDecimalInBytes.length); + size += 2; + for (int i = 0; i < bigDecimalInBytes.length; i++) { + rowData.put(size++, bigDecimalInBytes[i]); + } + } + UnsafeCarbonRowPage.set(nullSetWords, mesCount); + } else { + UnsafeCarbonRowPage.unset(nullSetWords, mesCount); + } + } + for (int i = 0; i < nullSetWords.length; i++) { + rowData.putLong(nullLoc, nullSetWords[i]); + nullLoc += 8; + } + byte[] rowBytes = new byte[size]; + rowData.position(0); + rowData.get(rowBytes); + stream.write(rowBytes); + rowData.clear(); } private void finish() throws CarbonSortKeyAndGroupByException { clear(); try { CarbonUtil.deleteFiles(intermediateFiles); + rowData.clear(); } catch (IOException e) { throw new CarbonSortKeyAndGroupByException("Problem while deleting the intermediate files"); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java index 64f3c25..ce118d9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java @@ -29,8 +29,7 @@ import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; -import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; -import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; +import org.apache.carbondata.processing.loading.sort.SortStepRowUtil; import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage; import org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunkHolder; import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeFinalMergePageHolder; @@ -56,7 +55,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec private AbstractQueue<SortTempChunkHolder> recordHolderHeapLocal; private SortParameters parameters; - private SortStepRowHandler sortStepRowHandler; + private SortStepRowUtil sortStepRowUtil; /** * tempFileLocation */ @@ -69,7 +68,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters, String[] tempFileLocation) { this.parameters = parameters; - this.sortStepRowHandler = new SortStepRowHandler(parameters); + this.sortStepRowUtil = new SortStepRowUtil(parameters); this.tempFileLocation = tempFileLocation; this.tableName = parameters.getTableName(); } @@ -109,7 +108,9 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec LOGGER.info("Started adding first record from each page"); for (final UnsafeCarbonRowPage rowPage : rowPages) { - SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage); + SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage, + parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters + .getMeasureColCount(), parameters.getNumberOfSortColumns()); // initialize sortTempFileChunkHolder.readRow(); @@ -120,7 +121,9 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec for (final UnsafeInMemoryIntermediateDataMerger merger : merges) { SortTempChunkHolder sortTempFileChunkHolder = - new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionarySortColumn()); + new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionarySortColumn(), + parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters + .getMeasureColCount()); // initialize sortTempFileChunkHolder.readRow(); @@ -139,7 +142,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec recordHolderHeapLocal.add(sortTempFileChunkHolder); } - LOGGER.info("Heap Size: " + this.recordHolderHeapLocal.size()); + LOGGER.info("Heap Size" + this.recordHolderHeapLocal.size()); } catch (Exception e) { LOGGER.error(e); throw new CarbonDataWriterException(e); @@ -177,14 +180,12 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec } /** - * This method will be used to get the sorted row in 3-parted format. - * The row will feed the following writer process step. + * This method will be used to get the sorted row * * @return sorted row */ public Object[] next() { - IntermediateSortTempRow sortTempRow = getSortedRecordFromFile(); - return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow); + return sortStepRowUtil.convertRow(getSortedRecordFromFile()); } /** @@ -192,8 +193,8 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec * * @return sorted record sorted record */ - private IntermediateSortTempRow getSortedRecordFromFile() throws CarbonDataWriterException { - IntermediateSortTempRow row = null; + private Object[] getSortedRecordFromFile() throws CarbonDataWriterException { + Object[] row = null; // poll the top object from heap // heap maintains binary tree which is based on heap condition that will http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index ea11e22..751903a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -389,6 +389,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { noDictionarySortColumnMapping, 0, noDictionarySortColumnMapping.length); } sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping); + String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation, CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); finalMerger = http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java index c06819c..04efa1f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java @@ -21,6 +21,7 @@ import java.io.DataOutputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.math.BigDecimal; import java.util.AbstractQueue; import java.util.PriorityQueue; import java.util.concurrent.Callable; @@ -28,9 +29,11 @@ import java.util.concurrent.Callable; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; -import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.NonDictionaryUtil; import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; public class IntermediateFileMerger implements Callable<Void> { @@ -65,12 +68,17 @@ public class IntermediateFileMerger implements Callable<Void> { private File[] intermediateFiles; private File outPutFile; + private int dimCnt; + private int noDictDimCnt; + private int complexCnt; + private int measureCnt; + private boolean[] isNoDictionaryDimensionColumn; + private DataType[] measureDataTypes; private int writeBufferSize; private String compressorName; private Throwable throwable; - private TableFieldStat tableFieldStat; - private SortStepRowHandler sortStepRowHandler; + /** * IntermediateFileMerger Constructor */ @@ -80,10 +88,14 @@ public class IntermediateFileMerger implements Callable<Void> { this.fileCounter = intermediateFiles.length; this.intermediateFiles = intermediateFiles; this.outPutFile = outPutFile; + this.dimCnt = mergerParameters.getDimColCount(); + this.noDictDimCnt = mergerParameters.getNoDictionaryCount(); + this.complexCnt = mergerParameters.getComplexDimColCount(); + this.measureCnt = mergerParameters.getMeasureColCount(); + this.isNoDictionaryDimensionColumn = mergerParameters.getNoDictionaryDimnesionColumn(); + this.measureDataTypes = mergerParameters.getMeasureDataType(); this.writeBufferSize = mergerParameters.getBufferSize(); this.compressorName = mergerParameters.getSortTempCompressorName(); - this.tableFieldStat = new TableFieldStat(mergerParameters); - this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat); } @Override public Void call() throws Exception { @@ -142,14 +154,13 @@ public class IntermediateFileMerger implements Callable<Void> { } /** - * This method will be used to get the sorted sort temp row from sort temp file + * This method will be used to get the sorted record from file * * @return sorted record sorted record * @throws CarbonSortKeyAndGroupByException */ - private IntermediateSortTempRow getSortedRecordFromFile() - throws CarbonSortKeyAndGroupByException { - IntermediateSortTempRow row = null; + private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException { + Object[] row = null; // poll the top object from heap // heap maintains binary tree which is based on heap condition that will @@ -216,7 +227,7 @@ public class IntermediateFileMerger implements Callable<Void> { this.recordHolderHeap.add(sortTempFileChunkHolder); } - LOGGER.info("Heap Size: " + this.recordHolderHeap.size()); + LOGGER.info("Heap Size" + this.recordHolderHeap.size()); } /** @@ -231,12 +242,12 @@ public class IntermediateFileMerger implements Callable<Void> { } /** - * This method will be used to get the sorted sort temp row + * This method will be used to get the sorted row * * @return sorted row * @throws CarbonSortKeyAndGroupByException */ - private IntermediateSortTempRow next() throws CarbonSortKeyAndGroupByException { + private Object[] next() throws CarbonSortKeyAndGroupByException { return getSortedRecordFromFile(); } @@ -253,10 +264,62 @@ public class IntermediateFileMerger implements Callable<Void> { /** * Below method will be used to write data to file * - * @throws IOException problem while writing + * @throws CarbonSortKeyAndGroupByException problem while writing */ - private void writeDataToFile(IntermediateSortTempRow row) throws IOException { - sortStepRowHandler.writeIntermediateSortTempRowToOutputStream(row, stream); + private void writeDataToFile(Object[] row) throws CarbonSortKeyAndGroupByException { + try { + int[] mdkArray = (int[]) row[0]; + byte[][] nonDictArray = (byte[][]) row[1]; + int mdkIndex = 0; + int nonDictKeyIndex = 0; + // write dictionary and non dictionary dimensions here. + for (boolean nodictinary : isNoDictionaryDimensionColumn) { + if (nodictinary) { + byte[] col = nonDictArray[nonDictKeyIndex++]; + stream.writeShort(col.length); + stream.write(col); + } else { + stream.writeInt(mdkArray[mdkIndex++]); + } + } + // write complex + for (; nonDictKeyIndex < noDictDimCnt + complexCnt; nonDictKeyIndex++) { + byte[] col = nonDictArray[nonDictKeyIndex++]; + stream.writeShort(col.length); + stream.write(col); + } + // write measure + int fieldIndex = 0; + for (int counter = 0; counter < measureCnt; counter++) { + if (null != NonDictionaryUtil.getMeasure(fieldIndex, row)) { + stream.write((byte) 1); + DataType dataType = measureDataTypes[counter]; + if (dataType == DataTypes.BOOLEAN) { + stream.writeBoolean((boolean)NonDictionaryUtil.getMeasure(fieldIndex, row)); + } else if (dataType == DataTypes.SHORT) { + stream.writeShort((short) NonDictionaryUtil.getMeasure(fieldIndex, row)); + } else if (dataType == DataTypes.INT) { + stream.writeInt((int) NonDictionaryUtil.getMeasure(fieldIndex, row)); + } else if (dataType == DataTypes.LONG) { + stream.writeLong((long) NonDictionaryUtil.getMeasure(fieldIndex, row)); + } else if (dataType == DataTypes.DOUBLE) { + stream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row)); + } else if (DataTypes.isDecimal(dataType)) { + byte[] bigDecimalInBytes = DataTypeUtil + .bigDecimalToByte((BigDecimal) NonDictionaryUtil.getMeasure(fieldIndex, row)); + stream.writeInt(bigDecimalInBytes.length); + stream.write(bigDecimalInBytes); + } else { + throw new IllegalArgumentException("unsupported data type:" + dataType); + } + } else { + stream.write((byte) 0); + } + fieldIndex++; + } + } catch (IOException e) { + throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e); + } } private void finish() throws CarbonSortKeyAndGroupByException { http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java deleted file mode 100644 index 9b6d1e8..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.sort.sortdata; - -import java.util.Comparator; - -import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; -import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; - -/** - * This class is used as comparator for comparing intermediate sort temp row - */ -public class IntermediateSortTempRowComparator implements Comparator<IntermediateSortTempRow> { - /** - * isSortColumnNoDictionary whether the sort column is not dictionary or not - */ - private boolean[] isSortColumnNoDictionary; - - /** - * @param isSortColumnNoDictionary isSortColumnNoDictionary - */ - public IntermediateSortTempRowComparator(boolean[] isSortColumnNoDictionary) { - this.isSortColumnNoDictionary = isSortColumnNoDictionary; - } - - /** - * Below method will be used to compare two sort temp row - */ - public int compare(IntermediateSortTempRow rowA, IntermediateSortTempRow rowB) { - int diff = 0; - int dictIndex = 0; - int nonDictIndex = 0; - - for (boolean isNoDictionary : isSortColumnNoDictionary) { - - if (isNoDictionary) { - byte[] byteArr1 = rowA.getNoDictSortDims()[nonDictIndex]; - byte[] byteArr2 = rowB.getNoDictSortDims()[nonDictIndex]; - nonDictIndex++; - - int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2); - if (difference != 0) { - return difference; - } - } else { - int dimFieldA = rowA.getDictSortDims()[dictIndex]; - int dimFieldB = rowB.getDictSortDims()[dictIndex]; - dictIndex++; - - diff = dimFieldA - dimFieldB; - if (diff != 0) { - return diff; - } - } - } - return diff; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java index 3f94533..d2579d2 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java @@ -40,11 +40,14 @@ public class NewRowComparator implements Comparator<Object[]> { */ public int compare(Object[] rowA, Object[] rowB) { int diff = 0; + int index = 0; for (boolean isNoDictionary : noDictionarySortColumnMaping) { + if (isNoDictionary) { byte[] byteArr1 = (byte[]) rowA[index]; + byte[] byteArr2 = (byte[]) rowB[index]; int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2); @@ -54,7 +57,6 @@ public class NewRowComparator implements Comparator<Object[]> { } else { int dimFieldA = (int) rowA[index]; int dimFieldB = (int) rowB[index]; - diff = dimFieldA - dimFieldB; if (diff != 0) { return diff; @@ -63,6 +65,7 @@ public class NewRowComparator implements Comparator<Object[]> { index++; } + return diff; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java index 7538c92..e01b587 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java @@ -29,7 +29,7 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> { private int numberOfSortColumns; /** - * NewRowComparatorForNormalDims Constructor + * RowComparatorForNormalDims Constructor * * @param numberOfSortColumns */ @@ -46,6 +46,7 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> { int diff = 0; for (int i = 0; i < numberOfSortColumns; i++) { + int dimFieldA = (int)rowA[i]; int dimFieldB = (int)rowB[i]; diff = dimFieldA - dimFieldB; http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java new file mode 100644 index 0000000..0ae0b93 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.sort.sortdata; + +import java.nio.ByteBuffer; +import java.util.Comparator; + +import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; +import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; +import org.apache.carbondata.core.util.NonDictionaryUtil; + +public class RowComparator implements Comparator<Object[]> { + /** + * noDictionaryCount represent number of no dictionary cols + */ + private int noDictionaryCount; + + /** + * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions. + */ + private boolean[] noDictionarySortColumnMaping; + + /** + * @param noDictionarySortColumnMaping + * @param noDictionaryCount + */ + public RowComparator(boolean[] noDictionarySortColumnMaping, int noDictionaryCount) { + this.noDictionaryCount = noDictionaryCount; + this.noDictionarySortColumnMaping = noDictionarySortColumnMaping; + } + + /** + * Below method will be used to compare two mdkey + */ + public int compare(Object[] rowA, Object[] rowB) { + int diff = 0; + + int normalIndex = 0; + int noDictionaryindex = 0; + + for (boolean isNoDictionary : noDictionarySortColumnMaping) { + + if (isNoDictionary) { + byte[] byteArr1 = (byte[]) rowA[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX]; + + ByteBuffer buff1 = ByteBuffer.wrap(byteArr1); + + // extract a high card dims from complete byte[]. + NonDictionaryUtil + .extractSingleHighCardDims(byteArr1, noDictionaryindex, noDictionaryCount, buff1); + + byte[] byteArr2 = (byte[]) rowB[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX]; + + ByteBuffer buff2 = ByteBuffer.wrap(byteArr2); + + // extract a high card dims from complete byte[]. + NonDictionaryUtil + .extractSingleHighCardDims(byteArr2, noDictionaryindex, noDictionaryCount, buff2); + + int difference = UnsafeComparer.INSTANCE.compareTo(buff1, buff2); + if (difference != 0) { + return difference; + } + noDictionaryindex++; + } else { + int dimFieldA = NonDictionaryUtil.getDimension(normalIndex, rowA); + int dimFieldB = NonDictionaryUtil.getDimension(normalIndex, rowB); + diff = dimFieldA - dimFieldB; + if (diff != 0) { + return diff; + } + normalIndex++; + } + + } + + return diff; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java new file mode 100644 index 0000000..0883ae1 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.processing.sort.sortdata; + +import java.util.Comparator; + +import org.apache.carbondata.core.util.NonDictionaryUtil; + +/** + * This class is used as comparator for comparing dims which are non high cardinality dims. + * Here the dims will be in form of int[] (surrogates) so directly comparing the integers. + */ +public class RowComparatorForNormalDims implements Comparator<Object[]> { + /** + * dimension count + */ + private int numberOfSortColumns; + + /** + * RowComparatorForNormalDims Constructor + * + * @param numberOfSortColumns + */ + public RowComparatorForNormalDims(int numberOfSortColumns) { + this.numberOfSortColumns = numberOfSortColumns; + } + + /** + * Below method will be used to compare two surrogate keys + * + * @see Comparator#compare(Object, Object) + */ + public int compare(Object[] rowA, Object[] rowB) { + int diff = 0; + + for (int i = 0; i < numberOfSortColumns; i++) { + + int dimFieldA = NonDictionaryUtil.getDimension(i, rowA); + int dimFieldB = NonDictionaryUtil.getDimension(i, rowB); + + diff = dimFieldA - dimFieldB; + if (diff != 0) { + return diff; + } + } + return diff; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java index a4ac0ea..88695b9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java @@ -37,8 +37,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; -import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; @@ -73,12 +71,12 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { * tableName */ private String tableName; - private SortParameters sortParameters; - private SortStepRowHandler sortStepRowHandler; + /** * tempFileLocation */ private String[] tempFileLocation; + private SortParameters sortParameters; private int maxThreadForSorting; @@ -91,7 +89,6 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { this.tempFileLocation = tempFileLocation; this.tableName = tableName; this.sortParameters = sortParameters; - this.sortStepRowHandler = new SortStepRowHandler(sortParameters); try { maxThreadForSorting = Integer.parseInt(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD, @@ -110,7 +107,8 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { */ public void startFinalMerge() throws CarbonDataWriterException { List<File> filesToMerge = getFilesToMergeSort(); - if (filesToMerge.size() == 0) { + if (filesToMerge.size() == 0) + { LOGGER.info("No file to merge in final merge stage"); return; } @@ -127,9 +125,11 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { // get all the merged files List<File> files = new ArrayList<File>(tempFileLocation.length); - for (String tempLoc : tempFileLocation) { + for (String tempLoc : tempFileLocation) + { File[] subFiles = new File(tempLoc).listFiles(fileFilter); - if (null != subFiles && subFiles.length > 0) { + if (null != subFiles && subFiles.length > 0) + { files.addAll(Arrays.asList(subFiles)); } } @@ -226,14 +226,13 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { } /** - * This method will be used to get the sorted sort temp row from the sort temp files + * This method will be used to get the sorted row * * @return sorted row * @throws CarbonSortKeyAndGroupByException */ public Object[] next() { - IntermediateSortTempRow sortTempRow = getSortedRecordFromFile(); - return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow); + return getSortedRecordFromFile(); } /** @@ -242,8 +241,8 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { * @return sorted record sorted record * @throws CarbonSortKeyAndGroupByException */ - private IntermediateSortTempRow getSortedRecordFromFile() throws CarbonDataWriterException { - IntermediateSortTempRow row = null; + private Object[] getSortedRecordFromFile() throws CarbonDataWriterException { + Object[] row = null; // poll the top object from heap // heap maintains binary tree which is based on heap condition that will http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java index c7efbd9..57a19bd 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java @@ -20,7 +20,7 @@ package org.apache.carbondata.processing.sort.sortdata; import java.io.DataOutputStream; import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; +import java.math.BigDecimal; import java.util.Arrays; import java.util.Random; import java.util.concurrent.ExecutorService; @@ -32,10 +32,12 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonThreadFactory; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; +import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; @@ -67,8 +69,7 @@ public class SortDataRows { private Semaphore semaphore; private SortParameters parameters; - private SortStepRowHandler sortStepRowHandler; - private ThreadLocal<ByteBuffer> rowBuffer; + private int sortBufferSize; private SortIntermediateFileMerger intermediateFileMerger; @@ -78,7 +79,7 @@ public class SortDataRows { public SortDataRows(SortParameters parameters, SortIntermediateFileMerger intermediateFileMerger) { this.parameters = parameters; - this.sortStepRowHandler = new SortStepRowHandler(parameters); + this.intermediateFileMerger = intermediateFileMerger; int batchSize = CarbonProperties.getInstance().getBatchSize(); @@ -86,12 +87,6 @@ public class SortDataRows { this.sortBufferSize = Math.max(parameters.getSortBufferSize(), batchSize); // observer of writing file in thread this.threadStatusObserver = new ThreadStatusObserver(); - this.rowBuffer = new ThreadLocal<ByteBuffer>() { - @Override protected ByteBuffer initialValue() { - byte[] backedArray = new byte[2 * 1024 * 1024]; - return ByteBuffer.wrap(backedArray); - } - }; } /** @@ -135,7 +130,8 @@ public class SortDataRows { semaphore.acquire(); dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); } catch (InterruptedException e) { - LOGGER.error(e, "exception occurred while trying to acquire a semaphore lock: "); + LOGGER.error(e, + "exception occurred while trying to acquire a semaphore lock: "); throw new CarbonSortKeyAndGroupByException(e); } // create the new holder Array @@ -162,7 +158,7 @@ public class SortDataRows { } intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - sizeLeft = sortBufferSize - entryCount; + sizeLeft = sortBufferSize - entryCount ; if (sizeLeft > 0) { System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); } @@ -216,6 +212,7 @@ public class SortDataRows { locationChosen + File.separator + parameters.getTableName() + System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT); writeDataToFile(recordHolderList, this.entryCount, file); + } startFileBasedMerge(); @@ -223,7 +220,7 @@ public class SortDataRows { } /** - * Below method will be used to write data to sort temp file + * Below method will be used to write data to file * * @throws CarbonSortKeyAndGroupByException problem while writing */ @@ -236,9 +233,60 @@ public class SortDataRows { parameters.getFileWriteBufferSize(), parameters.getSortTempCompressorName()); // write number of entries to the file stream.writeInt(entryCountLocal); + int complexDimColCount = parameters.getComplexDimColCount(); + int dimColCount = parameters.getDimColCount() + complexDimColCount; + DataType[] type = parameters.getMeasureDataType(); + boolean[] noDictionaryDimnesionMapping = parameters.getNoDictionaryDimnesionColumn(); + Object[] row = null; for (int i = 0; i < entryCountLocal; i++) { - sortStepRowHandler.writeRawRowAsIntermediateSortTempRowToOutputStream( - recordHolderList[i], stream, rowBuffer.get()); + // get row from record holder list + row = recordHolderList[i]; + int dimCount = 0; + // write dictionary and non dictionary dimensions here. + for (; dimCount < noDictionaryDimnesionMapping.length; dimCount++) { + if (noDictionaryDimnesionMapping[dimCount]) { + byte[] col = (byte[]) row[dimCount]; + stream.writeShort(col.length); + stream.write(col); + } else { + stream.writeInt((int)row[dimCount]); + } + } + // write complex dimensions here. + for (; dimCount < dimColCount; dimCount++) { + byte[] value = (byte[])row[dimCount]; + stream.writeShort(value.length); + stream.write(value); + } + // as measures are stored in separate array. + for (int mesCount = 0; + mesCount < parameters.getMeasureColCount(); mesCount++) { + Object value = row[mesCount + dimColCount]; + if (null != value) { + stream.write((byte) 1); + DataType dataType = type[mesCount]; + if (dataType == DataTypes.BOOLEAN) { + stream.writeBoolean((boolean) value); + } else if (dataType == DataTypes.SHORT) { + stream.writeShort((Short) value); + } else if (dataType == DataTypes.INT) { + stream.writeInt((Integer) value); + } else if (dataType == DataTypes.LONG) { + stream.writeLong((Long) value); + } else if (dataType == DataTypes.DOUBLE) { + stream.writeDouble((Double) value); + } else if (DataTypes.isDecimal(dataType)) { + BigDecimal val = (BigDecimal) value; + byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val); + stream.writeInt(bigDecimalInBytes.length); + stream.write(bigDecimalInBytes); + } else { + throw new IllegalArgumentException("unsupported data type:" + type[mesCount]); + } + } else { + stream.write((byte) 0); + } + } } } catch (IOException e) { throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e); @@ -253,7 +301,7 @@ public class SortDataRows { * * @throws CarbonSortKeyAndGroupByException */ - private void deleteSortLocationIfExists() throws CarbonSortKeyAndGroupByException { + public void deleteSortLocationIfExists() throws CarbonSortKeyAndGroupByException { CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation()); } @@ -332,8 +380,7 @@ public class SortDataRows { // intermediate merging of sort temp files will be triggered intermediateFileMerger.addFileToMerge(sortTempFile); LOGGER.info("Time taken to sort and write sort temp file " + sortTempFile + " is: " + ( - System.currentTimeMillis() - startTime) + ", sort temp file size in MB is " - + sortTempFile.length() * 0.1 * 10 / 1024 / 1024); + System.currentTimeMillis() - startTime)); } catch (Throwable e) { try { threadStatusObserver.notifyFailed(e); http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java index 7e221a7..d726539 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java @@ -21,7 +21,6 @@ import java.io.DataInputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.util.Comparator; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -31,11 +30,14 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonThreadFactory; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; -import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.NonDictionaryUtil; import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHolder> { @@ -69,13 +71,20 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold /** * return row */ - private IntermediateSortTempRow returnRow; + private Object[] returnRow; + private int dimCnt; + private int noDictDimCnt; + private int complexCnt; + private int measureCnt; + private boolean[] isNoDictionaryDimensionColumn; + private boolean[] isNoDictionarySortColumn; + private DataType[] measureDataTypes; private int readBufferSize; private String compressorName; - private IntermediateSortTempRow[] currentBuffer; + private Object[][] currentBuffer; - private IntermediateSortTempRow[] backupBuffer; + private Object[][] backupBuffer; private boolean isBackupFilled; @@ -95,9 +104,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold * totalRecordFetch */ private int totalRecordFetch; - private TableFieldStat tableFieldStat; - private SortStepRowHandler sortStepRowHandler; - private Comparator<IntermediateSortTempRow> comparator; + /** * Constructor to initialize * @@ -108,12 +115,16 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold public SortTempFileChunkHolder(File tempFile, SortParameters sortParameters, String tableName) { // set temp file this.tempFile = tempFile; + this.dimCnt = sortParameters.getDimColCount(); + this.noDictDimCnt = sortParameters.getNoDictionaryCount(); + this.complexCnt = sortParameters.getComplexDimColCount(); + this.measureCnt = sortParameters.getMeasureColCount(); + this.isNoDictionaryDimensionColumn = sortParameters.getNoDictionaryDimnesionColumn(); + this.isNoDictionarySortColumn = sortParameters.getNoDictionarySortColumn(); + this.measureDataTypes = sortParameters.getMeasureDataType(); this.readBufferSize = sortParameters.getBufferSize(); this.compressorName = sortParameters.getSortTempCompressorName(); - this.tableFieldStat = new TableFieldStat(sortParameters); - this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat); - this.comparator = new IntermediateSortTempRowComparator( - tableFieldStat.getIsSortColNoDictFlags()); + this.executorService = Executors .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName)); } @@ -167,12 +178,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold if (prefetch) { fillDataForPrefetch(); } else { - try { - this.returnRow = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream); - this.numberOfObjectRead++; - } catch (IOException e) { - throw new CarbonSortKeyAndGroupByException("Problem while reading rows", e); - } + this.returnRow = getRowFromStream(); } } @@ -206,28 +212,86 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold } /** - * Read a batch of row from stream - * + * Reads row from file * @return Object[] - * @throws IOException if error occurs while reading from stream + * @throws CarbonSortKeyAndGroupByException */ - private IntermediateSortTempRow[] readBatchedRowFromStream(int expected) throws IOException { - IntermediateSortTempRow[] holders = new IntermediateSortTempRow[expected]; - for (int i = 0; i < expected; i++) { - IntermediateSortTempRow holder - = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream); - holders[i] = holder; + private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException { + // create new row of size 3 (1 for dims , 1 for high card , 1 for measures) + + Object[] holder = new Object[3]; + int index = 0; + int nonDicIndex = 0; + int[] dim = new int[dimCnt - noDictDimCnt]; + byte[][] nonDicArray = new byte[noDictDimCnt + complexCnt][]; + Object[] measures = new Object[measureCnt]; + try { + // read dimension values + for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) { + if (isNoDictionaryDimensionColumn[i]) { + short len = stream.readShort(); + byte[] array = new byte[len]; + stream.readFully(array); + nonDicArray[nonDicIndex++] = array; + } else { + dim[index++] = stream.readInt(); + } + } + + for (int i = 0; i < complexCnt; i++) { + short len = stream.readShort(); + byte[] array = new byte[len]; + stream.readFully(array); + nonDicArray[nonDicIndex++] = array; + } + + index = 0; + // read measure values + for (int i = 0; i < measureCnt; i++) { + if (stream.readByte() == 1) { + DataType dataType = measureDataTypes[i]; + if (dataType == DataTypes.BOOLEAN) { + measures[index++] = stream.readBoolean(); + } else if (dataType == DataTypes.SHORT) { + measures[index++] = stream.readShort(); + } else if (dataType == DataTypes.INT) { + measures[index++] = stream.readInt(); + } else if (dataType == DataTypes.LONG) { + measures[index++] = stream.readLong(); + } else if (dataType == DataTypes.DOUBLE) { + measures[index++] = stream.readDouble(); + } else if (DataTypes.isDecimal(dataType)) { + int len = stream.readInt(); + byte[] buff = new byte[len]; + stream.readFully(buff); + measures[index++] = DataTypeUtil.byteToBigDecimal(buff); + } else { + throw new IllegalArgumentException("unsupported data type:" + dataType); + } + } else { + measures[index++] = null; + } + } + + NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures); + + // increment number if record read + this.numberOfObjectRead++; + } catch (IOException e) { + LOGGER.error("Problme while reading the madkey fom sort temp file"); + throw new CarbonSortKeyAndGroupByException("Problem while reading the sort temp file ", e); } - this.numberOfObjectRead += expected; - return holders; + + //return out row + return holder; } /** - * below method will be used to get the sort temp row + * below method will be used to get the row * * @return row */ - public IntermediateSortTempRow getRow() { + public Object[] getRow() { return this.returnRow; } @@ -266,7 +330,31 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold } @Override public int compareTo(SortTempFileChunkHolder other) { - return comparator.compare(returnRow, other.getRow()); + int diff = 0; + int index = 0; + int noDictionaryIndex = 0; + int[] leftMdkArray = (int[]) returnRow[0]; + int[] rightMdkArray = (int[]) other.returnRow[0]; + byte[][] leftNonDictArray = (byte[][]) returnRow[1]; + byte[][] rightNonDictArray = (byte[][]) other.returnRow[1]; + for (boolean isNoDictionary : isNoDictionarySortColumn) { + if (isNoDictionary) { + diff = UnsafeComparer.INSTANCE + .compareTo(leftNonDictArray[noDictionaryIndex], rightNonDictArray[noDictionaryIndex]); + if (diff != 0) { + return diff; + } + noDictionaryIndex++; + } else { + diff = leftMdkArray[index] - rightMdkArray[index]; + if (diff != 0) { + return diff; + } + index++; + } + + } + return diff; } @Override public boolean equals(Object obj) { @@ -284,7 +372,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold @Override public int hashCode() { int hash = 0; - hash += tableFieldStat.hashCode(); + hash += 31 * measureCnt; + hash += 31 * dimCnt; + hash += 31 * complexCnt; hash += tempFile.hashCode(); return hash; } @@ -324,12 +414,16 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold /** * This method will read the records from sort temp file and keep it in a buffer * - * @param numberOfRecords number of records to be read - * @return batch of intermediate sort temp row - * @throws IOException if error occurs while reading reading records + * @param numberOfRecords + * @return + * @throws CarbonSortKeyAndGroupByException */ - private IntermediateSortTempRow[] prefetchRecordsFromFile(int numberOfRecords) - throws IOException { - return readBatchedRowFromStream(numberOfRecords); + private Object[][] prefetchRecordsFromFile(int numberOfRecords) + throws CarbonSortKeyAndGroupByException { + Object[][] records = new Object[numberOfRecords][]; + for (int i = 0; i < numberOfRecords; i++) { + records[i] = getRowFromStream(); + } + return records; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/78aa2cc3/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java deleted file mode 100644 index 0d1303a..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.sort.sortdata; - -import java.io.Serializable; -import java.util.Objects; - -import org.apache.carbondata.core.metadata.datatype.DataType; - -/** - * This class is used to hold field information for a table during data loading. These information - * will be used to convert/construct/destruct row in sort process step. Because complex field is - * processed the same as no-dict-no-sort-simple-dimension, so we treat them as the same and use - * `no-dict-no-sort-dim` related variable to represent them in this class. - */ -public class TableFieldStat implements Serializable { - private static final long serialVersionUID = 201712070950L; - private int dictSortDimCnt = 0; - private int dictNoSortDimCnt = 0; - private int noDictSortDimCnt = 0; - private int noDictNoSortDimCnt = 0; - // whether sort column is of dictionary type or not - private boolean[] isSortColNoDictFlags; - private int measureCnt; - private DataType[] measureDataType; - - // indices for dict & sort dimension columns - private int[] dictSortDimIdx; - // indices for dict & no-sort dimension columns - private int[] dictNoSortDimIdx; - // indices for no-dict & sort dimension columns - private int[] noDictSortDimIdx; - // indices for no-dict & no-sort dimension columns, including complex columns - private int[] noDictNoSortDimIdx; - // indices for measure columns - private int[] measureIdx; - - public TableFieldStat(SortParameters sortParameters) { - int noDictDimCnt = sortParameters.getNoDictionaryCount(); - int complexDimCnt = sortParameters.getComplexDimColCount(); - int dictDimCnt = sortParameters.getDimColCount() - noDictDimCnt; - this.isSortColNoDictFlags = sortParameters.getNoDictionarySortColumn(); - int sortColCnt = isSortColNoDictFlags.length; - for (boolean flag : isSortColNoDictFlags) { - if (flag) { - noDictSortDimCnt++; - } else { - dictSortDimCnt++; - } - } - this.measureCnt = sortParameters.getMeasureColCount(); - this.measureDataType = sortParameters.getMeasureDataType(); - - // be careful that the default value is 0 - this.dictSortDimIdx = new int[dictSortDimCnt]; - this.dictNoSortDimIdx = new int[dictDimCnt - dictSortDimCnt]; - this.noDictSortDimIdx = new int[noDictSortDimCnt]; - this.noDictNoSortDimIdx = new int[noDictDimCnt + complexDimCnt - noDictSortDimCnt]; - this.measureIdx = new int[measureCnt]; - - int tmpNoDictSortCnt = 0; - int tmpNoDictNoSortCnt = 0; - int tmpDictSortCnt = 0; - int tmpDictNoSortCnt = 0; - boolean[] isDimNoDictFlags = sortParameters.getNoDictionaryDimnesionColumn(); - - for (int i = 0; i < isDimNoDictFlags.length; i++) { - if (isDimNoDictFlags[i]) { - if (i < sortColCnt && isSortColNoDictFlags[i]) { - noDictSortDimIdx[tmpNoDictSortCnt++] = i; - } else { - noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = i; - } - } else { - if (i < sortColCnt && !isSortColNoDictFlags[i]) { - dictSortDimIdx[tmpDictSortCnt++] = i; - } else { - dictNoSortDimIdx[tmpDictNoSortCnt++] = i; - } - } - } - dictNoSortDimCnt = tmpDictNoSortCnt; - - int base = isDimNoDictFlags.length; - // adding complex dimension columns - for (int i = 0; i < complexDimCnt; i++) { - noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = base + i; - } - noDictNoSortDimCnt = tmpNoDictNoSortCnt; - - base += complexDimCnt; - // indices for measure columns - for (int i = 0; i < measureCnt; i++) { - measureIdx[i] = base + i; - } - } - - public int getDictSortDimCnt() { - return dictSortDimCnt; - } - - public int getDictNoSortDimCnt() { - return dictNoSortDimCnt; - } - - public int getNoDictSortDimCnt() { - return noDictSortDimCnt; - } - - public int getNoDictNoSortDimCnt() { - return noDictNoSortDimCnt; - } - - public boolean[] getIsSortColNoDictFlags() { - return isSortColNoDictFlags; - } - - public int getMeasureCnt() { - return measureCnt; - } - - public DataType[] getMeasureDataType() { - return measureDataType; - } - - public int[] getDictSortDimIdx() { - return dictSortDimIdx; - } - - public int[] getDictNoSortDimIdx() { - return dictNoSortDimIdx; - } - - public int[] getNoDictSortDimIdx() { - return noDictSortDimIdx; - } - - public int[] getNoDictNoSortDimIdx() { - return noDictNoSortDimIdx; - } - - public int[] getMeasureIdx() { - return measureIdx; - } - - @Override public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof TableFieldStat)) return false; - TableFieldStat that = (TableFieldStat) o; - return dictSortDimCnt == that.dictSortDimCnt - && dictNoSortDimCnt == that.dictNoSortDimCnt - && noDictSortDimCnt == that.noDictSortDimCnt - && noDictNoSortDimCnt == that.noDictNoSortDimCnt - && measureCnt == that.measureCnt; - } - - @Override public int hashCode() { - return Objects.hash(dictSortDimCnt, dictNoSortDimCnt, noDictSortDimCnt, - noDictNoSortDimCnt, measureCnt); - } -} \ No newline at end of file