[CARBONDATA-1472] Optimize memory and fix no-sort queries. 1. Use UnsafeMemoryManager for dimension chunks as well to avoid leaks. 2. Fix filters on no-sort columns. 3. Optimize CarbonScanRDD.
This closes #1346 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/887310fc Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/887310fc Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/887310fc Branch: refs/heads/branch-1.2 Commit: 887310fc75e8c20c82929d2d92114887cecf44df Parents: dde2f4c Author: Ravindra Pesala <ravi.pes...@gmail.com> Authored: Sun Sep 10 14:57:09 2017 +0530 Committer: Jacky Li <jacky.li...@qq.com> Committed: Wed Sep 13 22:03:26 2017 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 4 ++ .../UnsafeAbstractDimensionDataChunkStore.java | 17 +++++--- .../core/memory/MemoryAllocatorFactory.java | 46 -------------------- .../core/memory/UnsafeMemoryManager.java | 21 ++++++--- .../executor/impl/AbstractQueryExecutor.java | 6 +-- .../executer/RangeValueFilterExecuterImpl.java | 10 +++-- ...velRangeLessThanEqualFilterExecuterImpl.java | 8 +++- .../RowLevelRangeLessThanFiterExecuterImpl.java | 8 +++- .../carbondata/hadoop/AbstractRecordReader.java | 2 - .../carbondata/spark/rdd/CarbonScanRDD.scala | 43 +++++++++++++----- 10 files changed, 84 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 5a68f60..0348bd1 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1371,6 +1371,10 @@ public final class 
CarbonCommonConstants { public static final String USE_DISTRIBUTED_DATAMAP_DEFAULT = "false"; + public static final String CARBON_USE_BLOCKLET_DISTRIBUTION = "carbon.blocklet.distribution"; + + public static final String CARBON_USE_BLOCKLET_DISTRIBUTION_DEFAULT = "true"; + private CarbonCommonConstants() { } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java index 704f2d3..22c2e16 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java @@ -20,9 +20,11 @@ package org.apache.carbondata.core.datastore.chunk.store.impl.unsafe; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore; import org.apache.carbondata.core.memory.CarbonUnsafe; -import org.apache.carbondata.core.memory.MemoryAllocatorFactory; import org.apache.carbondata.core.memory.MemoryBlock; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.memory.UnsafeMemoryManager; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; +import org.apache.carbondata.core.util.ThreadLocalTaskInfo; /** * Responsibility is to store dimension data in memory. 
storage can be on heap @@ -60,6 +62,8 @@ public abstract class UnsafeAbstractDimensionDataChunkStore implements Dimension */ protected boolean isMemoryOccupied; + private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId(); + /** * Constructor * @@ -69,9 +73,12 @@ public abstract class UnsafeAbstractDimensionDataChunkStore implements Dimension */ public UnsafeAbstractDimensionDataChunkStore(long totalSize, boolean isInvertedIdex, int numberOfRows) { - // allocating the data page - this.dataPageMemoryBlock = - MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(totalSize); + try { + // allocating the data page + this.dataPageMemoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, totalSize); + } catch (MemoryException e) { + throw new RuntimeException(e); + } this.isExplicitSorted = isInvertedIdex; } @@ -116,7 +123,7 @@ public abstract class UnsafeAbstractDimensionDataChunkStore implements Dimension return; } // free data page memory - MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().free(dataPageMemoryBlock); + UnsafeMemoryManager.INSTANCE.freeMemory(taskId, dataPageMemoryBlock); isMemoryReleased = true; this.dataPageMemoryBlock = null; this.isMemoryOccupied = false; http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/core/src/main/java/org/apache/carbondata/core/memory/MemoryAllocatorFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/MemoryAllocatorFactory.java b/core/src/main/java/org/apache/carbondata/core/memory/MemoryAllocatorFactory.java deleted file mode 100644 index e55af93..0000000 --- a/core/src/main/java/org/apache/carbondata/core/memory/MemoryAllocatorFactory.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.core.memory; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.util.CarbonProperties; - -/** - * Factory class to to get the memory allocator instance - */ -public class MemoryAllocatorFactory { - - private MemoryAllocator memoryAllocator; - - public static final MemoryAllocatorFactory INSATANCE = new MemoryAllocatorFactory(); - - private MemoryAllocatorFactory() { - boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.USE_OFFHEAP_IN_QUERY_PROCSSING, - CarbonCommonConstants.USE_OFFHEAP_IN_QUERY_PROCSSING_DEFAULT)); - if (offHeap) { - memoryAllocator = MemoryAllocator.UNSAFE; - } else { - memoryAllocator = MemoryAllocator.HEAP; - } - } - - public MemoryAllocator getMemoryAllocator() { - return memoryAllocator; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java index 06f907d..4222e14 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java @@ -96,9 +96,11 @@ public class UnsafeMemoryManager { taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock); } listOfMemoryBlock.add(allocate); - LOGGER.info("Memory block (" + allocate + ") is created with size " + allocate.size() - + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed) - + "Bytes"); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Memory block (" + allocate + ") is created with size " + allocate.size() + + ". Total memory used " + memoryUsed + "Bytes, left " + (totalMemory - memoryUsed) + + "Bytes"); + } return allocate; } return null; @@ -112,9 +114,11 @@ public class UnsafeMemoryManager { allocator.free(memoryBlock); memoryUsed -= memoryBlock.size(); memoryUsed = memoryUsed < 0 ? 0 : memoryUsed; - LOGGER.info( - "Freeing memory of size: " + memoryBlock.size() + "available memory: " + (totalMemory - - memoryUsed)); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Freeing memory of size: " + memoryBlock.size() + "available memory: " + (totalMemory + - memoryUsed)); + } } } @@ -140,6 +144,8 @@ public class UnsafeMemoryManager { "Freeing memory of size: " + occuppiedMemory + ": Current available memory is: " + ( totalMemory - memoryUsed)); } + LOGGER.info("Total memory used after task " + taskId + " is " + memoryUsed + + " Current tasks running now are : " + taskIdToMemoryBlockMap.keySet()); } public synchronized boolean isMemoryAvailable() { @@ -160,6 +166,7 @@ public class UnsafeMemoryManager { baseBlock = INSTANCE.allocateMemory(taskId, size); if (baseBlock == null) { try { + LOGGER.info("Memory is not available, retry after 500 millis"); Thread.sleep(500); } catch (InterruptedException e) { throw new MemoryException(e); @@ -170,6 +177,8 @@ public class UnsafeMemoryManager { tries++; } if (baseBlock == null) { + LOGGER.error(" Memory Used : " + 
INSTANCE.memoryUsed + " Tasks running : " + + taskIdToMemoryBlockMap.keySet()); throw new MemoryException("Not enough memory"); } return baseBlock; http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java index f159744..e8e7bfb 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java @@ -155,10 +155,10 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { queryModel.getAbsoluteTableIdentifier()); cache.removeTableBlocksIfHorizontalCompactionDone(queryModel); queryProperties.dataBlocks = cache.getAll(tableBlockUniqueIdentifiers); - queryStatistic - .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis()); - queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic); } + queryStatistic + .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis()); + queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic); // calculating the total number of aggeragted columns int aggTypeCount = queryModel.getQueryMeasures().size(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java index c2e077e..63472f9 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java @@ -552,11 +552,15 @@ public class RangeValueFilterExecuterImpl extends ValueBasedFilterExecuterImpl { if (dimColEvaluatorInfo.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) { DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory .getDirectDictionaryGenerator(dimColEvaluatorInfo.getDimension().getDataType()); - int key = directDictionaryGenerator.generateDirectSurrogateKey(null) + 1; + int key = directDictionaryGenerator.generateDirectSurrogateKey(null); CarbonDimension currentBlockDimension = segmentProperties.getDimensions().get(dimensionBlocksIndex); - defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension, - this.segmentProperties.getSortColumnsGenerator()); + if (currentBlockDimension.isSortColumn()) { + defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension, + this.segmentProperties.getSortColumnsGenerator()); + } else { + defaultValue = ByteUtil.toBytes(key); + } } else { defaultValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java index 63c9395..50231d6 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java @@ -268,8 +268,12 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter int key = directDictionaryGenerator.generateDirectSurrogateKey(null) + 1; CarbonDimension currentBlockDimension = segmentProperties.getDimensions().get(dimensionBlocksIndex[0]); - defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension, - this.segmentProperties.getSortColumnsGenerator()); + if (currentBlockDimension.isSortColumn()) { + defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension, + this.segmentProperties.getSortColumnsGenerator()); + } else { + defaultValue = ByteUtil.toBytes(key); + } } BitSet bitSet = null; if (dimensionColumnDataChunk.isExplicitSorted()) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java index 86ded59..1972f8e 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java @@ -270,8 +270,12 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut int key = directDictionaryGenerator.generateDirectSurrogateKey(null) + 1; CarbonDimension currentBlockDimension = segmentProperties.getDimensions().get(dimensionBlocksIndex[0]); - defaultValue = 
FilterUtil.getMaskKey(key, currentBlockDimension, - this.segmentProperties.getSortColumnsGenerator()); + if (currentBlockDimension.isSortColumn()) { + defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension, + this.segmentProperties.getSortColumnsGenerator()); + } else { + defaultValue = ByteUtil.toBytes(key); + } } BitSet bitSet = null; if (dimensionColumnDataChunk.isExplicitSorted()) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java index e571ccf..62a97f9 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java @@ -39,7 +39,5 @@ public abstract class AbstractRecordReader<T> extends RecordReader<Void, T> { QueryStatistic queryStatistic = new QueryStatistic(); queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount); recorder.recordStatistics(queryStatistic); - // print executor query statistics for each task_id - recorder.logStatisticsAsTableExecutor(); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 0035c44..1c08307 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -21,6 +21,7 @@ import java.text.SimpleDateFormat import java.util.{ArrayList, Date, List} import scala.collection.JavaConverters._ +import scala.util.Random import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce._ @@ -54,7 +55,7 @@ class CarbonScanRDD( columnProjection: CarbonProjection, filterExpression: Expression, identifier: AbsoluteTableIdentifier, - serializedTableInfo: Array[Byte], + @transient serializedTableInfo: Array[Byte], @transient tableInfo: TableInfo, inputMetricsStats: InitInputMetrics) extends CarbonRDDWithTableInfo[InternalRow](sc, Nil, serializedTableInfo) { @@ -147,13 +148,30 @@ class CarbonScanRDD( } noOfNodes = nodeBlockMapping.size } else { - splits.asScala.zipWithIndex.foreach { splitWithIndex => - val multiBlockSplit = - new CarbonMultiBlockSplit(identifier, - Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava, - splitWithIndex._1.getLocations) - val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit) - result.add(partition) + if (CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION, + CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION_DEFAULT).toBoolean) { + // Use blocklet distribution + // Randomize the blocklets for better shuffling + Random.shuffle(splits.asScala).zipWithIndex.foreach { splitWithIndex => + val multiBlockSplit = + new CarbonMultiBlockSplit(identifier, + Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava, + splitWithIndex._1.getLocations) + val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit) + result.add(partition) + } + } else { + // Use block distribution + splits.asScala.map(_.asInstanceOf[CarbonInputSplit]). 
+ groupBy(f => f.getBlockPath).values.zipWithIndex.foreach { splitWithIndex => + val multiBlockSplit = + new CarbonMultiBlockSplit(identifier, + splitWithIndex._1.asJava, + splitWithIndex._1.flatMap(f => f.getLocations).distinct.toArray) + val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit) + result.add(partition) + } } } @@ -176,7 +194,7 @@ class CarbonScanRDD( } override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = { - + val queryStartTime = System.currentTimeMillis val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null) if (null == carbonPropertiesFilePath) { System.setProperty("carbon.properties.filepath", @@ -209,16 +227,15 @@ class CarbonScanRDD( } reader.initialize(inputSplit, attemptContext) - val queryStartTime = System.currentTimeMillis new Iterator[Any] { private var havePair = false private var finished = false context.addTaskCompletionListener { context => - logStatistics(queryStartTime, model.getStatisticsRecorder) reader.close() - close() + close() + logStatistics(queryStartTime, model.getStatisticsRecorder) } override def hasNext: Boolean = { @@ -288,6 +305,8 @@ class CarbonScanRDD( queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART, System.currentTimeMillis - queryStartTime) recorder.recordStatistics(queryStatistic) + // print executor query statistics for each task_id + recorder.logStatisticsAsTableExecutor() } /**