Repository: incubator-carbondata Updated Branches: refs/heads/master f47bbc2c2 -> e7e370cac
Optimize data loading Handled broadcast fails. Updated as per comments Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/63434fac Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/63434fac Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/63434fac Branch: refs/heads/master Commit: 63434fac5f4dc2d7eb9d03819401e243744a5f48 Parents: f47bbc2 Author: ravipesala <ravi.pes...@gmail.com> Authored: Sun Nov 27 17:39:36 2016 +0530 Committer: jackylk <jacky.li...@huawei.com> Committed: Fri Dec 2 15:52:02 2016 +0800 ---------------------------------------------------------------------- .../carbondata/common/CarbonIterator.java | 14 +++ .../AbstractDetailQueryResultIterator.java | 2 +- .../carbondata/hadoop/csv/CSVInputFormat.java | 42 +++++--- .../recorditerator/RecordReaderIterator.java | 31 +++++- .../spark/rdd/NewCarbonDataLoadRDD.scala | 40 ++++--- .../processing/iterator/CarbonIterator.java | 38 ------- .../processing/newflow/DataLoadExecutor.java | 5 +- .../newflow/DataLoadProcessBuilder.java | 8 +- .../sort/impl/ParallelReadMergeSorterImpl.java | 90 ++++++++-------- .../newflow/steps/InputProcessorStepImpl.java | 105 ++++++++++++++----- .../sortandgroupby/sortdata/SortDataRows.java | 40 +++++++ 11 files changed, 268 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/common/src/main/java/org/apache/carbondata/common/CarbonIterator.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/carbondata/common/CarbonIterator.java b/common/src/main/java/org/apache/carbondata/common/CarbonIterator.java index 9141bcd..b1a5b5a 100644 --- a/common/src/main/java/org/apache/carbondata/common/CarbonIterator.java +++ 
b/common/src/main/java/org/apache/carbondata/common/CarbonIterator.java @@ -35,4 +35,18 @@ public abstract class CarbonIterator<E> implements Iterator<E> { throw new UnsupportedOperationException("remove"); } + /** + * Initialize the iterator + */ + public void initialize() { + // sub classes can overwrite to provide initialize logic to this method + } + + /** + * Close the resources + */ + public void close() { + // sub classes can overwrite to provide close logic to this method + } + } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java index c8c61b0..07ccab4 100644 --- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java @@ -114,7 +114,7 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator { DataRefNodeFinder finder = new BTreeDataRefNodeFinder(blockInfo.getEachColumnValueSize()); DataRefNode startDataBlock = finder .findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey()); - while (startDataBlock.nodeNumber() != blockInfo.getStartBlockletIndex()) { + while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) { startDataBlock = startDataBlock.getNextDataRefNode(); } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java ---------------------------------------------------------------------- diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java index 3ea96ac..ca27673 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java @@ -66,6 +66,8 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri public static final String ESCAPE_DEFAULT = "\\"; public static final String HEADER_PRESENT = "caron.csvinputformat.header.present"; public static final boolean HEADER_PRESENT_DEFAULT = false; + public static final String READ_BUFFER_SIZE = "carbon.csvinputformat.read.buffer.size"; + public static final String READ_BUFFER_SIZE_DEFAULT = "65536"; @Override public RecordReader<NullWritable, StringArrayWritable> createRecordReader(InputSplit inputSplit, @@ -85,10 +87,10 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri /** * Sets the comment char to configuration. Default it is #. - * @param commentChar * @param configuration + * @param commentChar */ - public static void setCommentCharacter(String commentChar, Configuration configuration) { + public static void setCommentCharacter(Configuration configuration, String commentChar) { if (commentChar != null && !commentChar.isEmpty()) { configuration.set(COMMENT, commentChar); } @@ -96,10 +98,10 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri /** * Sets the delimiter to configuration. 
Default it is ',' - * @param delimiter * @param configuration + * @param delimiter */ - public static void setCSVDelimiter(String delimiter, Configuration configuration) { + public static void setCSVDelimiter(Configuration configuration, String delimiter) { if (delimiter != null && !delimiter.isEmpty()) { configuration.set(DELIMITER, delimiter); } @@ -107,10 +109,10 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri /** * Sets the escape character to configuration. Default it is \ - * @param escapeCharacter * @param configuration + * @param escapeCharacter */ - public static void setEscapeCharacter(String escapeCharacter, Configuration configuration) { + public static void setEscapeCharacter(Configuration configuration, String escapeCharacter) { if (escapeCharacter != null && !escapeCharacter.isEmpty()) { configuration.set(ESCAPE, escapeCharacter); } @@ -118,26 +120,37 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri /** * Whether header needs to read from csv or not. By default it is false. - * @param headerExtractEnable * @param configuration + * @param headerExtractEnable */ - public static void setHeaderExtractionEnabled(boolean headerExtractEnable, - Configuration configuration) { + public static void setHeaderExtractionEnabled(Configuration configuration, + boolean headerExtractEnable) { configuration.set(HEADER_PRESENT, String.valueOf(headerExtractEnable)); } /** * Sets the quote character to configuration. Default it is " - * @param quoteCharacter * @param configuration + * @param quoteCharacter */ - public static void setQuoteCharacter(String quoteCharacter, Configuration configuration) { + public static void setQuoteCharacter(Configuration configuration, String quoteCharacter) { if (quoteCharacter != null && !quoteCharacter.isEmpty()) { configuration.set(QUOTE, quoteCharacter); } } /** + * Sets the read buffer size to configuration. 
+ * @param configuration + * @param bufferSize + */ + public static void setReadBufferSize(Configuration configuration, String bufferSize) { + if (bufferSize != null && !bufferSize.isEmpty()) { + configuration.set(READ_BUFFER_SIZE, bufferSize); + } + } + + /** * Treats value as line in file. Key is null. */ public static class CSVRecordReader extends RecordReader<NullWritable, StringArrayWritable> { @@ -163,8 +176,9 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri Configuration job = context.getConfiguration(); CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file); FileSystem fs = file.getFileSystem(job); - FSDataInputStream fileIn = fs.open(file); - InputStream inputStream = null; + int bufferSize = Integer.parseInt(job.get(READ_BUFFER_SIZE, READ_BUFFER_SIZE_DEFAULT)); + FSDataInputStream fileIn = fs.open(file, bufferSize); + InputStream inputStream; if (codec != null) { isCompressedInput = true; decompressor = CodecPool.getDecompressor(codec); @@ -209,6 +223,8 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri parserSettings.setIgnoreLeadingWhitespaces(false); parserSettings.setIgnoreTrailingWhitespaces(false); parserSettings.setSkipEmptyLines(false); + // TODO get from csv file. 
+ parserSettings.setMaxColumns(1000); parserSettings.getFormat().setQuote(job.get(QUOTE, QUOTE_DEFAULT).charAt(0)); parserSettings.getFormat().setQuoteEscape(job.get(ESCAPE, ESCAPE_DEFAULT).charAt(0)); if (start == 0) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java index 478af0a..a1bc1e9 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java @@ -16,12 +16,16 @@ */ package org.apache.carbondata.hadoop.csv.recorditerator; +import java.io.IOException; + import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.hadoop.io.StringArrayWritable; import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; /** * It is wrapper iterator around @{@link RecordReader}. 
@@ -38,8 +42,15 @@ public class RecordReaderIterator extends CarbonIterator<Object []> { */ private boolean isConsumed; - public RecordReaderIterator(RecordReader<NullWritable, StringArrayWritable> recordReader) { + private InputSplit split; + + private TaskAttemptContext context; + + public RecordReaderIterator(RecordReader<NullWritable, StringArrayWritable> recordReader, + InputSplit split, TaskAttemptContext context) { this.recordReader = recordReader; + this.split = split; + this.context = context; } @Override @@ -65,4 +76,22 @@ public class RecordReaderIterator extends CarbonIterator<Object []> { throw new CarbonDataLoadingException(e); } } + + @Override + public void initialize() { + try { + recordReader.initialize(split, context); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + try { + recordReader.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 67d1ce0..44a2416 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -29,18 +29,18 @@ import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, CarbonSerializableCon import org.apache.spark.rdd.RDD import org.apache.spark.sql.execution.command.Partitioner +import org.apache.carbondata.common.CarbonIterator import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.common.logging.impl.StandardLogService import 
org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails} -import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory +import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory} import org.apache.carbondata.hadoop.csv.CSVInputFormat import org.apache.carbondata.hadoop.csv.recorditerator.RecordReaderIterator import org.apache.carbondata.processing.model.CarbonLoadModel import org.apache.carbondata.processing.newflow.DataLoadExecutor import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException import org.apache.carbondata.spark.DataLoadResult -import org.apache.carbondata.spark.load._ import org.apache.carbondata.spark.splits.TableSplit import org.apache.carbondata.spark.util.CarbonQueryUtil @@ -133,9 +133,13 @@ class NewCarbonDataLoadRDD[K, V]( throw e } - def getInputIterators: Array[util.Iterator[Array[AnyRef]]] = { + def getInputIterators: Array[CarbonIterator[Array[AnyRef]]] = { val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, theSplit.index, 0) - val configuration: Configuration = confBroadcast.value.value + var configuration: Configuration = confBroadcast.value.value + // Broadcast fails in some cases WTF?? 
+ if (configuration == null) { + configuration = new Configuration() + } configureCSVInputFormat(configuration) val hadoopAttemptContext = newTaskAttemptContext(configuration, attemptId) val format = new CSVInputFormat @@ -160,10 +164,11 @@ class NewCarbonDataLoadRDD[K, V]( partitionID, split.partitionBlocksDetail.length) val readers = split.partitionBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext)) - readers.zipWithIndex.foreach { case (reader, index) => - reader.initialize(split.partitionBlocksDetail(index), hadoopAttemptContext) + readers.zipWithIndex.map { case (reader, index) => + new RecordReaderIterator(reader, + split.partitionBlocksDetail(index), + hadoopAttemptContext) } - readers.map(new RecordReaderIterator(_)) } else { // for node partition val split = theSplit.asInstanceOf[CarbonNodePartition] @@ -185,21 +190,22 @@ class NewCarbonDataLoadRDD[K, V]( StandardLogService.setThreadName(blocksID, null) val readers = split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext)) - readers.zipWithIndex.foreach { case (reader, index) => - reader.initialize(split.nodeBlocksDetail(index), hadoopAttemptContext) + readers.zipWithIndex.map { case (reader, index) => + new RecordReaderIterator(reader, split.nodeBlocksDetail(index), hadoopAttemptContext) } - readers.map(new RecordReaderIterator(_)) } } def configureCSVInputFormat(configuration: Configuration): Unit = { - CSVInputFormat.setCommentCharacter(carbonLoadModel.getCommentChar, configuration) - CSVInputFormat.setCSVDelimiter(carbonLoadModel.getCsvDelimiter, configuration) - CSVInputFormat.setEscapeCharacter(carbonLoadModel.getEscapeChar, configuration) - CSVInputFormat.setHeaderExtractionEnabled( - carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty, - configuration) - CSVInputFormat.setQuoteCharacter(carbonLoadModel.getQuoteChar, configuration) + CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar) + 
CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter) + CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar) + CSVInputFormat.setHeaderExtractionEnabled(configuration, + carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty) + CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar) + CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE, + CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT)) } /** http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/processing/src/main/java/org/apache/carbondata/processing/iterator/CarbonIterator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/iterator/CarbonIterator.java b/processing/src/main/java/org/apache/carbondata/processing/iterator/CarbonIterator.java deleted file mode 100644 index 35bba6f..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/iterator/CarbonIterator.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.carbondata.processing.iterator; - -public interface CarbonIterator<E> { - /** - * Returns <tt>true</tt> if the iteration has more elements. (In other - * words, returns <tt>true</tt> if <tt>next</tt> would return an element - * rather than throwing an exception.) - * - * @return <tt>true</tt> if the iterator has more elements. - */ - boolean hasNext(); - - /** - * Returns the next element in the iteration. - * - * @return the next element in the iteration. - */ - E next(); -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java index 746e0f2..fc24aa8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java @@ -16,8 +16,7 @@ */ package org.apache.carbondata.processing.newflow; -import java.util.Iterator; - +import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.carbon.CarbonTableIdentifier; @@ -35,7 +34,7 @@ public class DataLoadExecutor { LogServiceFactory.getLogService(DataLoadExecutor.class.getName()); public void execute(CarbonLoadModel loadModel, String storeLocation, - Iterator<Object[]>[] inputIterators) throws Exception { + CarbonIterator<Object[]>[] inputIterators) throws Exception { AbstractDataLoadProcessorStep loadProcessorStep = null; try { 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java index 92c677c..a5388d9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java @@ -18,13 +18,14 @@ package org.apache.carbondata.processing.newflow; import java.io.File; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; +import org.apache.carbondata.core.carbon.metadata.CarbonMetadata; import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension; @@ -50,7 +51,7 @@ public final class DataLoadProcessBuilder { LogServiceFactory.getLogService(DataLoadProcessBuilder.class.getName()); public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation, - Iterator[] inputIterators) throws Exception { + CarbonIterator[] inputIterators) throws Exception { CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation); // 1. Reads the data input iterators and parses the data. 
@@ -133,6 +134,9 @@ public final class DataLoadProcessBuilder { loadModel.getBadRecordsAction().split(",")[1]); configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_FILE_PATH, loadModel.getFactFilePath()); + if(CarbonMetadata.getInstance().getCarbonTable(carbonTable.getTableUniqueName()) == null) { + CarbonMetadata.getInstance().addCarbonTable(carbonTable); + } List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName()); List<CarbonMeasure> measures = http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java index cd487ec..e2e995c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java @@ -29,6 +29,7 @@ import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.processing.newflow.DataField; import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; @@ -90,25 +91,23 @@ public class ParallelReadMergeSorterImpl implements Sorter { @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators) throws CarbonDataLoadingException { - 
SortDataRows[] sortDataRows = new SortDataRows[iterators.length]; + SortDataRows sortDataRow = new SortDataRows(sortParameters, intermediateFileMerger); + final int batchSize = CarbonProperties.getInstance().getBatchSize(); try { - for (int i = 0; i < iterators.length; i++) { - sortDataRows[i] = new SortDataRows(sortParameters, intermediateFileMerger); - // initialize sort - sortDataRows[i].initialize(); - } + sortDataRow.initialize(); } catch (CarbonSortKeyAndGroupByException e) { throw new CarbonDataLoadingException(e); } this.executorService = Executors.newFixedThreadPool(iterators.length); try { - for (int i = 0; i < sortDataRows.length; i++) { + for (int i = 0; i < iterators.length; i++) { executorService.submit( - new SortIteratorThread(iterators[i], sortDataRows[i], sortParameters)); + new SortIteratorThread(iterators[i], sortDataRow, sortParameters, batchSize)); } executorService.shutdown(); executorService.awaitTermination(2, TimeUnit.DAYS); + processRowToNextStep(sortDataRow, sortParameters); } catch (Exception e) { throw new CarbonDataLoadingException("Problem while shutdown the server ", e); } @@ -121,9 +120,6 @@ public class ParallelReadMergeSorterImpl implements Sorter { throw new CarbonDataLoadingException(e); } - //TODO get the batch size from CarbonProperties - final int batchSize = 1000; - // Creates the iterator to read from merge sorter. 
Iterator<CarbonRowBatch> batchIterator = new CarbonIterator<CarbonRowBatch>() { @@ -151,6 +147,36 @@ public class ParallelReadMergeSorterImpl implements Sorter { } /** + * Below method will be used to process data to next step + */ + private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters) + throws CarbonDataLoadingException { + if (null == sortDataRows) { + LOGGER.info("Record Processed For table: " + parameters.getTableName()); + LOGGER.info("Number of Records was Zero"); + String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0; + LOGGER.info(logMessage); + return false; + } + + try { + // start sorting + sortDataRows.startSorting(); + + // check any more rows are present + LOGGER.info("Record Processed For table: " + parameters.getTableName()); + CarbonTimeStatisticsFactory.getLoadStatisticsInstance() + .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis()); + CarbonTimeStatisticsFactory.getLoadStatisticsInstance() + .recordDictionaryValuesTotalTime(parameters.getPartitionID(), + System.currentTimeMillis()); + return false; + } catch (CarbonSortKeyAndGroupByException e) { + throw new CarbonDataLoadingException(e); + } + } + + /** * This thread iterates the iterator and adds the rows to @{@link SortDataRows} */ private static class SortIteratorThread implements Callable<Void> { @@ -161,11 +187,14 @@ public class ParallelReadMergeSorterImpl implements Sorter { private SortParameters parameters; + private Object[][] buffer; + public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows sortDataRows, - SortParameters parameters) { + SortParameters parameters, int batchSize) { this.iterator = iterator; this.sortDataRows = sortDataRows; this.parameters = parameters; + this.buffer = new Object[batchSize][]; } @Override @@ -174,15 +203,17 @@ public class ParallelReadMergeSorterImpl implements Sorter { while (iterator.hasNext()) { CarbonRowBatch batch = 
iterator.next(); Iterator<CarbonRow> batchIterator = batch.getBatchIterator(); + int i = 0; while (batchIterator.hasNext()) { CarbonRow row = batchIterator.next(); if (row != null) { - sortDataRows.addRow(row.getData()); + buffer[i++] = row.getData(); } } + if (i > 0) { + sortDataRows.addRowBatch(buffer, i); + } } - - processRowToNextStep(sortDataRows); } catch (Exception e) { LOGGER.error(e); throw new CarbonDataLoadingException(e); @@ -190,34 +221,5 @@ public class ParallelReadMergeSorterImpl implements Sorter { return null; } - /** - * Below method will be used to process data to next step - */ - private boolean processRowToNextStep(SortDataRows sortDataRows) - throws CarbonDataLoadingException { - if (null == sortDataRows) { - LOGGER.info("Record Processed For table: " + parameters.getTableName()); - LOGGER.info("Number of Records was Zero"); - String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0; - LOGGER.info(logMessage); - return false; - } - - try { - // start sorting - sortDataRows.startSorting(); - - // check any more rows are present - LOGGER.info("Record Processed For table: " + parameters.getTableName()); - CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis()); - CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordDictionaryValuesTotalTime(parameters.getPartitionID(), - System.currentTimeMillis()); - return false; - } catch (CarbonSortKeyAndGroupByException e) { - throw new CarbonDataLoadingException(e); - } - } } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java index 69bd84a..b979af6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java @@ -3,6 +3,11 @@ package org.apache.carbondata.processing.newflow.steps; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; @@ -27,33 +32,35 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep { private RowParser rowParser; - private Iterator<Object[]>[] inputIterators; + private CarbonIterator<Object[]>[] inputIterators; + + /** + * executor service to execute the query + */ + public ExecutorService executorService; public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration, - Iterator<Object[]>[] inputIterators) { + CarbonIterator<Object[]>[] inputIterators) { super(configuration, null); this.inputIterators = inputIterators; } - @Override - public DataField[] getOutput() { + @Override public DataField[] getOutput() { return configuration.getDataFields(); } - @Override - public void initialize() throws CarbonDataLoadingException { + @Override public void initialize() throws CarbonDataLoadingException { rowParser = new RowParserImpl(getOutput(), configuration); + executorService = Executors.newCachedThreadPool(); } - - - @Override - public Iterator<CarbonRowBatch>[] execute() { + @Override public Iterator<CarbonRowBatch>[] execute() { int batchSize = CarbonProperties.getInstance().getBatchSize(); - List<Iterator<Object[]>>[] readerIterators = 
partitionInputReaderIterators(); + List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators(); Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length]; for (int i = 0; i < outIterators.length; i++) { - outIterators[i] = new InputProcessorIterator(readerIterators[i], rowParser, batchSize); + outIterators[i] = + new InputProcessorIterator(readerIterators[i], rowParser, batchSize, executorService); } return outIterators; } @@ -62,14 +69,14 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep { * Partition input iterators equally as per the number of threads. * @return */ - private List<Iterator<Object[]>>[] partitionInputReaderIterators() { + private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() { // Get the number of cores configured in property. int numberOfCores = CarbonProperties.getInstance().getNumberOfCores(); // Get the minimum of number of cores and iterators size to get the number of parallel threads // to be launched. int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores); - List<Iterator<Object[]>>[] iterators = new List[parallelThreadNumber]; + List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber]; for (int i = 0; i < parallelThreadNumber; i++) { iterators[i] = new ArrayList<>(); } @@ -80,20 +87,26 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep { return iterators; } - @Override - protected CarbonRow processRow(CarbonRow row) { + @Override protected CarbonRow processRow(CarbonRow row) { return null; } + @Override public void close() { + executorService.shutdown(); + for (CarbonIterator inputIterator : inputIterators) { + inputIterator.close(); + } + } + /** * This iterator wraps the list of iterators and it starts iterating the each * iterator of the list one by one. It also parse the data while iterating it. 
*/ private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> { - private List<Iterator<Object[]>> inputIterators; + private List<CarbonIterator<Object[]>> inputIterators; - private Iterator<Object[]> currentIterator; + private CarbonIterator<Object[]> currentIterator; private int counter; @@ -101,19 +114,28 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep { private RowParser rowParser; - public InputProcessorIterator(List<Iterator<Object[]>> inputIterators, - RowParser rowParser, int batchSize) { + private Future<CarbonRowBatch> future; + + private ExecutorService executorService; + + private boolean nextBatch; + + public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, + RowParser rowParser, int batchSize, ExecutorService executorService) { this.inputIterators = inputIterators; this.batchSize = batchSize; this.rowParser = rowParser; this.counter = 0; // Get the first iterator from the list. currentIterator = inputIterators.get(counter++); + currentIterator.initialize(); + this.executorService = executorService; + this.nextBatch = false; } @Override public boolean hasNext() { - return internalHasNext(); + return nextBatch || internalHasNext(); } private boolean internalHasNext() { @@ -124,6 +146,8 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep { if (counter < inputIterators.size()) { // Get the next iterator from the list. currentIterator = inputIterators.get(counter++); + // Initialize the new iterator + currentIterator.initialize(); hasNext = internalHasNext(); } } @@ -132,14 +156,39 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep { @Override public CarbonRowBatch next() { - // Create batch and fill it. 
- CarbonRowBatch carbonRowBatch = new CarbonRowBatch(); - int count = 0; - while (internalHasNext() && count < batchSize) { - carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next()))); - count++; + CarbonRowBatch result = null; + if (future == null) { + future = getCarbonRowBatch(); + } + try { + result = future.get(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + nextBatch = false; + if (hasNext()) { + nextBatch = true; + future = getCarbonRowBatch(); } - return carbonRowBatch; + + return result; + } + + private Future<CarbonRowBatch> getCarbonRowBatch() { + return executorService.submit(new Callable<CarbonRowBatch>() { + @Override public CarbonRowBatch call() throws Exception { + // Create batch and fill it. + CarbonRowBatch carbonRowBatch = new CarbonRowBatch(); + int count = 0; + while (internalHasNext() && count < batchSize) { + carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next()))); + count++; + } + return carbonRowBatch; + } + }); } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/63434fac/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java index 3a6afc7..7231775 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java @@ -74,6 +74,8 @@ public class SortDataRows { private SortIntermediateFileMerger intermediateFileMerger; + private final Object addRowsLock = new Object(); + public SortDataRows(SortParameters 
parameters,
       SortIntermediateFileMerger intermediateFileMerger) {
     this.parameters = parameters;
@@ -137,6 +139,51 @@ public class SortDataRows {
   }
 
   /**
+   * This method will be used to add a batch of new rows
+   *
+   * @param rowBatch new rowBatch
+   * @param size number of valid rows in rowBatch
+   * @throws CarbonSortKeyAndGroupByException problem while writing
+   */
+  public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
+    // if record holder list size is equal to sort buffer size then it will
+    // sort the list and then write current list data to file
+    synchronized (addRowsLock) {
+      if (entryCount + size >= sortBufferSize) {
+        LOGGER.debug("************ Writing to temp file ********** ");
+        intermediateFileMerger.startMergingIfPossible();
+        Object[][] recordHolderListLocal = recordHolderList;
+        int sizeLeft = sortBufferSize - entryCount;
+        if (sizeLeft > 0) {
+          System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft);
+        }
+        try {
+          dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(recordHolderListLocal));
+        } catch (Exception e) {
+          LOGGER.error(
+              "exception occurred while submitting the sort and write task: " + e.getMessage());
+          throw new CarbonSortKeyAndGroupByException(e);
+        }
+        // create the new holder Array
+        this.recordHolderList = new Object[this.sortBufferSize][];
+        this.entryCount = 0;
+        size = size - sizeLeft;
+        if (size == 0) {
+          return;
+        }
+        // the remaining rows start at offset sizeLeft in rowBatch, not 0; copying
+        // from 0 would duplicate the already-flushed rows and drop the batch tail.
+        // NOTE(review): assumes the remaining size is < sortBufferSize -- confirm batch sizing
+        System.arraycopy(rowBatch, sizeLeft, recordHolderList, 0, size);
+        entryCount = size;
+        return;
+      }
+      System.arraycopy(rowBatch, 0, recordHolderList, entryCount, size);
+      entryCount += size;
+    }
+  }
+
+  /**
    * Below method will be used to start storing process This method will get
    * all the temp files present in sort temp folder then it will create the
    * record holder heap and then it will read first record from each file and