http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java new file mode 100644 index 0000000..d512349 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.processing.newflow.sort.unsafe.holder; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage; +import org.apache.carbondata.processing.sortandgroupby.sortdata.NewRowComparator; + +public class UnsafeInmemoryHolder implements SortTempChunkHolder { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(UnsafeInmemoryHolder.class.getName()); + + private int counter; + + private int actualSize; + + private UnsafeCarbonRowPage rowPage; + + private Object[] currentRow; + + private long address; + + private NewRowComparator comparator; + + private int columnSize; + + public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize) { + this.actualSize = rowPage.getBuffer().getActualSize(); + this.rowPage = rowPage; + LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize); + this.comparator = new NewRowComparator(rowPage.getNoDictionaryDimensionMapping()); + this.columnSize = columnSize; + } + + public boolean hasNext() { + if (counter < actualSize) { + return true; + } + return false; + } + + public void readRow() { + currentRow = new Object[columnSize]; + address = rowPage.getBuffer().get(counter); + rowPage.getRow(address + rowPage.getDataBlock().getBaseOffset(), currentRow); + counter++; + } + + public Object[] getRow() { + return currentRow; + } + + @Override public int compareTo(SortTempChunkHolder o) { + return comparator.compare(currentRow, o.getRow()); + } + + public int numberOfRows() { + return actualSize; + } + + public void close() { + rowPage.freeMemory(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java new file mode 100644 index 0000000..9f157a0 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.processing.newflow.sort.unsafe.holder; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage; +import org.apache.carbondata.processing.newflow.sort.unsafe.comparator.UnsafeRowComparator; + +/** + * It is used for merging unsafe inmemory intermediate data + */ +public class UnsafeInmemoryMergeHolder implements Comparable<UnsafeInmemoryMergeHolder> { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(UnsafeInmemoryMergeHolder.class.getName()); + + private int counter; + + private int actualSize; + + private UnsafeCarbonRowPage rowPage; + + private UnsafeCarbonRowForMerge currentRow; + + private long address; + + private UnsafeRowComparator comparator; + + private Object baseObject; + + public UnsafeInmemoryMergeHolder(UnsafeCarbonRowPage rowPage, byte index) { + this.actualSize = rowPage.getBuffer().getActualSize(); + this.rowPage = rowPage; + LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize); + this.comparator = new UnsafeRowComparator(rowPage); + this.baseObject = rowPage.getDataBlock().getBaseObject(); + currentRow = new UnsafeCarbonRowForMerge(); + currentRow.index = index; + } + + public boolean hasNext() { + if (counter < actualSize) { + return true; + } + return false; + } + + public void readRow() { + address = rowPage.getBuffer().get(counter); + currentRow.address = address + rowPage.getDataBlock().getBaseOffset(); + counter++; + } + + public UnsafeCarbonRowForMerge getRow() { + return currentRow; + } + + @Override public int compareTo(UnsafeInmemoryMergeHolder o) { + return comparator.compare(currentRow, baseObject, o.getRow(), o.getBaseObject()); + } + + public int numberOfRows() { + return actualSize; + } + + public Object getBaseObject() { + return baseObject; + } + + public void close() { + rowPage.freeMemory(); + 
} +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java new file mode 100644 index 0000000..30ef9ee --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.carbondata.processing.newflow.sort.unsafe.holder; + +import java.io.BufferedInputStream; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Comparator; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage; +import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; +import org.apache.carbondata.processing.sortandgroupby.sortdata.NewRowComparator; +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters; + +public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { + + /** + * LOGGER + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(UnsafeSortTempFileChunkHolder.class.getName()); + + /** + * temp file + */ + private File tempFile; + + /** + * read stream + */ + private DataInputStream stream; + + /** + * entry count + */ + private int entryCount; + + /** + * return row + */ + private Object[] returnRow; + + /** + * number of measures + */ + private int measureCount; + + /** + * number of dimensionCount + */ + private int dimensionCount; + + /** + * number of complexDimensionCount + */ + private int complexDimensionCount; + + /** + * fileBufferSize for file reader stream size + */ + private int fileBufferSize; + + private Object[][] currentBuffer; + + private Object[][] 
backupBuffer; + + private boolean isBackupFilled; + + private boolean prefetch; + + private int bufferSize; + + private int bufferRowCounter; + + private ExecutorService executorService; + + private Future<Void> submit; + + private int prefetchRecordsProceesed; + + /** + * sortTempFileNoOFRecordsInCompression + */ + private int sortTempFileNoOFRecordsInCompression; + + /** + * isSortTempFileCompressionEnabled + */ + private boolean isSortTempFileCompressionEnabled; + + /** + * totalRecordFetch + */ + private int totalRecordFetch; + + private int noDictionaryCount; + + private char[] aggType; + + private int numberOfObjectRead; + /** + * to store whether dimension is of dictionary type or not + */ + private boolean[] isNoDictionaryDimensionColumn; + + private int nullSetWordsLength; + + private Comparator<Object[]> comparator; + + /** + * Constructor to initialize + */ + public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters parameters) { + // set temp file + this.tempFile = tempFile; + + // set measure and dimension count + this.measureCount = parameters.getMeasureColCount(); + this.dimensionCount = parameters.getDimColCount(); + this.complexDimensionCount = parameters.getComplexDimColCount(); + + this.noDictionaryCount = parameters.getNoDictionaryCount(); + // set mdkey length + this.fileBufferSize = parameters.getFileBufferSize(); + this.executorService = Executors.newFixedThreadPool(1); + this.aggType = parameters.getAggType(); + this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn(); + this.nullSetWordsLength = ((measureCount - 1) >> 6) + 1; + comparator = new NewRowComparator(isNoDictionaryDimensionColumn); + initialize(); + } + + /** + * This method will be used to initialize + * + * @throws CarbonSortKeyAndGroupByException problem while initializing + */ + public void initialize() { + prefetch = Boolean.parseBoolean(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH, + 
CarbonCommonConstants.CARBON_MERGE_SORT_PREFETCH_DEFAULT)); + bufferSize = CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE; + this.isSortTempFileCompressionEnabled = Boolean.parseBoolean(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED, + CarbonCommonConstants.IS_SORT_TEMP_FILE_COMPRESSION_ENABLED_DEFAULTVALUE)); + if (this.isSortTempFileCompressionEnabled) { + LOGGER.info("Compression was used while writing the sortTempFile"); + } + + try { + this.sortTempFileNoOFRecordsInCompression = Integer.parseInt(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION, + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE)); + if (this.sortTempFileNoOFRecordsInCompression < 1) { + LOGGER.error("Invalid value for: " + + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION + + ": Only Positive Integer value(greater than zero) is allowed.Default value will" + + " be used"); + + this.sortTempFileNoOFRecordsInCompression = Integer.parseInt( + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE); + } + } catch (NumberFormatException e) { + LOGGER.error( + "Invalid value for: " + CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION + + ", only Positive Integer value is allowed.Default value will be used"); + this.sortTempFileNoOFRecordsInCompression = Integer + .parseInt(CarbonCommonConstants.SORT_TEMP_FILE_NO_OF_RECORD_FOR_COMPRESSION_DEFAULTVALUE); + } + + initialise(); + } + + private void initialise() { + try { + if (isSortTempFileCompressionEnabled) { + this.bufferSize = sortTempFileNoOFRecordsInCompression; + } + stream = new DataInputStream( + new BufferedInputStream(new FileInputStream(tempFile), this.fileBufferSize)); + this.entryCount = stream.readInt(); + LOGGER.audit("Processing unsafe mode file rows with size : " + entryCount); + if (prefetch) { + new 
DataFetcher(false).call(); + totalRecordFetch += currentBuffer.length; + if (totalRecordFetch < this.entryCount) { + submit = executorService.submit(new DataFetcher(true)); + } + } else { + if (isSortTempFileCompressionEnabled) { + new DataFetcher(false).call(); + } + } + + } catch (FileNotFoundException e) { + LOGGER.error(e); + throw new RuntimeException(tempFile + " No Found", e); + } catch (IOException e) { + LOGGER.error(e); + throw new RuntimeException(tempFile + " No Found", e); + } catch (Exception e) { + LOGGER.error(e); + throw new RuntimeException(tempFile + " Problem while reading", e); + } + } + + /** + * This method will be used to read new row from file + * + * @throws CarbonSortKeyAndGroupByException problem while reading + */ + public void readRow() throws CarbonSortKeyAndGroupByException { + if (prefetch) { + fillDataForPrefetch(); + } else if (isSortTempFileCompressionEnabled) { + if (bufferRowCounter >= bufferSize) { + try { + new DataFetcher(false).call(); + bufferRowCounter = 0; + } catch (Exception e) { + LOGGER.error(e); + throw new CarbonSortKeyAndGroupByException(tempFile + " Problem while reading", e); + } + + } + prefetchRecordsProceesed++; + returnRow = currentBuffer[bufferRowCounter++]; + } else { + Object[] outRow = getRowFromStream(); + this.returnRow = outRow; + } + } + + private void fillDataForPrefetch() { + if (bufferRowCounter >= bufferSize) { + if (isBackupFilled) { + bufferRowCounter = 0; + currentBuffer = backupBuffer; + totalRecordFetch += currentBuffer.length; + isBackupFilled = false; + if (totalRecordFetch < this.entryCount) { + submit = executorService.submit(new DataFetcher(true)); + } + } else { + try { + submit.get(); + } catch (Exception e) { + LOGGER.error(e); + } + bufferRowCounter = 0; + currentBuffer = backupBuffer; + isBackupFilled = false; + totalRecordFetch += currentBuffer.length; + if (totalRecordFetch < this.entryCount) { + submit = executorService.submit(new DataFetcher(true)); + } + } + } + 
prefetchRecordsProceesed++; + returnRow = currentBuffer[bufferRowCounter++]; + } + + /** + * @return + * @throws CarbonSortKeyAndGroupByException + */ + private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException { + Object[] row = new Object[dimensionCount + measureCount]; + try { + int dimCount = 0; + for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) { + if (isNoDictionaryDimensionColumn[dimCount]) { + short aShort = stream.readShort(); + byte[] col = new byte[aShort]; + stream.readFully(col); + row[dimCount] = col; + } else { + int anInt = stream.readInt(); + row[dimCount] = anInt; + } + } + + // write complex dimensions here. + for (; dimCount < dimensionCount; dimCount++) { + short aShort = stream.readShort(); + byte[] col = new byte[aShort]; + stream.readFully(col); + row[dimCount] = col; + } + + long[] words = new long[nullSetWordsLength]; + for (int i = 0; i < words.length; i++) { + words[i] = stream.readLong(); + } + + for (int mesCount = 0; mesCount < measureCount; mesCount++) { + if (UnsafeCarbonRowPage.isSet(words, mesCount)) { + if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) { + row[dimensionCount + mesCount] = stream.readDouble(); + } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) { + row[dimensionCount + mesCount] = stream.readLong(); + } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) { + short aShort = stream.readShort(); + byte[] bigDecimalInBytes = new byte[aShort]; + stream.readFully(bigDecimalInBytes); + row[dimensionCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes); + } + } + } + return row; + } catch (Exception e) { + throw new CarbonSortKeyAndGroupByException(e); + } + } + + /** + * below method will be used to get the row + * + * @return row + */ + public Object[] getRow() { + return this.returnRow; + } + + /** + * below method will be used to check whether any more records are present + * in file or not + * + * 
@return more row present in file + */ + public boolean hasNext() { + if (prefetch || isSortTempFileCompressionEnabled) { + return this.prefetchRecordsProceesed < this.entryCount; + } + return this.numberOfObjectRead < this.entryCount; + } + + /** + * Below method will be used to close streams + */ + public void close() { + CarbonUtil.closeStreams(stream); + executorService.shutdown(); + } + + /** + * This method will number of entries + * + * @return entryCount + */ + public int numberOfRows() { + return entryCount; + } + + @Override public int compareTo(SortTempChunkHolder other) { + return comparator.compare(returnRow, other.getRow()); + } + + @Override public boolean equals(Object obj) { + if (!(obj instanceof UnsafeSortTempFileChunkHolder)) { + return false; + } + UnsafeSortTempFileChunkHolder o = (UnsafeSortTempFileChunkHolder) obj; + + return o.compareTo(o) == 0; + } + + @Override public int hashCode() { + int hash = 0; + hash += 31 * measureCount; + hash += 31 * dimensionCount; + hash += 31 * complexDimensionCount; + hash += 31 * noDictionaryCount; + hash += tempFile.hashCode(); + return hash; + } + + private final class DataFetcher implements Callable<Void> { + private boolean isBackUpFilling; + + private int numberOfRecords; + + private DataFetcher(boolean backUp) { + isBackUpFilling = backUp; + calculateNumberOfRecordsToBeFetched(); + } + + private void calculateNumberOfRecordsToBeFetched() { + int numberOfRecordsLeftToBeRead = entryCount - totalRecordFetch; + numberOfRecords = + bufferSize < numberOfRecordsLeftToBeRead ? 
bufferSize : numberOfRecordsLeftToBeRead; + } + + @Override public Void call() throws Exception { + try { + if (isBackUpFilling) { + backupBuffer = prefetchRecordsFromFile(numberOfRecords); + isBackupFilled = true; + } else { + currentBuffer = prefetchRecordsFromFile(numberOfRecords); + } + } catch (Exception e) { + LOGGER.error(e); + } + return null; + } + + } + + /** + * This method will read the records from sort temp file and keep it in a buffer + * + * @param numberOfRecords + * @return + * @throws CarbonSortKeyAndGroupByException + */ + private Object[][] prefetchRecordsFromFile(int numberOfRecords) + throws CarbonSortKeyAndGroupByException { + Object[][] records = new Object[numberOfRecords][]; + for (int i = 0; i < numberOfRecords; i++) { + records[i] = getRowFromStream(); + } + return records; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java new file mode 100644 index 0000000..0d36d90 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.carbondata.processing.newflow.sort.unsafe.merger;

import java.util.AbstractQueue;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonRowForMerge;
import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemoryMergeHolder;
import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;

/**
 * Merges several in-memory row pages into one ordered sequence. The result is not a copy of
 * the rows but two parallel arrays: the address of every row and the index of the page the
 * row lives in, both in merged order.
 */
public class UnsafeInMemoryIntermediateDataMerger implements Callable<Void> {
  /**
   * LOGGER
   */
  private static final LogService LOGGER =
      LogServiceFactory.getLogService(UnsafeInMemoryIntermediateDataMerger.class.getName());

  /**
   * recordHolderHeap
   */
  private AbstractQueue<UnsafeInmemoryMergeHolder> recordHolderHeap;

  /**
   * number of page holders that still have rows to contribute
   */
  private int holderCounter;

  /**
   * number of rows written to the merged arrays so far
   */
  private int entryCount;

  private UnsafeCarbonRowPage[] unsafeCarbonRowPages;

  /**
   * row addresses in merged order
   */
  private long[] mergedAddresses;

  /**
   * for every merged row, the index of the page it belongs to
   */
  private byte[] rowPageIndexes;

  /**
   * IntermediateFileMerger Constructor
   *
   * @param unsafeCarbonRowPages pages to merge
   * @param totalSize            capacity of the merged arrays
   */
  public UnsafeInMemoryIntermediateDataMerger(UnsafeCarbonRowPage[] unsafeCarbonRowPages,
      int totalSize) {
    this.holderCounter = unsafeCarbonRowPages.length;
    this.unsafeCarbonRowPages = unsafeCarbonRowPages;
    this.mergedAddresses = new long[totalSize];
    this.rowPageIndexes = new byte[totalSize];
    this.entryCount = 0;
  }

  /**
   * Runs the merge.
   *
   * @return always null
   * @throws Exception the failure that interrupted the merge; it is logged and rethrown so
   *                   that whoever waits on the task does not mistake a partial result for a
   *                   complete one
   */
  @Override public Void call() throws Exception {
    long intermediateMergeStartTime = System.currentTimeMillis();
    int holderCounterConst = holderCounter;
    try {
      startSorting();
      while (hasNext()) {
        mergeNextRecord();
      }
      double intermediateMergeCostTime =
          (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
      LOGGER.info("============================== Intermediate Merge of " + holderCounterConst
          + " in-memory sort Cost Time: " + intermediateMergeCostTime + "(s)");
    } catch (Exception e) {
      LOGGER.error(e, "Problem while intermediate merging");
      throw e;
    }
    return null;
  }

  /**
   * Takes the holder with the smallest current row from the heap, records that row in the
   * merged arrays and then advances the holder. The row is recorded before the holder is
   * advanced because advancing may overwrite the row object the holder handed out.
   */
  private void mergeNextRecord() {
    // poll the top object from heap
    // heap maintains binary tree which is based on heap condition that will
    // be based on comparator we are passing the heap
    // when will call poll it will always delete root of the tree and then
    // it does trickel down operation complexity is log(n)
    UnsafeInmemoryMergeHolder poll = this.recordHolderHeap.poll();

    // record the row while it still describes the polled entry
    writeDataToMemory(poll.getRow());

    // check if there no entry present
    if (!poll.hasNext()) {
      // this holder is exhausted
      --this.holderCounter;
      return;
    }

    // read new row
    poll.readRow();

    // add to heap
    this.recordHolderHeap.add(poll);
  }

  /**
   * Below method will be used to start the merge: it creates the record holder heap, then
   * creates one holder per row page, reads the first row of every page and adds the holder
   * to the heap.
   *
   * @throws CarbonSortKeyAndGroupByException
   */
  private void startSorting() throws CarbonSortKeyAndGroupByException {
    LOGGER.info("Number of row pages in intermediate merger: " + this.holderCounter);

    // create record holder heap
    createRecordHolderQueue(unsafeCarbonRowPages);

    // iterate over the row pages and create a holder for each and add to heap
    LOGGER.info("Started adding first record from row page");

    UnsafeInmemoryMergeHolder unsafePageHolder = null;
    // NOTE(review): the page index is a byte, so it wraps to negative values once more than
    // 128 pages are merged - confirm that callers cap the number of pages
    byte index = 0;
    for (UnsafeCarbonRowPage unsafeCarbonRowPage : unsafeCarbonRowPages) {
      // create chunk holder
      unsafePageHolder = new UnsafeInmemoryMergeHolder(unsafeCarbonRowPage, index++);

      // initialize
      unsafePageHolder.readRow();

      // add to heap
      this.recordHolderHeap.add(unsafePageHolder);
    }

    LOGGER.info("Heap Size" + this.recordHolderHeap.size());
  }

  /**
   * This method will be used to create the heap which will be used to hold
   * the chunk of data
   */
  private void createRecordHolderQueue(UnsafeCarbonRowPage[] pages) {
    // creating record holder heap
    this.recordHolderHeap = new PriorityQueue<UnsafeInmemoryMergeHolder>(pages.length);
  }

  /**
   * This method will be used to check whether any more element is present or
   * not
   *
   * @return more element is present
   */
  private boolean hasNext() {
    return this.holderCounter > 0;
  }

  /**
   * Below method will be used to append one row to the merged arrays
   */
  private void writeDataToMemory(UnsafeCarbonRowForMerge row) {
    mergedAddresses[entryCount] = row.address;
    rowPageIndexes[entryCount] = row.index;
    entryCount++;
  }

  public int getEntryCount() {
    return entryCount;
  }

  public UnsafeCarbonRowPage[] getUnsafeCarbonRowPages() {
    return unsafeCarbonRowPages;
  }

  public long[] getMergedAddresses() {
    return mergedAddresses;
  }

  public byte[] getRowPageIndexes() {
    return rowPageIndexes;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java new file mode 100644 index 0000000..735243e --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

package org.apache.carbondata.processing.newflow.sort.unsafe.merger;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.AbstractQueue;
import java.util.Arrays;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.CarbonUtilException;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.newflow.sort.unsafe.holder.SortTempChunkHolder;
import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
import org.apache.carbondata.processing.sortandgroupby.sortdata.TempSortFileWriter;
import org.apache.carbondata.processing.sortandgroupby.sortdata.TempSortFileWriterFactory;

/**
 * Task that merges a set of intermediate sort temp files into one output file.
 * <p>
 * One chunk holder is opened per input file and placed in a priority queue; rows are polled
 * from the queue in holder order and serialised to the output file. On success the input files
 * are deleted; on failure the partially written output file is deleted instead.
 */
public class UnsafeIntermediateFileMerger implements Callable<Void> {
  /**
   * LOGGER
   */
  private static final LogService LOGGER =
      LogServiceFactory.getLogService(UnsafeIntermediateFileMerger.class.getName());

  /**
   * priority queue of chunk holders, one per input file that still has rows
   */
  private AbstractQueue<SortTempChunkHolder> recordHolderHeap;

  /**
   * number of input files that are not yet exhausted
   */
  private int fileCounter;

  /**
   * raw output stream; only opened when compression and prefetch are both disabled
   */
  private DataOutputStream stream;

  /**
   * total number of rows over all input files
   */
  private int totalNumberOfRecords;

  /**
   * temp file writer; only created when compression or prefetch is enabled
   */
  private TempSortFileWriter writer;

  private SortParameters mergerParameters;

  private File[] intermediateFiles;

  private File outPutFile;

  private boolean[] noDictionarycolumnMapping;

  /**
   * bit set (one bit per measure) marking which measures of the current row are non-null
   */
  private long[] nullSetWords;

  /**
   * scratch buffer in which a single row is serialised before it is written out
   */
  private ByteBuffer rowData;

  /**
   * IntermediateFileMerger Constructor
   *
   * @param mergerParameters  sort parameters describing the row layout and writer settings
   * @param intermediateFiles sorted temp files to merge
   * @param outPutFile        file the merged rows are written to
   */
  public UnsafeIntermediateFileMerger(SortParameters mergerParameters, File[] intermediateFiles,
      File outPutFile) {
    this.mergerParameters = mergerParameters;
    this.fileCounter = intermediateFiles.length;
    this.intermediateFiles = intermediateFiles;
    this.outPutFile = outPutFile;
    noDictionarycolumnMapping = mergerParameters.getNoDictionaryDimnesionColumn();
    this.nullSetWords = new long[((mergerParameters.getMeasureColCount() - 1) >> 6) + 1];
    // Take size of 2 MB for each row. I think it is high enough to use
    rowData = ByteBuffer.allocate(2 * 1024 * 1024);
  }

  /**
   * Runs the merge. Any exception is logged rather than propagated; in that case the chunk
   * holders are closed and the output file is removed, while the input files are left in place.
   *
   * @return always {@code null}
   */
  @Override public Void call() throws Exception {
    long intermediateMergeStartTime = System.currentTimeMillis();
    int fileConterConst = fileCounter;
    boolean isFailed = false;
    try {
      startSorting();
      initialize();
      while (hasNext()) {
        writeDataTofile(next());
      }
      double intermediateMergeCostTime =
          (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
      LOGGER.info("============================== Intermediate Merge of " + fileConterConst
          + " Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)");
    } catch (Exception e) {
      LOGGER.error(e, "Problem while intermediate merging");
      isFailed = true;
    } finally {
      CarbonUtil.closeStreams(this.stream);
      if (null != writer) {
        writer.finish();
      }
      if (!isFailed) {
        try {
          finish();
        } catch (CarbonSortKeyAndGroupByException e) {
          LOGGER.error(e, "Problem while deleting the merge file");
        }
      } else {
        // release the holders that are still queued so their resources are not leaked
        closeChunkHolders();
        // only report a problem when the file exists and could NOT be removed
        if (outPutFile.exists() && !outPutFile.delete()) {
          LOGGER.error("Problem while deleting the merge file");
        }
      }
    }

    return null;
  }

  /**
   * This method is responsible for initializing the out stream: a raw buffered stream when
   * neither compression nor prefetch is enabled, otherwise a {@link TempSortFileWriter}.
   *
   * @throws CarbonSortKeyAndGroupByException if the output file cannot be opened or written
   */
  private void initialize() throws CarbonSortKeyAndGroupByException {
    if (!mergerParameters.isSortFileCompressionEnabled() && !mergerParameters.isPrefetch()) {
      try {
        this.stream = new DataOutputStream(
            new BufferedOutputStream(new FileOutputStream(outPutFile),
                mergerParameters.getFileWriteBufferSize()));
        this.stream.writeInt(this.totalNumberOfRecords);
      } catch (FileNotFoundException e) {
        throw new CarbonSortKeyAndGroupByException("Problem while getting the file", e);
      } catch (IOException e) {
        throw new CarbonSortKeyAndGroupByException("Problem while writing the data to file", e);
      }
    } else {
      writer = TempSortFileWriterFactory.getInstance()
          .getTempSortFileWriter(mergerParameters.isSortFileCompressionEnabled(),
              mergerParameters.getDimColCount(), mergerParameters.getComplexDimColCount(),
              mergerParameters.getMeasureColCount(), mergerParameters.getNoDictionaryCount(),
              mergerParameters.getFileWriteBufferSize());
      writer.initiaize(outPutFile, totalNumberOfRecords);
    }
  }

  /**
   * This method will be used to get the next sorted record from the heap
   *
   * @return sorted record
   * @throws CarbonSortKeyAndGroupByException
   */
  private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException {
    Object[] row = null;

    // poll the top object from heap
    // heap maintains binary tree which is based on heap condition that will
    // be based on comparator we are passing the heap
    // when will call poll it will always delete root of the tree and then
    // it does trickle down operation complexity is log(n)
    SortTempChunkHolder poll = this.recordHolderHeap.poll();

    // get the row from chunk
    row = poll.getRow();

    // check if there no entry present
    if (!poll.hasNext()) {
      // if chunk is empty then close the stream
      poll.close();

      // change the file counter
      --this.fileCounter;

      // return row
      return row;
    }

    // read new row
    poll.readRow();

    // add to heap
    this.recordHolderHeap.add(poll);

    // return row
    return row;
  }

  /**
   * Below method will be used to start the sorting process. It creates the record holder heap,
   * opens a chunk holder for every intermediate file, reads the first record of each and adds
   * the holder to the heap.
   *
   * @throws CarbonSortKeyAndGroupByException
   */
  private void startSorting() throws CarbonSortKeyAndGroupByException {
    LOGGER.info("Number of temp file: " + this.fileCounter);

    // create record holder heap
    createRecordHolderQueue(intermediateFiles);

    // iterate over file list and create chunk holder and add to heap
    LOGGER.info("Started adding first record from each file");

    SortTempChunkHolder sortTempFileChunkHolder = null;

    for (File tempFile : intermediateFiles) {
      // create chunk holder
      sortTempFileChunkHolder = new UnsafeSortTempFileChunkHolder(tempFile, mergerParameters);

      sortTempFileChunkHolder.readRow();
      this.totalNumberOfRecords += sortTempFileChunkHolder.numberOfRows();

      // add to heap
      this.recordHolderHeap.add(sortTempFileChunkHolder);
    }

    LOGGER.info("Heap Size" + this.recordHolderHeap.size());
  }

  /**
   * This method will be used to create the heap which will be used to hold
   * the chunk of data
   *
   * @param listFiles list of temp files
   */
  private void createRecordHolderQueue(File[] listFiles) {
    // creating record holder heap
    this.recordHolderHeap = new PriorityQueue<SortTempChunkHolder>(listFiles.length);
  }

  /**
   * This method will be used to get the sorted row
   *
   * @return sorted row
   * @throws CarbonSortKeyAndGroupByException
   */
  private Object[] next() throws CarbonSortKeyAndGroupByException {
    return getSortedRecordFromFile();
  }

  /**
   * This method will be used to check whether any more element is present or
   * not
   *
   * @return more element is present
   */
  private boolean hasNext() {
    return this.fileCounter > 0;
  }

  /**
   * Below method will be used to write one row to the output stream.
   * <p>
   * Layout written per row: the dimensions (length-prefixed bytes for no-dictionary and complex
   * columns, a 4 byte int otherwise), then the null bit set of the measures, then the non-null
   * measure values. Dimensions are appended with relative puts; the measures and the bit set
   * are placed with absolute puts at offsets tracked in {@code size}.
   *
   * @param row row to serialise
   * @throws CarbonSortKeyAndGroupByException if the raw output stream was not opened
   * @throws IOException                      problem while writing
   */
  private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByException, IOException {
    if (null == stream) {
      // initialize() opens the raw stream only when compression and prefetch are both disabled,
      // and this method writes nowhere else; fail with a clear message instead of an NPE.
      throw new CarbonSortKeyAndGroupByException(
          "Output stream is not initialized: intermediate merge cannot write rows when sort "
              + "file compression or prefetch is enabled");
    }
    int dimCount = 0;
    int size = 0;
    char[] aggType = mergerParameters.getAggType();
    for (; dimCount < noDictionarycolumnMapping.length; dimCount++) {
      if (noDictionarycolumnMapping[dimCount]) {
        byte[] col = (byte[]) row[dimCount];
        // NOTE(review): length is stored as a short, so values above Short.MAX_VALUE wrap
        rowData.putShort((short) col.length);
        size += 2;
        rowData.put(col);
        size += col.length;
      } else {
        rowData.putInt((int) row[dimCount]);
        size += 4;
      }
    }

    // write complex dimensions here.
    int dimensionSize = mergerParameters.getDimColCount();
    int measureSize = mergerParameters.getMeasureColCount();
    for (; dimCount < dimensionSize; dimCount++) {
      byte[] col = (byte[]) row[dimCount];
      rowData.putShort((short) col.length);
      size += 2;
      rowData.put(col);
      size += col.length;
    }
    Arrays.fill(nullSetWords, 0);
    int nullSetSize = nullSetWords.length * 8;
    // reserve room for the null bit set; it is filled in once all measures are inspected
    int nullLoc = size;
    size += nullSetSize;
    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
      Object value = row[mesCount + dimensionSize];
      if (null != value) {
        if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
          Double val = (Double) value;
          rowData.putDouble(size, val);
          size += 8;
        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
          Long val = (Long) value;
          rowData.putLong(size, val);
          size += 8;
        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
          BigDecimal val = (BigDecimal) value;
          byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
          rowData.putShort(size, (short) bigDecimalInBytes.length);
          size += 2;
          for (int i = 0; i < bigDecimalInBytes.length; i++) {
            rowData.put(size++, bigDecimalInBytes[i]);
          }
        }
        UnsafeCarbonRowPage.set(nullSetWords, mesCount);
      } else {
        UnsafeCarbonRowPage.unset(nullSetWords, mesCount);
      }
    }
    for (int i = 0; i < nullSetWords.length; i++) {
      rowData.putLong(nullLoc, nullSetWords[i]);
      nullLoc += 8;
    }
    byte[] rowBytes = new byte[size];
    rowData.position(0);
    rowData.get(rowBytes);
    stream.write(rowBytes);
    rowData.clear();
  }

  /**
   * Closes every chunk holder that is still queued in the heap.
   */
  private void closeChunkHolders() {
    if (recordHolderHeap != null) {
      int size = recordHolderHeap.size();
      for (int i = 0; i < size; i++) {
        recordHolderHeap.poll().close();
      }
    }
  }

  /**
   * Closes the remaining chunk holders and deletes the merged input files.
   *
   * @throws CarbonSortKeyAndGroupByException if the intermediate files cannot be deleted
   */
  private void finish() throws CarbonSortKeyAndGroupByException {
    closeChunkHolders();
    try {
      CarbonUtil.deleteFiles(intermediateFiles);
      rowData.clear();
    } catch (CarbonUtilException e) {
      // keep the original failure as the cause so the reason for the failed delete is not lost
      throw new CarbonSortKeyAndGroupByException(
          "Problem while deleting the intermediate files", e);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java new file mode 100644 index 0000000..1cb2336 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.carbondata.processing.newflow.sort.unsafe.merger;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;

/**
 * It does mergesort intermediate files to big file.
 * <p>
 * Collects sort temp files and in-memory row pages, and once the configured number of either
 * has accumulated, submits a merge task for that batch to an internal thread pool. All access
 * to the pending lists goes through {@code lockObject}.
 */
public class UnsafeIntermediateMerger {

  private static final LogService LOGGER =
      LogServiceFactory.getLogService(UnsafeIntermediateMerger.class.getName());

  /**
   * executorService
   */
  private ExecutorService executorService;
  /**
   * in-memory row pages waiting to be merged
   */
  private List<UnsafeCarbonRowPage> rowPages;

  /**
   * in-memory merge tasks that have been submitted so far
   */
  private List<UnsafeInMemoryIntermediateDataMerger> mergedPages;

  private SortParameters parameters;

  private final Object lockObject = new Object();

  private boolean offHeap;

  /**
   * sort temp files waiting to be merged
   */
  private List<File> procFiles;

  public UnsafeIntermediateMerger(SortParameters parameters) {
    this.parameters = parameters;
    // processed file list
    this.rowPages = new ArrayList<UnsafeCarbonRowPage>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
    this.mergedPages = new ArrayList<>();
    this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores());
    this.offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
        .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
            CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
    this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
  }

  /**
   * Queues an in-memory row page for a later in-memory merge.
   *
   * @param rowPage page to add
   */
  public void addDataChunkToMerge(UnsafeCarbonRowPage rowPage) {
    // only record the page here; merging is triggered by startInmemoryMergingIfPossible()
    // once enough pages have been collected
    synchronized (lockObject) {
      rowPages.add(rowPage);
    }
  }

  /**
   * Queues a sort temp file for a later intermediate file merge.
   *
   * @param sortTempFile file to add
   */
  public void addFileToMerge(File sortTempFile) {
    // only record the file here; merging is triggered by startFileMergingIfPossible()
    // once enough files have been collected
    synchronized (lockObject) {
      procFiles.add(sortTempFile);
    }
  }

  /**
   * Submits an intermediate file merge for all queued files when at least
   * {@code getNumberOfIntermediateFileToBeMerged()} of them are waiting.
   */
  public void startFileMergingIfPossible() {
    File[] fileList;
    synchronized (lockObject) {
      // the threshold is checked under the lock so that two concurrent callers cannot both
      // pass it and leave one of them submitting an empty batch
      if (procFiles.size() < parameters.getNumberOfIntermediateFileToBeMerged()) {
        return;
      }
      fileList = procFiles.toArray(new File[procFiles.size()]);
      this.procFiles = new ArrayList<File>();
    }
    LOGGER.debug("Sumitting request for intermediate merging no of files: " + fileList.length);
    startIntermediateMerging(fileList);
  }

  /**
   * Below method will be used to start the intermediate file merging
   *
   * @param intermediateFiles files to merge into one new temp file
   */
  private void startIntermediateMerging(File[] intermediateFiles) {
    File file = new File(
        parameters.getTempFileLocation() + File.separator + parameters.getTableName() + System
            .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION);
    UnsafeIntermediateFileMerger merger =
        new UnsafeIntermediateFileMerger(parameters, intermediateFiles, file);
    executorService.submit(merger);
  }

  /**
   * Submits an in-memory merge for all queued row pages when at least
   * {@code getNumberOfIntermediateFileToBeMerged()} of them are waiting and they hold rows.
   *
   * @throws CarbonSortKeyAndGroupByException
   */
  public void startInmemoryMergingIfPossible() throws CarbonSortKeyAndGroupByException {
    UnsafeCarbonRowPage[] localRowPages;
    int totalRows;
    synchronized (lockObject) {
      // threshold checked under the lock, see startFileMergingIfPossible()
      if (rowPages.size() < parameters.getNumberOfIntermediateFileToBeMerged()) {
        return;
      }
      totalRows = getTotalNumberOfRows(rowPages);
      if (totalRows <= 0) {
        return;
      }
      localRowPages = rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]);
      this.rowPages = new ArrayList<>();
    }
    LOGGER.debug("Sumitting request for intermediate merging of in-memory pages : "
        + localRowPages.length);
    startIntermediateMerging(localRowPages, totalRows);
  }

  /**
   * Below method will be used to start the intermediate in-memory merging
   *
   * @param rowPages  pages to merge
   * @param totalRows total number of rows over all given pages
   */
  private void startIntermediateMerging(UnsafeCarbonRowPage[] rowPages, int totalRows)
      throws CarbonSortKeyAndGroupByException {
    UnsafeInMemoryIntermediateDataMerger merger =
        new UnsafeInMemoryIntermediateDataMerger(rowPages, totalRows);
    // mergedPages is a plain ArrayList, so guard it with the same lock as the pending lists
    synchronized (lockObject) {
      mergedPages.add(merger);
    }
    executorService.submit(merger);
  }

  private int getTotalNumberOfRows(List<UnsafeCarbonRowPage> unsafeCarbonRowPages) {
    int totalSize = 0;
    for (UnsafeCarbonRowPage unsafeCarbonRowPage : unsafeCarbonRowPages) {
      totalSize += unsafeCarbonRowPage.getBuffer().getActualSize();
    }
    return totalSize;
  }

  /**
   * Stops accepting new merge tasks and waits for the submitted ones to complete.
   *
   * @throws CarbonSortKeyAndGroupByException if the wait is interrupted
   */
  public void finish() throws CarbonSortKeyAndGroupByException {
    try {
      executorService.shutdown();
      executorService.awaitTermination(2, TimeUnit.DAYS);
    } catch (InterruptedException e) {
      // restore the interrupt flag so callers further up can still observe the interruption
      Thread.currentThread().interrupt();
      throw new CarbonSortKeyAndGroupByException("Problem while shutdown the server ", e);
    }
  }

  /**
   * Force-stops the thread pool if it has not terminated yet and drops the pending row pages.
   * After this call {@link #getRowPages()} returns {@code null}.
   */
  public void close() {
    // covers both "never shut down" and "shut down but tasks still running"
    if (!executorService.isTerminated()) {
      executorService.shutdownNow();
    }
    if (null != rowPages) {
      rowPages.clear();
      rowPages = null;
    }
  }

  public List<UnsafeCarbonRowPage> getRowPages() {
    return rowPages;
  }

  public List<UnsafeInMemoryIntermediateDataMerger> getMergedPages() {
    return mergedPages;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f1f9348d/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java new file mode 100644 index 0000000..a142823 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

package org.apache.carbondata.processing.newflow.sort.unsafe.merger;

import java.io.File;
import java.io.FileFilter;
import java.util.AbstractQueue;
import java.util.List;
import java.util.PriorityQueue;

import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.newflow.sort.unsafe.holder.SortTempChunkHolder;
import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeFinalMergePageHolder;
import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemoryHolder;
import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
import org.apache.carbondata.processing.util.RemoveDictionaryUtil;

/**
 * Iterator over the final sorted output. It merges three kinds of sorted sources through one
 * priority queue: in-memory row pages, in-memory merge results and sort temp files found in the
 * temp location. Call {@link #startFinalMerge} before iterating.
 */
public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
  /**
   * LOGGER
   */
  private static final LogService LOGGER =
      LogServiceFactory.getLogService(UnsafeSingleThreadFinalSortFilesMerger.class.getName());

  /**
   * lockObject
   */
  private static final Object LOCKOBJECT = new Object();

  /**
   * number of sources (pages, merges, files) that are not yet exhausted
   */
  private int fileCounter;

  /**
   * recordHolderHeap
   */
  private AbstractQueue<SortTempChunkHolder> recordHolderHeapLocal;

  private SortParameters parameters;

  /**
   * number of measures
   */
  private int measureCount;

  /**
   * number of dimensionCount
   */
  private int dimensionCount;

  /**
   * number of no dictionary dimensions
   */
  private int noDictionaryCount;

  /**
   * number of complexDimensionCount
   */
  private int complexDimensionCount;

  private boolean[] isNoDictionaryDimensionColumn;

  /**
   * tempFileLocation
   */
  private String tempFileLocation;

  private String tableName;

  public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters) {
    this.parameters = parameters;
    // set measure and dimension count
    this.measureCount = parameters.getMeasureColCount();
    this.dimensionCount = parameters.getDimColCount();
    this.complexDimensionCount = parameters.getComplexDimColCount();

    this.noDictionaryCount = parameters.getNoDictionaryCount();
    this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
    this.tempFileLocation = parameters.getTempFileLocation();
    this.tableName = parameters.getTableName();
  }

  /**
   * This method will be used to start the final merge over the given in-memory pages, the
   * in-memory merge results and the sort temp files present in the temp location.
   *
   * @param rowPages in-memory pages that were not merged
   * @param merges   in-memory intermediate merge results
   * @throws CarbonDataWriterException if any source cannot be opened or read
   */
  public void startFinalMerge(UnsafeCarbonRowPage[] rowPages,
      List<UnsafeInMemoryIntermediateDataMerger> merges) throws CarbonDataWriterException {
    startSorting(rowPages, merges);
  }

  /**
   * Below method will be used to start the sorting process. It creates the record holder heap,
   * wraps every page, merge result and temp file in a chunk holder, reads the first record of
   * each and adds the holder to the heap.
   */
  private void startSorting(UnsafeCarbonRowPage[] rowPages,
      List<UnsafeInMemoryIntermediateDataMerger> merges) throws CarbonDataWriterException {
    try {
      File[] filesToMergeSort = getFilesToMergeSort();
      this.fileCounter = rowPages.length + filesToMergeSort.length + merges.size();

      LOGGER.info("Number of row pages: " + this.fileCounter);

      // create record holder heap
      createRecordHolderQueue();

      // iterate over file list and create chunk holder and add to heap
      LOGGER.info("Started adding first record from each page");
      for (final UnsafeCarbonRowPage rowPage : rowPages) {

        SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage,
            parameters.getDimColCount() + parameters.getMeasureColCount());

        // initialize
        sortTempFileChunkHolder.readRow();

        recordHolderHeapLocal.add(sortTempFileChunkHolder);
      }

      for (final UnsafeInMemoryIntermediateDataMerger merger : merges) {

        SortTempChunkHolder sortTempFileChunkHolder =
            new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionaryDimnesionColumn(),
                parameters.getDimColCount() + parameters.getMeasureColCount());

        // initialize
        sortTempFileChunkHolder.readRow();

        recordHolderHeapLocal.add(sortTempFileChunkHolder);
      }

      for (final File file : filesToMergeSort) {

        SortTempChunkHolder sortTempFileChunkHolder =
            new UnsafeSortTempFileChunkHolder(file, parameters);

        // initialize
        sortTempFileChunkHolder.readRow();

        recordHolderHeapLocal.add(sortTempFileChunkHolder);
      }

      LOGGER.info("Heap Size" + this.recordHolderHeapLocal.size());
    } catch (Exception e) {
      LOGGER.error(e);
      // keep the original exception as the cause instead of only its message
      throw new CarbonDataWriterException(e.getMessage(), e);
    }
  }

  /**
   * Lists the files in the temp location whose name starts with the table name.
   *
   * @return matching files, or an empty array when the location cannot be listed
   */
  private File[] getFilesToMergeSort() {
    // get all the merged files
    File file = new File(tempFileLocation);

    File[] fileList = file.listFiles(new FileFilter() {
      public boolean accept(File pathname) {
        return pathname.getName().startsWith(tableName);
      }
    });

    // listFiles returns null when the path is not a directory or an I/O error occurs
    if (null == fileList) {
      return new File[0];
    }
    return fileList;
  }

  /**
   * This method will be used to create the heap which will be used to hold
   * the chunk of data
   */
  private void createRecordHolderQueue() {
    // creating record holder heap
    this.recordHolderHeapLocal = new PriorityQueue<SortTempChunkHolder>(fileCounter);
  }

  /**
   * This method will be used to get the sorted row
   *
   * @return sorted row
   */
  public Object[] next() {
    return convertRow(getSortedRecordFromFile());
  }

  /**
   * This method will be used to get the next sorted record from the heap
   *
   * @return sorted record
   */
  private Object[] getSortedRecordFromFile() throws CarbonDataWriterException {
    Object[] row = null;

    // poll the top object from heap
    // heap maintains binary tree which is based on heap condition that will
    // be based on comparator we are passing the heap
    // when will call poll it will always delete root of the tree and then
    // it does trickle down operation complexity is log(n)
    SortTempChunkHolder poll = this.recordHolderHeapLocal.poll();

    // get the row from chunk
    row = poll.getRow();

    // check if there no entry present
    if (!poll.hasNext()) {
      // if chunk is empty then close the stream
      poll.close();

      // change the file counter
      --this.fileCounter;

      // return row
      return row;
    }

    // read new row
    try {
      poll.readRow();
    } catch (Exception e) {
      throw new CarbonDataWriterException(e.getMessage(), e);
    }

    // add to heap
    this.recordHolderHeapLocal.add(poll);

    // return row
    return row;
  }

  /**
   * This method will be used to check whether any more element is present or
   * not
   *
   * @return more element is present
   */
  public boolean hasNext() {
    return this.fileCounter > 0;
  }

  /**
   * Splits a flat sorted row into dictionary dimensions, no-dictionary/complex dimension bytes
   * and measures, and packs them into the output row via
   * {@code RemoveDictionaryUtil.prepareOutObj}.
   *
   * @param data flat row: dimensions first (in column order), then complex dimensions, then
   *             measures
   * @return output row of size 3
   */
  private Object[] convertRow(Object[] data) {
    // create new row of size 3 (1 for dims , 1 for high card , 1 for measures)

    Object[] holder = new Object[3];
    int index = 0;
    int nonDicIndex = 0;
    int allCount = 0;
    int[] dim = new int[this.dimensionCount];
    byte[][] nonDicArray = new byte[this.noDictionaryCount + this.complexDimensionCount][];
    Object[] measures = new Object[this.measureCount];
    try {
      // read dimension values
      for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
        if (isNoDictionaryDimensionColumn[i]) {
          nonDicArray[nonDicIndex++] = (byte[]) data[allCount];
        } else {
          dim[index++] = (int) data[allCount];
        }
        allCount++;
      }

      for (int i = 0; i < complexDimensionCount; i++) {
        nonDicArray[nonDicIndex++] = (byte[]) data[allCount];
        allCount++;
      }

      index = 0;
      // read measure values
      for (int i = 0; i < this.measureCount; i++) {
        measures[index++] = data[allCount];
        allCount++;
      }

      RemoveDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);

      // increment number if record read
    } catch (Exception e) {
      throw new RuntimeException("Problem while converting row ", e);
    }

    //return out row
    return holder;
  }

  /**
   * Closes every chunk holder still queued in the heap and drops the heap.
   */
  public void clear() {
    if (null != recordHolderHeapLocal) {
      for (SortTempChunkHolder pageHolder : recordHolderHeapLocal) {
        pageHolder.close();
      }
      recordHolderHeapLocal = null;
    }
  }
}