http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java deleted file mode 100644 index f605b22..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.loading.sort.impl; - -import java.io.File; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.schema.BucketingInfo; -import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; -import org.apache.carbondata.processing.loading.DataField; -import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException; -import org.apache.carbondata.processing.loading.row.CarbonRowBatch; -import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter; -import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage; -import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows; -import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger; -import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger; -import org.apache.carbondata.processing.sort.sortdata.SortParameters; -import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; - -/** - * It parallely reads data from array of iterates and do merge sort. - * First it sorts the data and write to temp files. These temp files will be merge sorted to get - * final merge sort result. - * This step is specifically for bucketing, it sorts each bucket data separately and write to - * temp files. - */ -public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorter { - - private static final LogService LOGGER = - LogServiceFactory.getLogService( - UnsafeParallelReadMergeSorterWithBucketingImpl.class.getName()); - - private SortParameters sortParameters; - - private BucketingInfo bucketingInfo; - - public UnsafeParallelReadMergeSorterWithBucketingImpl(DataField[] inputDataFields, - BucketingInfo bucketingInfo) { - this.bucketingInfo = bucketingInfo; - } - - @Override public void initialize(SortParameters sortParameters) { - this.sortParameters = sortParameters; - } - - @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators) - throws CarbonDataLoadingException { - UnsafeSortDataRows[] sortDataRows = new UnsafeSortDataRows[bucketingInfo.getNumberOfBuckets()]; - UnsafeIntermediateMerger[] intermediateFileMergers = - new UnsafeIntermediateMerger[sortDataRows.length]; - int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB(); - inMemoryChunkSizeInMB = inMemoryChunkSizeInMB / bucketingInfo.getNumberOfBuckets(); - if (inMemoryChunkSizeInMB < 5) { - inMemoryChunkSizeInMB = 5; - } - try { - for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) { - SortParameters parameters = sortParameters.getCopy(); - parameters.setPartitionID(i + ""); - setTempLocation(parameters); - intermediateFileMergers[i] = new UnsafeIntermediateMerger(parameters); - sortDataRows[i] = - new UnsafeSortDataRows(parameters, intermediateFileMergers[i], inMemoryChunkSizeInMB); - sortDataRows[i].initialize(); - } - } catch (MemoryException e) { - throw new CarbonDataLoadingException(e); - } - ExecutorService executorService = Executors.newFixedThreadPool(iterators.length); - this.threadStatusObserver = new ThreadStatusObserver(executorService); - final int batchSize = CarbonProperties.getInstance().getBatchSize(); - try { - for (int i = 0; i < iterators.length; i++) { - executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, this - .threadStatusObserver)); - } - executorService.shutdown(); - executorService.awaitTermination(2, TimeUnit.DAYS); - processRowToNextStep(sortDataRows, sortParameters); - } catch (Exception e) { - checkError(); - throw new CarbonDataLoadingException("Problem while shutdown the server ", e); - } - checkError(); - try { - for (int i = 0; i < intermediateFileMergers.length; i++) { - intermediateFileMergers[i].finish(); - } - } catch (Exception e) { - throw new CarbonDataLoadingException(e); - } - - Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()]; - for (int i = 0; i < sortDataRows.length; i++) { - batchIterator[i] = new MergedDataIterator(batchSize, intermediateFileMergers[i]); - } - - return batchIterator; - } - - private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger() { - String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation( - sortParameters.getDatabaseName(), sortParameters.getTableName(), - String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(), - false, false); - // Set the data file location - String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, - File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); - return new UnsafeSingleThreadFinalSortFilesMerger(sortParameters, dataFolderLocation); - } - - @Override public void close() { - } - - /** - * Below method will be used to process data to next step - */ - private boolean processRowToNextStep(UnsafeSortDataRows[] sortDataRows, SortParameters parameters) - throws CarbonDataLoadingException { - if (null == sortDataRows || sortDataRows.length == 0) { - LOGGER.info("Record Processed For table: " + parameters.getTableName()); - LOGGER.info("Number of Records was Zero"); - String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0; - LOGGER.info(logMessage); - return false; - } - - try { - for (int i = 0; i < sortDataRows.length; i++) { - // start sorting - sortDataRows[i].startSorting(); - } - // check any more rows are present - LOGGER.info("Record Processed For table: " + parameters.getTableName()); - CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis()); - CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis()); - return false; - } catch (Exception e) { - throw new CarbonDataLoadingException(e); - } - } - - private void setTempLocation(SortParameters parameters) { - String[] carbonDataDirectoryPath = CarbonDataProcessorUtil - .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(), - parameters.getTaskNo(), parameters.getSegmentId(), - false, false); - String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, - CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); - parameters.setTempFileLocation(tmpLoc); - } - - /** - * This thread iterates the iterator and adds the rows to @{@link UnsafeSortDataRows} - */ - private static class SortIteratorThread implements Runnable { - - private Iterator<CarbonRowBatch> iterator; - - private UnsafeSortDataRows[] sortDataRows; - - private ThreadStatusObserver threadStatusObserver; - - public SortIteratorThread(Iterator<CarbonRowBatch> iterator, - UnsafeSortDataRows[] sortDataRows, ThreadStatusObserver threadStatusObserver) { - this.iterator = iterator; - this.sortDataRows = sortDataRows; - this.threadStatusObserver = threadStatusObserver; - } - - @Override - public void run() { - try { - while (iterator.hasNext()) { - CarbonRowBatch batch = iterator.next(); - int i = 0; - while (batch.hasNext()) { - CarbonRow row = batch.next(); - if (row != null) { - UnsafeSortDataRows sortDataRow = sortDataRows[row.bucketNumber]; - synchronized (sortDataRow) { - sortDataRow.addRow(row.getData()); - } - } - } - } - } catch (Exception e) { - LOGGER.error(e); - this.threadStatusObserver.notifyFailed(e); - } - } - - } - - private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> { - - - private int batchSize; - - private boolean firstRow; - - private UnsafeIntermediateMerger intermediateMerger; - - public MergedDataIterator(int batchSize, - UnsafeIntermediateMerger intermediateMerger) { - this.batchSize = batchSize; - this.intermediateMerger = intermediateMerger; - this.firstRow = true; - } - - private UnsafeSingleThreadFinalSortFilesMerger finalMerger; - - @Override public boolean hasNext() { - if (firstRow) { - firstRow = false; - finalMerger = getFinalMerger(); - List<UnsafeCarbonRowPage> rowPages = intermediateMerger.getRowPages(); - finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]), - intermediateMerger.getMergedPages()); - } - return finalMerger.hasNext(); - } - - @Override public CarbonRowBatch next() { - int counter = 0; - CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize); - while (finalMerger.hasNext() && counter < batchSize) { - rowBatch.addRow(new CarbonRow(finalMerger.next())); - counter++; - } - return rowBatch; - } - } -}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java new file mode 100644 index 0000000..99d6627 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.loading.sort.impl; + +import java.io.File; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.schema.ColumnRangeInfo; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.loading.row.CarbonRowBatch; +import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter; +import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage; +import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows; +import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger; +import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger; +import org.apache.carbondata.processing.sort.sortdata.SortParameters; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; + +import org.apache.commons.lang3.StringUtils; + +/** + * It parallely reads data from array of iterates and do merge sort. + * First it sorts the data and write to temp files. These temp files will be merge sorted to get + * final merge sort result. + * This step is specifically for the data loading with specifying column value range, such as + * bucketing, sort_column_bounds, it sorts each range of data separately and write to temp files. + */ +public class UnsafeParallelReadMergeSorterWithColumnRangeImpl extends AbstractMergeSorter { + + private static final LogService LOGGER = + LogServiceFactory.getLogService( + UnsafeParallelReadMergeSorterWithColumnRangeImpl.class.getName()); + + private SortParameters originSortParameters; + private UnsafeIntermediateMerger[] intermediateFileMergers; + private int inMemoryChunkSizeInMB; + private AtomicLong rowCounter; + private ColumnRangeInfo columnRangeInfo; + /** + * counters to collect information about rows processed by each range + */ + private List<AtomicLong> insideRowCounterList; + + public UnsafeParallelReadMergeSorterWithColumnRangeImpl(AtomicLong rowCounter, + ColumnRangeInfo columnRangeInfo) { + this.rowCounter = rowCounter; + this.columnRangeInfo = columnRangeInfo; + } + + @Override public void initialize(SortParameters sortParameters) { + this.originSortParameters = sortParameters; + int totalInMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB(); + inMemoryChunkSizeInMB = totalInMemoryChunkSizeInMB / columnRangeInfo.getNumOfRanges(); + if (inMemoryChunkSizeInMB < 5) { + inMemoryChunkSizeInMB = 5; + } + this.insideRowCounterList = new ArrayList<>(columnRangeInfo.getNumOfRanges()); + for (int i = 0; i < columnRangeInfo.getNumOfRanges(); i++) { + insideRowCounterList.add(new AtomicLong(0)); + } + } + + @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators) + throws CarbonDataLoadingException { + UnsafeSortDataRows[] sortDataRows = new UnsafeSortDataRows[columnRangeInfo.getNumOfRanges()]; + intermediateFileMergers = new UnsafeIntermediateMerger[columnRangeInfo.getNumOfRanges()]; + SortParameters[] sortParameterArray = new SortParameters[columnRangeInfo.getNumOfRanges()]; + try { + for (int i = 0; i < columnRangeInfo.getNumOfRanges(); i++) { + SortParameters parameters = originSortParameters.getCopy(); + parameters.setPartitionID(i + ""); + parameters.setRangeId(i); + sortParameterArray[i] = parameters; + setTempLocation(parameters); + intermediateFileMergers[i] = new UnsafeIntermediateMerger(parameters); + sortDataRows[i] = + new UnsafeSortDataRows(parameters, intermediateFileMergers[i], inMemoryChunkSizeInMB); + sortDataRows[i].initialize(); + } + } catch (MemoryException e) { + throw new CarbonDataLoadingException(e); + } + ExecutorService executorService = Executors.newFixedThreadPool(iterators.length); + this.threadStatusObserver = new ThreadStatusObserver(executorService); + final int batchSize = CarbonProperties.getInstance().getBatchSize(); + try { + for (int i = 0; i < iterators.length; i++) { + executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, rowCounter, + this.insideRowCounterList, this.threadStatusObserver)); + } + executorService.shutdown(); + executorService.awaitTermination(2, TimeUnit.DAYS); + processRowToNextStep(sortDataRows, originSortParameters); + } catch (Exception e) { + checkError(); + throw new CarbonDataLoadingException("Problem while shutdown the server ", e); + } + checkError(); + try { + for (int i = 0; i < intermediateFileMergers.length; i++) { + intermediateFileMergers[i].finish(); + } + } catch (Exception e) { + throw new CarbonDataLoadingException(e); + } + + Iterator<CarbonRowBatch>[] batchIterator = new Iterator[columnRangeInfo.getNumOfRanges()]; + for (int i = 0; i < sortDataRows.length; i++) { + batchIterator[i] = + new MergedDataIterator(sortParameterArray[i], batchSize, intermediateFileMergers[i]); + } + + return batchIterator; + } + + private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger(SortParameters sortParameters) { + String[] storeLocation = CarbonDataProcessorUtil + .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(), + String.valueOf(sortParameters.getTaskNo()), + sortParameters.getSegmentId() + "", false, false); + // Set the data file location + String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, + File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); + return new UnsafeSingleThreadFinalSortFilesMerger(sortParameters, dataFolderLocation); + } + + @Override public void close() { + for (int i = 0; i < intermediateFileMergers.length; i++) { + intermediateFileMergers[i].close(); + } + } + + /** + * Below method will be used to process data to next step + */ + private boolean processRowToNextStep(UnsafeSortDataRows[] sortDataRows, SortParameters parameters) + throws CarbonDataLoadingException { + if (null == sortDataRows || sortDataRows.length == 0) { + LOGGER.info("Record Processed For table: " + parameters.getTableName()); + LOGGER.info("Number of Records was Zero"); + String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0; + LOGGER.info(logMessage); + return false; + } + + try { + for (int i = 0; i < sortDataRows.length; i++) { + // start sorting + sortDataRows[i].startSorting(); + } + // check any more rows are present + LOGGER.info("Record Processed For table: " + parameters.getTableName()); + CarbonTimeStatisticsFactory.getLoadStatisticsInstance() + .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis()); + CarbonTimeStatisticsFactory.getLoadStatisticsInstance() + .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis()); + return false; + } catch (Exception e) { + throw new CarbonDataLoadingException(e); + } + } + + private void setTempLocation(SortParameters parameters) { + String[] carbonDataDirectoryPath = CarbonDataProcessorUtil + .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(), + parameters.getTaskNo(), parameters.getSegmentId(), + false, false); + String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); + LOGGER.error("set temp location: " + StringUtils.join(tmpLoc, ", ")); + parameters.setTempFileLocation(tmpLoc); + } + + /** + * This thread iterates the iterator and adds the rows to @{@link UnsafeSortDataRows} + */ + private static class SortIteratorThread implements Runnable { + + private Iterator<CarbonRowBatch> iterator; + + private UnsafeSortDataRows[] sortDataRows; + private AtomicLong rowCounter; + private List<AtomicLong> insideRowCounterList; + private ThreadStatusObserver threadStatusObserver; + + public SortIteratorThread(Iterator<CarbonRowBatch> iterator, + UnsafeSortDataRows[] sortDataRows, AtomicLong rowCounter, + List<AtomicLong> insideRowCounterList, + ThreadStatusObserver threadStatusObserver) { + this.iterator = iterator; + this.sortDataRows = sortDataRows; + this.rowCounter = rowCounter; + this.insideRowCounterList = insideRowCounterList; + this.threadStatusObserver = threadStatusObserver; + } + + @Override + public void run() { + try { + while (iterator.hasNext()) { + CarbonRowBatch batch = iterator.next(); + int i = 0; + while (batch.hasNext()) { + CarbonRow row = batch.next(); + if (row != null) { + UnsafeSortDataRows sortDataRow = sortDataRows[row.getRangeId()]; + synchronized (sortDataRow) { + rowCounter.getAndIncrement(); + insideRowCounterList.get(row.getRangeId()).getAndIncrement(); + sortDataRow.addRow(row.getData()); + } + } + } + } + LOGGER.info("Rows processed by each range: " + insideRowCounterList); + } catch (Exception e) { + LOGGER.error(e); + this.threadStatusObserver.notifyFailed(e); + } + } + } + + private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> { + + private SortParameters sortParameters; + + private int batchSize; + + private boolean firstRow; + + private UnsafeIntermediateMerger intermediateMerger; + + public MergedDataIterator(SortParameters sortParameters, int batchSize, + UnsafeIntermediateMerger intermediateMerger) { + this.sortParameters = sortParameters; + this.batchSize = batchSize; + this.intermediateMerger = intermediateMerger; + this.firstRow = true; + } + + private UnsafeSingleThreadFinalSortFilesMerger finalMerger; + + @Override public boolean hasNext() { + if (firstRow) { + firstRow = false; + finalMerger = getFinalMerger(sortParameters); + List<UnsafeCarbonRowPage> rowPages = intermediateMerger.getRowPages(); + finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]), + intermediateMerger.getMergedPages()); + } + return finalMerger.hasNext(); + } + + @Override public CarbonRowBatch next() { + int counter = 0; + CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize); + while (finalMerger.hasNext() && counter < batchSize) { + rowBatch.addRow(new CarbonRow(finalMerger.next())); + counter++; + } + return rowBatch; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java index 5d038d3..eaa858e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java @@ -366,9 +366,9 @@ public class UnsafeSortDataRows { // create a new file and pick a temp directory randomly every time String tmpDir = parameters.getTempFileLocation()[ new Random().nextInt(parameters.getTempFileLocation().length)]; - File sortTempFile = new File( - tmpDir + File.separator + parameters.getTableName() - + System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT); + File sortTempFile = new File(tmpDir + File.separator + parameters.getTableName() + + '_' + parameters.getRangeId() + '_' + System.nanoTime() + + CarbonCommonConstants.SORT_TEMP_FILE_EXT); writeDataToFile(page, sortTempFile); LOGGER.info("Time taken to sort row page with size" + page.getBuffer().getActualSize() + " and write is: " + (System.currentTimeMillis() - startTime) + ": location:" http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java index 0d24e01..104f3f5 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java @@ -112,9 +112,9 @@ public class UnsafeIntermediateMerger { String[] tempFileLocations = parameters.getTempFileLocation(); String targetLocation = tempFileLocations[new Random().nextInt(tempFileLocations.length)]; - File file = new File( - targetLocation + File.separator + parameters.getTableName() + System - .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION); + File file = new File(targetLocation + File.separator + parameters.getTableName() + + '_' + parameters.getRangeId() + '_' + System.nanoTime() + + CarbonCommonConstants.MERGERD_EXTENSION); UnsafeIntermediateFileMerger merger = new UnsafeIntermediateFileMerger(parameters, intermediateFiles, file); mergerTask.add(executorService.submit(merger)); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java index 64f3c25..b1dc156 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java @@ -147,19 +147,20 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec } private List<File> getFilesToMergeSort() { + // this can be partitionId, bucketId or rangeId, let's call it rangeId + final int rangeId = parameters.getRangeId(); + FileFilter fileFilter = new FileFilter() { public boolean accept(File pathname) { - return pathname.getName().startsWith(tableName); + return pathname.getName().startsWith(tableName + '_' + rangeId); } }; // get all the merged files List<File> files = new ArrayList<File>(tempFileLocation.length); - for (String tempLoc : tempFileLocation) - { + for (String tempLoc : tempFileLocation) { File[] subFiles = new File(tempLoc).listFiles(fileFilter); - if (null != subFiles && subFiles.length > 0) - { + if (null != subFiles && subFiles.length > 0) { files.addAll(Arrays.asList(subFiles)); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java index 90a340d..72a8c25 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java @@ -19,21 +19,33 @@ package org.apache.carbondata.processing.loading.steps; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.schema.BucketingInfo; +import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep; import org.apache.carbondata.processing.loading.BadRecordsLogger; import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider; import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; import org.apache.carbondata.processing.loading.DataField; +import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder; +import org.apache.carbondata.processing.loading.converter.FieldConverter; import org.apache.carbondata.processing.loading.converter.RowConverter; import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl; +import org.apache.carbondata.processing.loading.partition.Partitioner; +import org.apache.carbondata.processing.loading.partition.impl.HashPartitionerImpl; +import org.apache.carbondata.processing.loading.partition.impl.RangePartitionerImpl; +import org.apache.carbondata.processing.loading.partition.impl.RawRowComparator; import org.apache.carbondata.processing.loading.row.CarbonRowBatch; import org.apache.carbondata.processing.util.CarbonBadRecordUtil; +import org.apache.commons.lang3.StringUtils; + /** * Replace row data fields with dictionary values if column is configured dictionary encoded. * And nondictionary columns as well as complex columns will be converted to byte[]. @@ -41,7 +53,10 @@ import org.apache.carbondata.processing.util.CarbonBadRecordUtil; public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorStep { private List<RowConverter> converters; + private Partitioner<CarbonRow> partitioner; private BadRecordsLogger badRecordLogger; + private boolean isSortColumnRangeEnabled = false; + private boolean isBucketColumnEnabled = false; public DataConverterProcessorStepImpl(CarbonDataLoadConfiguration configuration, AbstractDataLoadProcessorStep child) { @@ -64,6 +79,81 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte configuration.setCardinalityFinder(converter); converters.add(converter); converter.initialize(); + + if (null != configuration.getBucketingInfo()) { + this.isBucketColumnEnabled = true; + initializeBucketColumnPartitioner(); + } else if (null != configuration.getSortColumnRangeInfo()) { + this.isSortColumnRangeEnabled = true; + initializeSortColumnRangesPartitioner(); + } + } + + /** + * initialize partitioner for bucket column + */ + private void initializeBucketColumnPartitioner() { + List<Integer> indexes = new ArrayList<>(); + List<ColumnSchema> columnSchemas = new ArrayList<>(); + DataField[] inputDataFields = getOutput(); + BucketingInfo bucketingInfo = configuration.getBucketingInfo(); + for (int i = 0; i < inputDataFields.length; i++) { + for (int j = 0; j < bucketingInfo.getListOfColumns().size(); j++) { + if (inputDataFields[i].getColumn().getColName() + .equals(bucketingInfo.getListOfColumns().get(j).getColumnName())) { + indexes.add(i); + columnSchemas.add(inputDataFields[i].getColumn().getColumnSchema()); + break; + } + } + } + + // hash partitioner to dispatch rows by bucket column + this.partitioner = + new HashPartitionerImpl(indexes, columnSchemas, bucketingInfo.getNumOfRanges()); + } + + + /** + * initialize partitioner for sort column ranges + */ + private void initializeSortColumnRangesPartitioner() { + // convert user specified sort-column ranges + SortColumnRangeInfo sortColumnRangeInfo = configuration.getSortColumnRangeInfo(); + int rangeValueCnt = sortColumnRangeInfo.getUserSpecifiedRanges().length; + CarbonRow[] convertedSortColumnRanges = new CarbonRow[rangeValueCnt]; + for (int i = 0; i < rangeValueCnt; i++) { + Object[] fakeOriginRow = new Object[configuration.getDataFields().length]; + String[] oneBound = StringUtils.splitPreserveAllTokens( + sortColumnRangeInfo.getUserSpecifiedRanges()[i], sortColumnRangeInfo.getSeparator(), -1); + // set the corresponding sort column + int j = 0; + for (int colIdx : sortColumnRangeInfo.getSortColumnIndex()) { + fakeOriginRow[colIdx] = oneBound[j++]; + } + CarbonRow fakeCarbonRow = new CarbonRow(fakeOriginRow); + convertFakeRow(fakeCarbonRow, sortColumnRangeInfo); + convertedSortColumnRanges[i] = fakeCarbonRow; + } + // sort the range bounds (sort in carbon is a little different from what we think) + Arrays.sort(convertedSortColumnRanges, + new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(), + sortColumnRangeInfo.getIsSortColumnNoDict())); + + // range partitioner to dispatch rows by sort columns + this.partitioner = new RangePartitionerImpl(convertedSortColumnRanges, + new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(), + sortColumnRangeInfo.getIsSortColumnNoDict())); + } + + // only convert sort column fields + private void convertFakeRow(CarbonRow fakeRow, SortColumnRangeInfo sortColumnRangeInfo) { + FieldConverter[] fieldConverters = converters.get(0).getFieldConverters(); + BadRecordLogHolder logHolder = new BadRecordLogHolder(); + logHolder.setLogged(false); + for (int colIdx : sortColumnRangeInfo.getSortColumnIndex()) { + fieldConverters[colIdx].convert(fakeRow, logHolder); + } } /** @@ -102,6 +192,10 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) { while (rowBatch.hasNext()) { CarbonRow convertRow = localConverter.convert(rowBatch.next()); + if (isSortColumnRangeEnabled || isBucketColumnEnabled) { + short rangeNumber = (short) partitioner.getPartition(convertRow); + convertRow.setRangeId(rangeNumber); + } rowBatch.setPreviousRow(convertRow); } rowCounter.getAndAdd(rowBatch.getSize()); @@ -134,6 +228,12 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte } @Override protected String getStepName() { - return "Data Converter"; + if (isBucketColumnEnabled) { + return "Data Converter with Bucketing"; + } else if (isSortColumnRangeEnabled) { + return "Data Converter with sort column range"; + } else { + return "Data Converter"; + } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java deleted file mode 100644 index a1181c9..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.loading.steps; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.metadata.schema.BucketingInfo; -import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep; -import org.apache.carbondata.processing.loading.BadRecordsLogger; -import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider; -import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; -import org.apache.carbondata.processing.loading.DataField; -import org.apache.carbondata.processing.loading.converter.RowConverter; -import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl; -import org.apache.carbondata.processing.loading.partition.Partitioner; -import org.apache.carbondata.processing.loading.partition.impl.HashPartitionerImpl; -import org.apache.carbondata.processing.loading.row.CarbonRowBatch; -import org.apache.carbondata.processing.util.CarbonBadRecordUtil; - -/** - * Replace row data fields with dictionary values if column is configured dictionary encoded. - * And nondictionary columns as well as complex columns will be converted to byte[]. - */ -public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoadProcessorStep { - - private List<RowConverter> converters; - - private Partitioner<Object[]> partitioner; - - private BadRecordsLogger badRecordLogger; - - public DataConverterProcessorWithBucketingStepImpl(CarbonDataLoadConfiguration configuration, - AbstractDataLoadProcessorStep child) { - super(configuration, child); - } - - @Override - public DataField[] getOutput() { - return child.getOutput(); - } - - @Override - public void initialize() throws IOException { - super.initialize(); - child.initialize(); - converters = new ArrayList<>(); - badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration); - RowConverter converter = - new RowConverterImpl(child.getOutput(), configuration, badRecordLogger); - configuration.setCardinalityFinder(converter); - converters.add(converter); - converter.initialize(); - List<Integer> indexes = new ArrayList<>(); - List<ColumnSchema> columnSchemas = new ArrayList<>(); - DataField[] inputDataFields = getOutput(); - BucketingInfo bucketingInfo = configuration.getBucketingInfo(); - for (int i = 0; i < inputDataFields.length; i++) { - for (int j = 0; j < bucketingInfo.getListOfColumns().size(); j++) { - if (inputDataFields[i].getColumn().getColName() - .equals(bucketingInfo.getListOfColumns().get(j).getColumnName())) { - indexes.add(i); - columnSchemas.add(inputDataFields[i].getColumn().getColumnSchema()); - break; - } - } - } - partitioner = - new HashPartitionerImpl(indexes, columnSchemas, bucketingInfo.getNumberOfBuckets()); - } - - /** - * Create the iterator using child iterator. - * - * @param childIter - * @return new iterator with step specific processing. - */ - @Override - protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) { - return new CarbonIterator<CarbonRowBatch>() { - RowConverter localConverter; - private boolean first = true; - @Override public boolean hasNext() { - if (first) { - first = false; - localConverter = converters.get(0).createCopyForNewThread(); - converters.add(localConverter); - } - return childIter.hasNext(); - } - - @Override public CarbonRowBatch next() { - return processRowBatch(childIter.next(), localConverter); - } - }; - } - - /** - * Process the batch of rows as per the step logic. - * - * @param rowBatch - * @return processed row. - */ - protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) { - while (rowBatch.hasNext()) { - CarbonRow row = rowBatch.next(); - short bucketNumber = (short) partitioner.getPartition(row.getData()); - CarbonRow convertRow = localConverter.convert(row); - convertRow.bucketNumber = bucketNumber; - rowBatch.setPreviousRow(convertRow); - } - rowCounter.getAndAdd(rowBatch.getSize()); - // reuse the origin batch - rowBatch.rewind(); - return rowBatch; - } - - @Override - protected CarbonRow processRow(CarbonRow row) { - throw new UnsupportedOperationException(); - } - - @Override - public void close() { - if (!closed) { - super.close(); - if (null != badRecordLogger) { - badRecordLogger.closeStreams(); - CarbonBadRecordUtil.renameBadRecord(configuration); - } - if (converters != null) { - for (RowConverter converter : converters) { - converter.finish(); - } - } - } - } - @Override protected String getStepName() { - return "Data Converter with Bucketing"; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java index 58009af..0467b11 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java @@ -17,7 +17,15 @@ package org.apache.carbondata.processing.loading.steps; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -25,6 +33,7 @@ import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; +import org.apache.carbondata.core.util.CarbonThreadFactory; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep; @@ -92,7 +101,11 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { CarbonTimeStatisticsFactory.getLoadStatisticsInstance() .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID, System.currentTimeMillis()); + ExecutorService rangeExecutorService = Executors.newFixedThreadPool(iterators.length, + new CarbonThreadFactory("WriterForwardPool: " + tableName)); + List<Future<Void>> rangeExecutorServiceSubmitList = new ArrayList<>(iterators.length); int i = 0; + // do this concurrently for (Iterator<CarbonRowBatch> iterator : iterators) { String[] storeLocation = getStoreLocation(tableIdentifier); @@ -112,9 +125,19 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { if (!rowsNotExist) { finish(dataHandler); } + rangeExecutorServiceSubmitList.add( + rangeExecutorService.submit(new WriterForwarder(iterator, tableIdentifier, i))); i++; } - + try { + rangeExecutorService.shutdown(); + rangeExecutorService.awaitTermination(2, TimeUnit.DAYS); + for (int j = 0; j < rangeExecutorServiceSubmitList.size(); j++) { + rangeExecutorServiceSubmitList.get(j).get(); + } + } catch (InterruptedException | ExecutionException e) { + throw new CarbonDataWriterException(e); + } } catch (CarbonDataWriterException e) { LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl"); throw new CarbonDataLoadingException( @@ -130,6 +153,51 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { return "Data Writer"; } + /** + * Used to forward rows to different ranges based on range id. + */ + private final class WriterForwarder implements Callable<Void> { + private Iterator<CarbonRowBatch> insideRangeIterator; + private CarbonTableIdentifier tableIdentifier; + private int rangeId; + + public WriterForwarder(Iterator<CarbonRowBatch> insideRangeIterator, + CarbonTableIdentifier tableIdentifier, int rangeId) { + this.insideRangeIterator = insideRangeIterator; + this.tableIdentifier = tableIdentifier; + this.rangeId = rangeId; + } + + @Override public Void call() throws Exception { + LOGGER.info("Process writer forward for table " + tableIdentifier.getTableName() + + ", range: " + rangeId); + processRange(insideRangeIterator, tableIdentifier, rangeId); + return null; + } + } + + private void processRange(Iterator<CarbonRowBatch> insideRangeIterator, + CarbonTableIdentifier tableIdentifier, int rangeId) { + String[] storeLocation = getStoreLocation(tableIdentifier); + + CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel + .createCarbonFactDataHandlerModel(configuration, storeLocation, rangeId, 0); + CarbonFactHandler dataHandler = null; + boolean rowsNotExist = true; + while (insideRangeIterator.hasNext()) { + if (rowsNotExist) { + rowsNotExist = false; + dataHandler = CarbonFactHandlerFactory + .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR); + dataHandler.initialise(); + } + processBatch(insideRangeIterator.next(), dataHandler); + } + if (!rowsNotExist) { + finish(dataHandler); + } + } + public void finish(CarbonFactHandler dataHandler) { CarbonTableIdentifier tableIdentifier = configuration.getTableIdentifier().getCarbonTableIdentifier(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java index a4ac0ea..1a839a2 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java @@ -119,9 +119,10 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { } private List<File> getFilesToMergeSort() { + final int rangeId = sortParameters.getRangeId(); FileFilter fileFilter = new FileFilter() { public boolean accept(File pathname) { - return pathname.getName().startsWith(tableName); + return pathname.getName().startsWith(tableName + '_' + rangeId); } }; http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java index c7efbd9..a5caf7b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java @@ -212,9 +212,9 @@ public class SortDataRows { // create new file and choose folder randomly String[] tmpLocation = parameters.getTempFileLocation(); String locationChosen = tmpLocation[new Random().nextInt(tmpLocation.length)]; - File file = new File( - locationChosen + File.separator + parameters.getTableName() + - System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT); + File file = new File(locationChosen + File.separator + parameters.getTableName() + + '_' + parameters.getRangeId() + '_' + System.nanoTime() + + CarbonCommonConstants.SORT_TEMP_FILE_EXT); writeDataToFile(recordHolderList, this.entryCount, file); } @@ -325,8 +325,9 @@ public class SortDataRows { String[] tmpFileLocation = parameters.getTempFileLocation(); String locationChosen = tmpFileLocation[new Random().nextInt(tmpFileLocation.length)]; File sortTempFile = new File( - locationChosen + File.separator + parameters.getTableName() + System - .nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT); + locationChosen + File.separator + parameters.getTableName() + + '_' + parameters.getRangeId() + '_' + System.nanoTime() + + CarbonCommonConstants.SORT_TEMP_FILE_EXT); writeDataToFile(recordHolderArray, recordHolderArray.length, sortTempFile); // add sort temp filename to and arrayList. When the list size reaches 20 then // intermediate merging of sort temp files will be triggered http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java index 9c995a5..0e3f6bd 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortIntermediateFileMerger.java @@ -94,9 +94,9 @@ public class SortIntermediateFileMerger { private void startIntermediateMerging(File[] intermediateFiles) { int index = new Random().nextInt(parameters.getTempFileLocation().length); String chosenTempDir = parameters.getTempFileLocation()[index]; - File file = new File( - chosenTempDir + File.separator + parameters.getTableName() + System - .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION); + File file = new File(chosenTempDir + File.separator + parameters.getTableName() + + '_' + parameters.getRangeId() + '_' + System.nanoTime() + + CarbonCommonConstants.MERGERD_EXTENSION); IntermediateFileMerger merger = new IntermediateFileMerger(parameters, intermediateFiles, file); mergerTask.add(executorService.submit(merger)); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java index 4d31f87..4d333ed 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java @@ -119,6 +119,7 @@ public class SortParameters implements Serializable { private int numberOfCores; private int batchSortSizeinMb; + private int rangeId = 0; public SortParameters getCopy() { SortParameters parameters = new SortParameters(); @@ -147,6 +148,7 @@ public class SortParameters implements Serializable { parameters.numberOfNoDictSortColumns = numberOfNoDictSortColumns; parameters.numberOfCores = numberOfCores; parameters.batchSortSizeinMb = batchSortSizeinMb; + parameters.rangeId = rangeId; return parameters; } @@ -429,6 +431,14 @@ public class SortParameters implements Serializable { return parameters; } + public int getRangeId() { + return rangeId; + } + + public void setRangeId(int rangeId) { + this.rangeId = rangeId; + } + /** * this method will set the boolean mapping for no dictionary sort columns * http://git-wip-us.apache.org/repos/asf/carbondata/blob/2414fb0c/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index b795696..c0acadd 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -167,12 +167,14 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { blockletProcessingCount = new AtomicInteger(0); producerExecutorService = Executors.newFixedThreadPool(numberOfCores, - new CarbonThreadFactory("ProducerPool:" + model.getTableName())); + new CarbonThreadFactory("ProducerPool:" + model.getTableName() + + ", range: " + model.getBucketId())); producerExecutorServiceTaskList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); LOGGER.info("Initializing writer executors"); consumerExecutorService = Executors - .newFixedThreadPool(1, new CarbonThreadFactory("ConsumerPool:" + model.getTableName())); + .newFixedThreadPool(1, new CarbonThreadFactory("ConsumerPool:" + model.getTableName() + + ", range: " + model.getBucketId())); consumerExecutorServiceTaskList = new ArrayList<>(1); semaphore = new Semaphore(numberOfCores); tablePageList = new TablePageList();