http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9d46b18/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java index 11b3d43..527452a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java @@ -31,15 +31,14 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.DataTypeUtil; -import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage; +import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; +import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; -import org.apache.carbondata.processing.sort.sortdata.NewRowComparator; +import org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator; import org.apache.carbondata.processing.sort.sortdata.SortParameters; +import org.apache.carbondata.processing.sort.sortdata.TableFieldStat; public class 
UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { @@ -63,21 +62,15 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { * entry count */ private int entryCount; - /** * return row */ - private Object[] returnRow; - private int dimCnt; - private int complexCnt; - private int measureCnt; - private boolean[] isNoDictionaryDimensionColumn; - private DataType[] measureDataTypes; + private IntermediateSortTempRow returnRow; private int readBufferSize; private String compressorName; - private Object[][] currentBuffer; + private IntermediateSortTempRow[] currentBuffer; - private Object[][] backupBuffer; + private IntermediateSortTempRow[] backupBuffer; private boolean isBackupFilled; @@ -100,27 +93,21 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { private int numberOfObjectRead; - private int nullSetWordsLength; - - private Comparator<Object[]> comparator; - + private TableFieldStat tableFieldStat; + private SortStepRowHandler sortStepRowHandler; + private Comparator<IntermediateSortTempRow> comparator; /** * Constructor to initialize */ public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters parameters) { // set temp file this.tempFile = tempFile; - this.dimCnt = parameters.getDimColCount(); - this.complexCnt = parameters.getComplexDimColCount(); - this.measureCnt = parameters.getMeasureColCount(); - this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn(); - this.measureDataTypes = parameters.getMeasureDataType(); this.readBufferSize = parameters.getBufferSize(); this.compressorName = parameters.getSortTempCompressorName(); - + this.tableFieldStat = new TableFieldStat(parameters); + this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat); this.executorService = Executors.newFixedThreadPool(1); - this.nullSetWordsLength = ((parameters.getMeasureColCount() - 1) >> 6) + 1; - comparator = new NewRowComparator(parameters.getNoDictionarySortColumn()); + comparator 
= new IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn()); initialize(); } @@ -169,11 +156,17 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { * * @throws CarbonSortKeyAndGroupByException problem while reading */ + @Override public void readRow() throws CarbonSortKeyAndGroupByException { if (prefetch) { fillDataForPrefetch(); } else { - this.returnRow = getRowFromStream(); + try { + this.returnRow = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream); + this.numberOfObjectRead++; + } catch (IOException e) { + throw new CarbonSortKeyAndGroupByException("Problems while reading row", e); + } } } @@ -207,63 +200,22 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { } /** - * @return - * @throws CarbonSortKeyAndGroupByException + * get a batch of row, this interface is used in reading compressed sort temp files + * + * @param expected expected number in a batch + * @return a batch of row + * @throws IOException if error occurs while reading from stream */ - private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException { - Object[] row = new Object[dimCnt + measureCnt]; - try { - int dimCount = 0; - for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) { - if (isNoDictionaryDimensionColumn[dimCount]) { - short aShort = stream.readShort(); - byte[] col = new byte[aShort]; - stream.readFully(col); - row[dimCount] = col; - } else { - int anInt = stream.readInt(); - row[dimCount] = anInt; - } - } - - // write complex dimensions here. 
- for (; dimCount < dimCnt; dimCount++) { - short aShort = stream.readShort(); - byte[] col = new byte[aShort]; - stream.readFully(col); - row[dimCount] = col; - } - - long[] words = new long[nullSetWordsLength]; - for (int i = 0; i < words.length; i++) { - words[i] = stream.readLong(); - } - - for (int mesCount = 0; mesCount < measureCnt; mesCount++) { - if (UnsafeCarbonRowPage.isSet(words, mesCount)) { - DataType dataType = measureDataTypes[mesCount]; - if (dataType == DataTypes.SHORT) { - row[dimCount + mesCount] = stream.readShort(); - } else if (dataType == DataTypes.INT) { - row[dimCount + mesCount] = stream.readInt(); - } else if (dataType == DataTypes.LONG) { - row[dimCount + mesCount] = stream.readLong(); - } else if (dataType == DataTypes.DOUBLE) { - row[dimCount + mesCount] = stream.readDouble(); - } else if (DataTypes.isDecimal(dataType)) { - short aShort = stream.readShort(); - byte[] bigDecimalInBytes = new byte[aShort]; - stream.readFully(bigDecimalInBytes); - row[dimCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes); - } else { - throw new IllegalArgumentException("unsupported data type:" + dataType); - } - } - } - return row; - } catch (IOException e) { - throw new CarbonSortKeyAndGroupByException(e); + private IntermediateSortTempRow[] readBatchedRowFromStream(int expected) + throws IOException { + IntermediateSortTempRow[] holders = new IntermediateSortTempRow[expected]; + for (int i = 0; i < expected; i++) { + IntermediateSortTempRow holder + = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream); + holders[i] = holder; } + this.numberOfObjectRead += expected; + return holders; } /** @@ -271,7 +223,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { * * @return row */ - public Object[] getRow() { + public IntermediateSortTempRow getRow() { return this.returnRow; } @@ -326,9 +278,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { @Override public int 
hashCode() { int hash = 0; - hash += 31 * measureCnt; - hash += 31 * dimCnt; - hash += 31 * complexCnt; + hash += tableFieldStat.hashCode(); hash += tempFile.hashCode(); return hash; } @@ -368,16 +318,12 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { /** * This method will read the records from sort temp file and keep it in a buffer * - * @param numberOfRecords - * @return - * @throws CarbonSortKeyAndGroupByException + * @param numberOfRecords number of records to be read + * @return batch of intermediate sort temp row + * @throws IOException if error occurs reading records from file */ - private Object[][] prefetchRecordsFromFile(int numberOfRecords) - throws CarbonSortKeyAndGroupByException { - Object[][] records = new Object[numberOfRecords][]; - for (int i = 0; i < numberOfRecords; i++) { - records[i] = getRowFromStream(); - } - return records; + private IntermediateSortTempRow[] prefetchRecordsFromFile(int numberOfRecords) + throws IOException { + return readBatchedRowFromStream(numberOfRecords); } }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9d46b18/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java index 4bbf61b..22673ff 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java @@ -21,25 +21,21 @@ import java.io.DataOutputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; import java.util.AbstractQueue; -import java.util.Arrays; import java.util.PriorityQueue; import java.util.concurrent.Callable; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.DataTypeUtil; -import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage; +import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; +import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; import org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunkHolder; import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeSortTempFileChunkHolder; import 
org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; import org.apache.carbondata.processing.sort.sortdata.SortParameters; +import org.apache.carbondata.processing.sort.sortdata.TableFieldStat; public class UnsafeIntermediateFileMerger implements Callable<Void> { /** @@ -69,22 +65,13 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> { private int totalNumberOfRecords; private SortParameters mergerParameters; - + private TableFieldStat tableFieldStat; private File[] intermediateFiles; - private File outPutFile; - private int dimCnt; - private int complexCnt; - private int measureCnt; - private boolean[] isNoDictionaryDimensionColumn; - private DataType[] measureDataTypes; private int writeBufferSize; private String compressorName; - - private long[] nullSetWords; - - private ByteBuffer rowData; + private SortStepRowHandler sortStepRowHandler; private Throwable throwable; @@ -97,16 +84,10 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> { this.fileCounter = intermediateFiles.length; this.intermediateFiles = intermediateFiles; this.outPutFile = outPutFile; - this.dimCnt = mergerParameters.getDimColCount(); - this.complexCnt = mergerParameters.getComplexDimColCount(); - this.measureCnt = mergerParameters.getMeasureColCount(); - this.isNoDictionaryDimensionColumn = mergerParameters.getNoDictionaryDimnesionColumn(); - this.measureDataTypes = mergerParameters.getMeasureDataType(); this.writeBufferSize = mergerParameters.getBufferSize(); this.compressorName = mergerParameters.getSortTempCompressorName(); - this.nullSetWords = new long[((measureCnt - 1) >> 6) + 1]; - // Take size of 2 MB for each row. 
I think it is high enough to use - rowData = ByteBuffer.allocate(2 * 1024 * 1024); + this.tableFieldStat = new TableFieldStat(mergerParameters); + this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat); } @Override public Void call() throws Exception { @@ -165,13 +146,14 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> { } /** - * This method will be used to get the sorted record from file + * This method will be used to get sorted sort temp row from the sort temp files * * @return sorted record sorted record * @throws CarbonSortKeyAndGroupByException */ - private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException { - Object[] row = null; + private IntermediateSortTempRow getSortedRecordFromFile() + throws CarbonSortKeyAndGroupByException { + IntermediateSortTempRow row = null; // poll the top object from heap // heap maintains binary tree which is based on heap condition that will @@ -235,7 +217,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> { this.recordHolderHeap.add(sortTempFileChunkHolder); } - LOGGER.info("Heap Size" + this.recordHolderHeap.size()); + LOGGER.info("Heap Size: " + this.recordHolderHeap.size()); } /** @@ -250,12 +232,12 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> { } /** - * This method will be used to get the sorted row + * This method will be used to get the sorted sort temp row * * @return sorted row * @throws CarbonSortKeyAndGroupByException */ - private Object[] next() throws CarbonSortKeyAndGroupByException { + private IntermediateSortTempRow next() throws CarbonSortKeyAndGroupByException { return getSortedRecordFromFile(); } @@ -272,82 +254,16 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> { /** * Below method will be used to write data to file * - * @throws CarbonSortKeyAndGroupByException problem while writing + * @throws IOException problem while writing */ - private void writeDataToFile(Object[] row) 
throws CarbonSortKeyAndGroupByException, IOException { - int dimCount = 0; - int size = 0; - for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) { - if (isNoDictionaryDimensionColumn[dimCount]) { - byte[] col = (byte[]) row[dimCount]; - rowData.putShort((short) col.length); - size += 2; - rowData.put(col); - size += col.length; - } else { - rowData.putInt((int) row[dimCount]); - size += 4; - } - } - - // write complex dimensions here. - int dimensionSize = dimCnt + complexCnt; - for (; dimCount < dimensionSize; dimCount++) { - byte[] col = (byte[]) row[dimCount]; - rowData.putShort((short)col.length); - size += 2; - rowData.put(col); - size += col.length; - } - Arrays.fill(nullSetWords, 0); - int nullSetSize = nullSetWords.length * 8; - int nullLoc = size; - size += nullSetSize; - for (int mesCount = 0; mesCount < measureCnt; mesCount++) { - Object value = row[mesCount + dimensionSize]; - if (null != value) { - DataType dataType = measureDataTypes[mesCount]; - if (dataType == DataTypes.SHORT) { - rowData.putShort(size, (Short) value); - size += 2; - } else if (dataType == DataTypes.INT) { - rowData.putInt(size, (Integer) value); - size += 4; - } else if (dataType == DataTypes.LONG) { - rowData.putLong(size, (Long) value); - size += 8; - } else if (dataType == DataTypes.DOUBLE) { - rowData.putDouble(size, (Double) value); - size += 8; - } else if (DataTypes.isDecimal(dataType)) { - byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(((BigDecimal) value)); - rowData.putShort(size, (short) bigDecimalInBytes.length); - size += 2; - for (int i = 0; i < bigDecimalInBytes.length; i++) { - rowData.put(size++, bigDecimalInBytes[i]); - } - } - UnsafeCarbonRowPage.set(nullSetWords, mesCount); - } else { - UnsafeCarbonRowPage.unset(nullSetWords, mesCount); - } - } - for (int i = 0; i < nullSetWords.length; i++) { - rowData.putLong(nullLoc, nullSetWords[i]); - nullLoc += 8; - } - byte[] rowBytes = new byte[size]; - rowData.position(0); - 
rowData.get(rowBytes); - stream.write(rowBytes); - rowData.clear(); + private void writeDataToFile(IntermediateSortTempRow row) throws IOException { + sortStepRowHandler.writeIntermediateSortTempRowToOutputStream(row, stream); } private void finish() throws CarbonSortKeyAndGroupByException { clear(); try { CarbonUtil.deleteFiles(intermediateFiles); - rowData.clear(); } catch (IOException e) { throw new CarbonSortKeyAndGroupByException("Problem while deleting the intermediate files"); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9d46b18/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java index ce118d9..64f3c25 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java @@ -29,7 +29,8 @@ import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; -import org.apache.carbondata.processing.loading.sort.SortStepRowUtil; +import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; +import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage; import org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunkHolder; import 
org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeFinalMergePageHolder; @@ -55,7 +56,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec private AbstractQueue<SortTempChunkHolder> recordHolderHeapLocal; private SortParameters parameters; - private SortStepRowUtil sortStepRowUtil; + private SortStepRowHandler sortStepRowHandler; /** * tempFileLocation */ @@ -68,7 +69,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters, String[] tempFileLocation) { this.parameters = parameters; - this.sortStepRowUtil = new SortStepRowUtil(parameters); + this.sortStepRowHandler = new SortStepRowHandler(parameters); this.tempFileLocation = tempFileLocation; this.tableName = parameters.getTableName(); } @@ -108,9 +109,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec LOGGER.info("Started adding first record from each page"); for (final UnsafeCarbonRowPage rowPage : rowPages) { - SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage, - parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters - .getMeasureColCount(), parameters.getNumberOfSortColumns()); + SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage); // initialize sortTempFileChunkHolder.readRow(); @@ -121,9 +120,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec for (final UnsafeInMemoryIntermediateDataMerger merger : merges) { SortTempChunkHolder sortTempFileChunkHolder = - new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionarySortColumn(), - parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters - .getMeasureColCount()); + new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionarySortColumn()); // initialize sortTempFileChunkHolder.readRow(); @@ -142,7 +139,7 @@ public class 
UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec recordHolderHeapLocal.add(sortTempFileChunkHolder); } - LOGGER.info("Heap Size" + this.recordHolderHeapLocal.size()); + LOGGER.info("Heap Size: " + this.recordHolderHeapLocal.size()); } catch (Exception e) { LOGGER.error(e); throw new CarbonDataWriterException(e); @@ -180,12 +177,14 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec } /** - * This method will be used to get the sorted row + * This method will be used to get the sorted row in 3-parted format. + * The row will feed the following writer process step. * * @return sorted row */ public Object[] next() { - return sortStepRowUtil.convertRow(getSortedRecordFromFile()); + IntermediateSortTempRow sortTempRow = getSortedRecordFromFile(); + return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow); } /** @@ -193,8 +192,8 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec * * @return sorted record sorted record */ - private Object[] getSortedRecordFromFile() throws CarbonDataWriterException { - Object[] row = null; + private IntermediateSortTempRow getSortedRecordFromFile() throws CarbonDataWriterException { + IntermediateSortTempRow row = null; // poll the top object from heap // heap maintains binary tree which is based on heap condition that will http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9d46b18/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index 751903a..ea11e22 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -389,7 +389,6 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { noDictionarySortColumnMapping, 0, noDictionarySortColumnMapping.length); } sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping); - String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation, CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); finalMerger = http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9d46b18/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java index 04efa1f..c06819c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java @@ -21,7 +21,6 @@ import java.io.DataOutputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.math.BigDecimal; import java.util.AbstractQueue; import java.util.PriorityQueue; import java.util.concurrent.Callable; @@ -29,11 +28,9 @@ import java.util.concurrent.Callable; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.DataTypeUtil; -import 
org.apache.carbondata.core.util.NonDictionaryUtil; +import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; +import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; public class IntermediateFileMerger implements Callable<Void> { @@ -68,17 +65,12 @@ public class IntermediateFileMerger implements Callable<Void> { private File[] intermediateFiles; private File outPutFile; - private int dimCnt; - private int noDictDimCnt; - private int complexCnt; - private int measureCnt; - private boolean[] isNoDictionaryDimensionColumn; - private DataType[] measureDataTypes; private int writeBufferSize; private String compressorName; private Throwable throwable; - + private TableFieldStat tableFieldStat; + private SortStepRowHandler sortStepRowHandler; /** * IntermediateFileMerger Constructor */ @@ -88,14 +80,10 @@ public class IntermediateFileMerger implements Callable<Void> { this.fileCounter = intermediateFiles.length; this.intermediateFiles = intermediateFiles; this.outPutFile = outPutFile; - this.dimCnt = mergerParameters.getDimColCount(); - this.noDictDimCnt = mergerParameters.getNoDictionaryCount(); - this.complexCnt = mergerParameters.getComplexDimColCount(); - this.measureCnt = mergerParameters.getMeasureColCount(); - this.isNoDictionaryDimensionColumn = mergerParameters.getNoDictionaryDimnesionColumn(); - this.measureDataTypes = mergerParameters.getMeasureDataType(); this.writeBufferSize = mergerParameters.getBufferSize(); this.compressorName = mergerParameters.getSortTempCompressorName(); + this.tableFieldStat = new TableFieldStat(mergerParameters); + this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat); } @Override public Void call() throws Exception { @@ -154,13 +142,14 @@ public class IntermediateFileMerger implements Callable<Void> { } /** - * This method will be used to get the sorted record from file + * This method will be used 
to get the sorted sort temp row from sort temp file * * @return sorted record sorted record * @throws CarbonSortKeyAndGroupByException */ - private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException { - Object[] row = null; + private IntermediateSortTempRow getSortedRecordFromFile() + throws CarbonSortKeyAndGroupByException { + IntermediateSortTempRow row = null; // poll the top object from heap // heap maintains binary tree which is based on heap condition that will @@ -227,7 +216,7 @@ public class IntermediateFileMerger implements Callable<Void> { this.recordHolderHeap.add(sortTempFileChunkHolder); } - LOGGER.info("Heap Size" + this.recordHolderHeap.size()); + LOGGER.info("Heap Size: " + this.recordHolderHeap.size()); } /** @@ -242,12 +231,12 @@ public class IntermediateFileMerger implements Callable<Void> { } /** - * This method will be used to get the sorted row + * This method will be used to get the sorted sort temp row * * @return sorted row * @throws CarbonSortKeyAndGroupByException */ - private Object[] next() throws CarbonSortKeyAndGroupByException { + private IntermediateSortTempRow next() throws CarbonSortKeyAndGroupByException { return getSortedRecordFromFile(); } @@ -264,62 +253,10 @@ public class IntermediateFileMerger implements Callable<Void> { /** * Below method will be used to write data to file * - * @throws CarbonSortKeyAndGroupByException problem while writing + * @throws IOException problem while writing */ - private void writeDataToFile(Object[] row) throws CarbonSortKeyAndGroupByException { - try { - int[] mdkArray = (int[]) row[0]; - byte[][] nonDictArray = (byte[][]) row[1]; - int mdkIndex = 0; - int nonDictKeyIndex = 0; - // write dictionary and non dictionary dimensions here. 
- for (boolean nodictinary : isNoDictionaryDimensionColumn) { - if (nodictinary) { - byte[] col = nonDictArray[nonDictKeyIndex++]; - stream.writeShort(col.length); - stream.write(col); - } else { - stream.writeInt(mdkArray[mdkIndex++]); - } - } - // write complex - for (; nonDictKeyIndex < noDictDimCnt + complexCnt; nonDictKeyIndex++) { - byte[] col = nonDictArray[nonDictKeyIndex++]; - stream.writeShort(col.length); - stream.write(col); - } - // write measure - int fieldIndex = 0; - for (int counter = 0; counter < measureCnt; counter++) { - if (null != NonDictionaryUtil.getMeasure(fieldIndex, row)) { - stream.write((byte) 1); - DataType dataType = measureDataTypes[counter]; - if (dataType == DataTypes.BOOLEAN) { - stream.writeBoolean((boolean)NonDictionaryUtil.getMeasure(fieldIndex, row)); - } else if (dataType == DataTypes.SHORT) { - stream.writeShort((short) NonDictionaryUtil.getMeasure(fieldIndex, row)); - } else if (dataType == DataTypes.INT) { - stream.writeInt((int) NonDictionaryUtil.getMeasure(fieldIndex, row)); - } else if (dataType == DataTypes.LONG) { - stream.writeLong((long) NonDictionaryUtil.getMeasure(fieldIndex, row)); - } else if (dataType == DataTypes.DOUBLE) { - stream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row)); - } else if (DataTypes.isDecimal(dataType)) { - byte[] bigDecimalInBytes = DataTypeUtil - .bigDecimalToByte((BigDecimal) NonDictionaryUtil.getMeasure(fieldIndex, row)); - stream.writeInt(bigDecimalInBytes.length); - stream.write(bigDecimalInBytes); - } else { - throw new IllegalArgumentException("unsupported data type:" + dataType); - } - } else { - stream.write((byte) 0); - } - fieldIndex++; - } - } catch (IOException e) { - throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e); - } + private void writeDataToFile(IntermediateSortTempRow row) throws IOException { + sortStepRowHandler.writeIntermediateSortTempRowToOutputStream(row, stream); } private void finish() throws 
CarbonSortKeyAndGroupByException { http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9d46b18/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java new file mode 100644 index 0000000..9b6d1e8 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.processing.sort.sortdata; + +import java.util.Comparator; + +import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; +import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; + +/** + * This class is used as comparator for comparing intermediate sort temp row + */ +public class IntermediateSortTempRowComparator implements Comparator<IntermediateSortTempRow> { + /** + * isSortColumnNoDictionary whether the sort column is not dictionary or not + */ + private boolean[] isSortColumnNoDictionary; + + /** + * @param isSortColumnNoDictionary isSortColumnNoDictionary + */ + public IntermediateSortTempRowComparator(boolean[] isSortColumnNoDictionary) { + this.isSortColumnNoDictionary = isSortColumnNoDictionary; + } + + /** + * Below method will be used to compare two sort temp row + */ + public int compare(IntermediateSortTempRow rowA, IntermediateSortTempRow rowB) { + int diff = 0; + int dictIndex = 0; + int nonDictIndex = 0; + + for (boolean isNoDictionary : isSortColumnNoDictionary) { + + if (isNoDictionary) { + byte[] byteArr1 = rowA.getNoDictSortDims()[nonDictIndex]; + byte[] byteArr2 = rowB.getNoDictSortDims()[nonDictIndex]; + nonDictIndex++; + + int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2); + if (difference != 0) { + return difference; + } + } else { + int dimFieldA = rowA.getDictSortDims()[dictIndex]; + int dimFieldB = rowB.getDictSortDims()[dictIndex]; + dictIndex++; + + diff = dimFieldA - dimFieldB; + if (diff != 0) { + return diff; + } + } + } + return diff; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9d46b18/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java index d2579d2..3f94533 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java @@ -40,14 +40,11 @@ public class NewRowComparator implements Comparator<Object[]> { */ public int compare(Object[] rowA, Object[] rowB) { int diff = 0; - int index = 0; for (boolean isNoDictionary : noDictionarySortColumnMaping) { - if (isNoDictionary) { byte[] byteArr1 = (byte[]) rowA[index]; - byte[] byteArr2 = (byte[]) rowB[index]; int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2); @@ -57,6 +54,7 @@ public class NewRowComparator implements Comparator<Object[]> { } else { int dimFieldA = (int) rowA[index]; int dimFieldB = (int) rowB[index]; + diff = dimFieldA - dimFieldB; if (diff != 0) { return diff; @@ -65,7 +63,6 @@ public class NewRowComparator implements Comparator<Object[]> { index++; } - return diff; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9d46b18/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java index e01b587..7538c92 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java @@ -29,7 +29,7 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> { private int numberOfSortColumns; /** - * RowComparatorForNormalDims Constructor + * 
NewRowComparatorForNormalDims Constructor * * @param numberOfSortColumns */ @@ -46,7 +46,6 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> { int diff = 0; for (int i = 0; i < numberOfSortColumns; i++) { - int dimFieldA = (int)rowA[i]; int dimFieldB = (int)rowB[i]; diff = dimFieldA - dimFieldB; http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9d46b18/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java deleted file mode 100644 index 0ae0b93..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.processing.sort.sortdata; - -import java.nio.ByteBuffer; -import java.util.Comparator; - -import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; -import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; -import org.apache.carbondata.core.util.NonDictionaryUtil; - -public class RowComparator implements Comparator<Object[]> { - /** - * noDictionaryCount represent number of no dictionary cols - */ - private int noDictionaryCount; - - /** - * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions. - */ - private boolean[] noDictionarySortColumnMaping; - - /** - * @param noDictionarySortColumnMaping - * @param noDictionaryCount - */ - public RowComparator(boolean[] noDictionarySortColumnMaping, int noDictionaryCount) { - this.noDictionaryCount = noDictionaryCount; - this.noDictionarySortColumnMaping = noDictionarySortColumnMaping; - } - - /** - * Below method will be used to compare two mdkey - */ - public int compare(Object[] rowA, Object[] rowB) { - int diff = 0; - - int normalIndex = 0; - int noDictionaryindex = 0; - - for (boolean isNoDictionary : noDictionarySortColumnMaping) { - - if (isNoDictionary) { - byte[] byteArr1 = (byte[]) rowA[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX]; - - ByteBuffer buff1 = ByteBuffer.wrap(byteArr1); - - // extract a high card dims from complete byte[]. - NonDictionaryUtil - .extractSingleHighCardDims(byteArr1, noDictionaryindex, noDictionaryCount, buff1); - - byte[] byteArr2 = (byte[]) rowB[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX]; - - ByteBuffer buff2 = ByteBuffer.wrap(byteArr2); - - // extract a high card dims from complete byte[]. 
- NonDictionaryUtil - .extractSingleHighCardDims(byteArr2, noDictionaryindex, noDictionaryCount, buff2); - - int difference = UnsafeComparer.INSTANCE.compareTo(buff1, buff2); - if (difference != 0) { - return difference; - } - noDictionaryindex++; - } else { - int dimFieldA = NonDictionaryUtil.getDimension(normalIndex, rowA); - int dimFieldB = NonDictionaryUtil.getDimension(normalIndex, rowB); - diff = dimFieldA - dimFieldB; - if (diff != 0) { - return diff; - } - normalIndex++; - } - - } - - return diff; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9d46b18/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java deleted file mode 100644 index 0883ae1..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.carbondata.processing.sort.sortdata; - -import java.util.Comparator; - -import org.apache.carbondata.core.util.NonDictionaryUtil; - -/** - * This class is used as comparator for comparing dims which are non high cardinality dims. - * Here the dims will be in form of int[] (surrogates) so directly comparing the integers. - */ -public class RowComparatorForNormalDims implements Comparator<Object[]> { - /** - * dimension count - */ - private int numberOfSortColumns; - - /** - * RowComparatorForNormalDims Constructor - * - * @param numberOfSortColumns - */ - public RowComparatorForNormalDims(int numberOfSortColumns) { - this.numberOfSortColumns = numberOfSortColumns; - } - - /** - * Below method will be used to compare two surrogate keys - * - * @see Comparator#compare(Object, Object) - */ - public int compare(Object[] rowA, Object[] rowB) { - int diff = 0; - - for (int i = 0; i < numberOfSortColumns; i++) { - - int dimFieldA = NonDictionaryUtil.getDimension(i, rowA); - int dimFieldB = NonDictionaryUtil.getDimension(i, rowB); - - diff = dimFieldA - dimFieldB; - if (diff != 0) { - return diff; - } - } - return diff; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9d46b18/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java index 88695b9..a4ac0ea 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java @@ -37,6 +37,8 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import 
org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; +import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; @@ -71,12 +73,12 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { * tableName */ private String tableName; - + private SortParameters sortParameters; + private SortStepRowHandler sortStepRowHandler; /** * tempFileLocation */ private String[] tempFileLocation; - private SortParameters sortParameters; private int maxThreadForSorting; @@ -89,6 +91,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { this.tempFileLocation = tempFileLocation; this.tableName = tableName; this.sortParameters = sortParameters; + this.sortStepRowHandler = new SortStepRowHandler(sortParameters); try { maxThreadForSorting = Integer.parseInt(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD, @@ -107,8 +110,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { */ public void startFinalMerge() throws CarbonDataWriterException { List<File> filesToMerge = getFilesToMergeSort(); - if (filesToMerge.size() == 0) - { + if (filesToMerge.size() == 0) { LOGGER.info("No file to merge in final merge stage"); return; } @@ -125,11 +127,9 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { // get all the merged files List<File> files = new ArrayList<File>(tempFileLocation.length); - for (String tempLoc : tempFileLocation) - { + for (String tempLoc : tempFileLocation) { File[] subFiles = new File(tempLoc).listFiles(fileFilter); - if 
(null != subFiles && subFiles.length > 0) - { + if (null != subFiles && subFiles.length > 0) { files.addAll(Arrays.asList(subFiles)); } } @@ -226,13 +226,14 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { } /** - * This method will be used to get the sorted row + * This method will be used to get the sorted sort temp row from the sort temp files * * @return sorted row * @throws CarbonSortKeyAndGroupByException */ public Object[] next() { - return getSortedRecordFromFile(); + IntermediateSortTempRow sortTempRow = getSortedRecordFromFile(); + return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow); } /** @@ -241,8 +242,8 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { * @return sorted record sorted record * @throws CarbonSortKeyAndGroupByException */ - private Object[] getSortedRecordFromFile() throws CarbonDataWriterException { - Object[] row = null; + private IntermediateSortTempRow getSortedRecordFromFile() throws CarbonDataWriterException { + IntermediateSortTempRow row = null; // poll the top object from heap // heap maintains binary tree which is based on heap condition that will http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9d46b18/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java index 57a19bd..c7efbd9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java @@ -20,7 +20,7 @@ package org.apache.carbondata.processing.sort.sortdata; import java.io.DataOutputStream; import java.io.File; import java.io.IOException; 
-import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Random; import java.util.concurrent.ExecutorService; @@ -32,12 +32,10 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonThreadFactory; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; @@ -69,7 +67,8 @@ public class SortDataRows { private Semaphore semaphore; private SortParameters parameters; - + private SortStepRowHandler sortStepRowHandler; + private ThreadLocal<ByteBuffer> rowBuffer; private int sortBufferSize; private SortIntermediateFileMerger intermediateFileMerger; @@ -79,7 +78,7 @@ public class SortDataRows { public SortDataRows(SortParameters parameters, SortIntermediateFileMerger intermediateFileMerger) { this.parameters = parameters; - + this.sortStepRowHandler = new SortStepRowHandler(parameters); this.intermediateFileMerger = intermediateFileMerger; int batchSize = CarbonProperties.getInstance().getBatchSize(); @@ -87,6 +86,12 @@ public class SortDataRows { this.sortBufferSize = Math.max(parameters.getSortBufferSize(), batchSize); // observer of writing file in thread this.threadStatusObserver = new ThreadStatusObserver(); + this.rowBuffer = new ThreadLocal<ByteBuffer>() { + @Override protected ByteBuffer initialValue() { + byte[] backedArray = new 
byte[2 * 1024 * 1024]; + return ByteBuffer.wrap(backedArray); + } + }; } /** @@ -130,8 +135,7 @@ public class SortDataRows { semaphore.acquire(); dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); } catch (InterruptedException e) { - LOGGER.error(e, - "exception occurred while trying to acquire a semaphore lock: "); + LOGGER.error(e, "exception occurred while trying to acquire a semaphore lock: "); throw new CarbonSortKeyAndGroupByException(e); } // create the new holder Array @@ -158,7 +162,7 @@ public class SortDataRows { } intermediateFileMerger.startMergingIfPossible(); Object[][] recordHolderListLocal = recordHolderList; - sizeLeft = sortBufferSize - entryCount ; + sizeLeft = sortBufferSize - entryCount; if (sizeLeft > 0) { System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft); } @@ -212,7 +216,6 @@ public class SortDataRows { locationChosen + File.separator + parameters.getTableName() + System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT); writeDataToFile(recordHolderList, this.entryCount, file); - } startFileBasedMerge(); @@ -220,7 +223,7 @@ public class SortDataRows { } /** - * Below method will be used to write data to file + * Below method will be used to write data to sort temp file * * @throws CarbonSortKeyAndGroupByException problem while writing */ @@ -233,60 +236,9 @@ public class SortDataRows { parameters.getFileWriteBufferSize(), parameters.getSortTempCompressorName()); // write number of entries to the file stream.writeInt(entryCountLocal); - int complexDimColCount = parameters.getComplexDimColCount(); - int dimColCount = parameters.getDimColCount() + complexDimColCount; - DataType[] type = parameters.getMeasureDataType(); - boolean[] noDictionaryDimnesionMapping = parameters.getNoDictionaryDimnesionColumn(); - Object[] row = null; for (int i = 0; i < entryCountLocal; i++) { - // get row from record holder list - row = recordHolderList[i]; - int dimCount = 0; - // write 
dictionary and non dictionary dimensions here. - for (; dimCount < noDictionaryDimnesionMapping.length; dimCount++) { - if (noDictionaryDimnesionMapping[dimCount]) { - byte[] col = (byte[]) row[dimCount]; - stream.writeShort(col.length); - stream.write(col); - } else { - stream.writeInt((int)row[dimCount]); - } - } - // write complex dimensions here. - for (; dimCount < dimColCount; dimCount++) { - byte[] value = (byte[])row[dimCount]; - stream.writeShort(value.length); - stream.write(value); - } - // as measures are stored in separate array. - for (int mesCount = 0; - mesCount < parameters.getMeasureColCount(); mesCount++) { - Object value = row[mesCount + dimColCount]; - if (null != value) { - stream.write((byte) 1); - DataType dataType = type[mesCount]; - if (dataType == DataTypes.BOOLEAN) { - stream.writeBoolean((boolean) value); - } else if (dataType == DataTypes.SHORT) { - stream.writeShort((Short) value); - } else if (dataType == DataTypes.INT) { - stream.writeInt((Integer) value); - } else if (dataType == DataTypes.LONG) { - stream.writeLong((Long) value); - } else if (dataType == DataTypes.DOUBLE) { - stream.writeDouble((Double) value); - } else if (DataTypes.isDecimal(dataType)) { - BigDecimal val = (BigDecimal) value; - byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val); - stream.writeInt(bigDecimalInBytes.length); - stream.write(bigDecimalInBytes); - } else { - throw new IllegalArgumentException("unsupported data type:" + type[mesCount]); - } - } else { - stream.write((byte) 0); - } - } + sortStepRowHandler.writeRawRowAsIntermediateSortTempRowToOutputStream( + recordHolderList[i], stream, rowBuffer.get()); } } catch (IOException e) { throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e); @@ -301,7 +253,7 @@ public class SortDataRows { * * @throws CarbonSortKeyAndGroupByException */ - public void deleteSortLocationIfExists() throws CarbonSortKeyAndGroupByException { + private void deleteSortLocationIfExists() throws 
CarbonSortKeyAndGroupByException { CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation()); } @@ -380,7 +332,8 @@ public class SortDataRows { // intermediate merging of sort temp files will be triggered intermediateFileMerger.addFileToMerge(sortTempFile); LOGGER.info("Time taken to sort and write sort temp file " + sortTempFile + " is: " + ( - System.currentTimeMillis() - startTime)); + System.currentTimeMillis() - startTime) + ", sort temp file size in MB is " + + sortTempFile.length() * 0.1 * 10 / 1024 / 1024); } catch (Throwable e) { try { threadStatusObserver.notifyFailed(e); http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9d46b18/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java index d726539..7e221a7 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java @@ -21,6 +21,7 @@ import java.io.DataInputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Comparator; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -30,14 +31,11 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.datatype.DataTypes; 
-import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonThreadFactory; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.DataTypeUtil; -import org.apache.carbondata.core.util.NonDictionaryUtil; +import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; +import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHolder> { @@ -71,20 +69,13 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold /** * return row */ - private Object[] returnRow; - private int dimCnt; - private int noDictDimCnt; - private int complexCnt; - private int measureCnt; - private boolean[] isNoDictionaryDimensionColumn; - private boolean[] isNoDictionarySortColumn; - private DataType[] measureDataTypes; + private IntermediateSortTempRow returnRow; private int readBufferSize; private String compressorName; - private Object[][] currentBuffer; + private IntermediateSortTempRow[] currentBuffer; - private Object[][] backupBuffer; + private IntermediateSortTempRow[] backupBuffer; private boolean isBackupFilled; @@ -104,7 +95,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold * totalRecordFetch */ private int totalRecordFetch; - + private TableFieldStat tableFieldStat; + private SortStepRowHandler sortStepRowHandler; + private Comparator<IntermediateSortTempRow> comparator; /** * Constructor to initialize * @@ -115,16 +108,12 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold public SortTempFileChunkHolder(File tempFile, SortParameters sortParameters, String tableName) { // set temp file this.tempFile = tempFile; - this.dimCnt = sortParameters.getDimColCount(); - 
this.noDictDimCnt = sortParameters.getNoDictionaryCount(); - this.complexCnt = sortParameters.getComplexDimColCount(); - this.measureCnt = sortParameters.getMeasureColCount(); - this.isNoDictionaryDimensionColumn = sortParameters.getNoDictionaryDimnesionColumn(); - this.isNoDictionarySortColumn = sortParameters.getNoDictionarySortColumn(); - this.measureDataTypes = sortParameters.getMeasureDataType(); this.readBufferSize = sortParameters.getBufferSize(); this.compressorName = sortParameters.getSortTempCompressorName(); - + this.tableFieldStat = new TableFieldStat(sortParameters); + this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat); + this.comparator = new IntermediateSortTempRowComparator( + tableFieldStat.getIsSortColNoDictFlags()); this.executorService = Executors .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName)); } @@ -178,7 +167,12 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold if (prefetch) { fillDataForPrefetch(); } else { - this.returnRow = getRowFromStream(); + try { + this.returnRow = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream); + this.numberOfObjectRead++; + } catch (IOException e) { + throw new CarbonSortKeyAndGroupByException("Problem while reading rows", e); + } } } @@ -212,86 +206,28 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold } /** - * Reads row from file + * Read a batch of row from stream + * * @return Object[] - * @throws CarbonSortKeyAndGroupByException + * @throws IOException if error occurs while reading from stream */ - private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException { - // create new row of size 3 (1 for dims , 1 for high card , 1 for measures) - - Object[] holder = new Object[3]; - int index = 0; - int nonDicIndex = 0; - int[] dim = new int[dimCnt - noDictDimCnt]; - byte[][] nonDicArray = new byte[noDictDimCnt + complexCnt][]; - Object[] measures = new 
Object[measureCnt]; - try { - // read dimension values - for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) { - if (isNoDictionaryDimensionColumn[i]) { - short len = stream.readShort(); - byte[] array = new byte[len]; - stream.readFully(array); - nonDicArray[nonDicIndex++] = array; - } else { - dim[index++] = stream.readInt(); - } - } - - for (int i = 0; i < complexCnt; i++) { - short len = stream.readShort(); - byte[] array = new byte[len]; - stream.readFully(array); - nonDicArray[nonDicIndex++] = array; - } - - index = 0; - // read measure values - for (int i = 0; i < measureCnt; i++) { - if (stream.readByte() == 1) { - DataType dataType = measureDataTypes[i]; - if (dataType == DataTypes.BOOLEAN) { - measures[index++] = stream.readBoolean(); - } else if (dataType == DataTypes.SHORT) { - measures[index++] = stream.readShort(); - } else if (dataType == DataTypes.INT) { - measures[index++] = stream.readInt(); - } else if (dataType == DataTypes.LONG) { - measures[index++] = stream.readLong(); - } else if (dataType == DataTypes.DOUBLE) { - measures[index++] = stream.readDouble(); - } else if (DataTypes.isDecimal(dataType)) { - int len = stream.readInt(); - byte[] buff = new byte[len]; - stream.readFully(buff); - measures[index++] = DataTypeUtil.byteToBigDecimal(buff); - } else { - throw new IllegalArgumentException("unsupported data type:" + dataType); - } - } else { - measures[index++] = null; - } - } - - NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures); - - // increment number if record read - this.numberOfObjectRead++; - } catch (IOException e) { - LOGGER.error("Problme while reading the madkey fom sort temp file"); - throw new CarbonSortKeyAndGroupByException("Problem while reading the sort temp file ", e); + private IntermediateSortTempRow[] readBatchedRowFromStream(int expected) throws IOException { + IntermediateSortTempRow[] holders = new IntermediateSortTempRow[expected]; + for (int i = 0; i < expected; i++) { + 
IntermediateSortTempRow holder + = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream); + holders[i] = holder; } - - //return out row - return holder; + this.numberOfObjectRead += expected; + return holders; } /** - * below method will be used to get the row + * below method will be used to get the sort temp row * * @return row */ - public Object[] getRow() { + public IntermediateSortTempRow getRow() { return this.returnRow; } @@ -330,31 +266,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold } @Override public int compareTo(SortTempFileChunkHolder other) { - int diff = 0; - int index = 0; - int noDictionaryIndex = 0; - int[] leftMdkArray = (int[]) returnRow[0]; - int[] rightMdkArray = (int[]) other.returnRow[0]; - byte[][] leftNonDictArray = (byte[][]) returnRow[1]; - byte[][] rightNonDictArray = (byte[][]) other.returnRow[1]; - for (boolean isNoDictionary : isNoDictionarySortColumn) { - if (isNoDictionary) { - diff = UnsafeComparer.INSTANCE - .compareTo(leftNonDictArray[noDictionaryIndex], rightNonDictArray[noDictionaryIndex]); - if (diff != 0) { - return diff; - } - noDictionaryIndex++; - } else { - diff = leftMdkArray[index] - rightMdkArray[index]; - if (diff != 0) { - return diff; - } - index++; - } - - } - return diff; + return comparator.compare(returnRow, other.getRow()); } @Override public boolean equals(Object obj) { @@ -372,9 +284,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold @Override public int hashCode() { int hash = 0; - hash += 31 * measureCnt; - hash += 31 * dimCnt; - hash += 31 * complexCnt; + hash += tableFieldStat.hashCode(); hash += tempFile.hashCode(); return hash; } @@ -414,16 +324,12 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold /** * This method will read the records from sort temp file and keep it in a buffer * - * @param numberOfRecords - * @return - * @throws CarbonSortKeyAndGroupByException + * @param 
numberOfRecords number of records to be read + * @return batch of intermediate sort temp row + * @throws IOException if error occurs while reading reading records */ - private Object[][] prefetchRecordsFromFile(int numberOfRecords) - throws CarbonSortKeyAndGroupByException { - Object[][] records = new Object[numberOfRecords][]; - for (int i = 0; i < numberOfRecords; i++) { - records[i] = getRowFromStream(); - } - return records; + private IntermediateSortTempRow[] prefetchRecordsFromFile(int numberOfRecords) + throws IOException { + return readBatchedRowFromStream(numberOfRecords); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9d46b18/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java new file mode 100644 index 0000000..0d1303a --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.processing.sort.sortdata; + +import java.io.Serializable; +import java.util.Objects; + +import org.apache.carbondata.core.metadata.datatype.DataType; + +/** + * Holds field information for a table during data loading. This information is used to + * convert/construct/destruct rows in the sort step. Complex fields are processed the same way + * as no-dict, no-sort simple dimensions, so they are treated alike and are represented by the + * `no-dict-no-sort-dim` related variables of this class. + */ +public class TableFieldStat implements Serializable { + private static final long serialVersionUID = 201712070950L; + private int dictSortDimCnt = 0; + private int dictNoSortDimCnt = 0; + private int noDictSortDimCnt = 0; + private int noDictNoSortDimCnt = 0; + // per sort column: true if it is a no-dictionary column, false if it is a dictionary column + private boolean[] isSortColNoDictFlags; + private int measureCnt; + private DataType[] measureDataType; + + // indices for dict & sort dimension columns + private int[] dictSortDimIdx; + // indices for dict & no-sort dimension columns + private int[] dictNoSortDimIdx; + // indices for no-dict & sort dimension columns + private int[] noDictSortDimIdx; + // indices for no-dict & no-sort dimension columns, including complex columns + private int[] noDictNoSortDimIdx; + // indices for measure columns + private int[] measureIdx; + + public TableFieldStat(SortParameters sortParameters) { + int noDictDimCnt = sortParameters.getNoDictionaryCount(); + int complexDimCnt = sortParameters.getComplexDimColCount(); + int dictDimCnt = sortParameters.getDimColCount() - noDictDimCnt; + this.isSortColNoDictFlags = sortParameters.getNoDictionarySortColumn(); + int sortColCnt = isSortColNoDictFlags.length; + for (boolean flag : isSortColNoDictFlags) { + if (flag) { + noDictSortDimCnt++; + } else { + dictSortDimCnt++; + } + } + this.measureCnt = sortParameters.getMeasureColCount(); + this.measureDataType =
sortParameters.getMeasureDataType(); + + // note: the int arrays below are zero-filled on allocation, so an unassigned slot reads as 0 + this.dictSortDimIdx = new int[dictSortDimCnt]; + this.dictNoSortDimIdx = new int[dictDimCnt - dictSortDimCnt]; + this.noDictSortDimIdx = new int[noDictSortDimCnt]; + this.noDictNoSortDimIdx = new int[noDictDimCnt + complexDimCnt - noDictSortDimCnt]; + this.measureIdx = new int[measureCnt]; + + int tmpNoDictSortCnt = 0; + int tmpNoDictNoSortCnt = 0; + int tmpDictSortCnt = 0; + int tmpDictNoSortCnt = 0; + boolean[] isDimNoDictFlags = sortParameters.getNoDictionaryDimnesionColumn(); + + for (int i = 0; i < isDimNoDictFlags.length; i++) { + if (isDimNoDictFlags[i]) { + if (i < sortColCnt && isSortColNoDictFlags[i]) { + noDictSortDimIdx[tmpNoDictSortCnt++] = i; + } else { + noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = i; + } + } else { + if (i < sortColCnt && !isSortColNoDictFlags[i]) { + dictSortDimIdx[tmpDictSortCnt++] = i; + } else { + dictNoSortDimIdx[tmpDictNoSortCnt++] = i; + } + } + } + dictNoSortDimCnt = tmpDictNoSortCnt; + + int base = isDimNoDictFlags.length; + // complex dimension columns follow the simple dimensions and count as no-dict & no-sort + for (int i = 0; i < complexDimCnt; i++) { + noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = base + i; + } + noDictNoSortDimCnt = tmpNoDictNoSortCnt; + + base += complexDimCnt; + // measure columns are indexed after all simple and complex dimension columns + for (int i = 0; i < measureCnt; i++) { + measureIdx[i] = base + i; + } + } + + public int getDictSortDimCnt() { + return dictSortDimCnt; + } + + public int getDictNoSortDimCnt() { + return dictNoSortDimCnt; + } + + public int getNoDictSortDimCnt() { + return noDictSortDimCnt; + } + + public int getNoDictNoSortDimCnt() { + return noDictNoSortDimCnt; + } + + public boolean[] getIsSortColNoDictFlags() { + return isSortColNoDictFlags; + } + + public int getMeasureCnt() { + return measureCnt; + } + + public DataType[] getMeasureDataType() { + return measureDataType; + } + + public int[] getDictSortDimIdx() { + return dictSortDimIdx; + } + + public int[] getDictNoSortDimIdx() { + return
dictNoSortDimIdx; + } + + public int[] getNoDictSortDimIdx() { + return noDictSortDimIdx; + } + + public int[] getNoDictNoSortDimIdx() { + return noDictNoSortDimIdx; + } + + public int[] getMeasureIdx() { + return measureIdx; + } + + @Override public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof TableFieldStat)) return false; + TableFieldStat that = (TableFieldStat) o; + return dictSortDimCnt == that.dictSortDimCnt + && dictNoSortDimCnt == that.dictNoSortDimCnt + && noDictSortDimCnt == that.noDictSortDimCnt + && noDictNoSortDimCnt == that.noDictNoSortDimCnt + && measureCnt == that.measureCnt; + } + + @Override public int hashCode() { + return Objects.hash(dictSortDimCnt, dictNoSortDimCnt, noDictSortDimCnt, + noDictNoSortDimCnt, measureCnt); + } +} \ No newline at end of file