[CARBONDATA-2625] Avoid listing files multiple times while loading BlockletDataMap
CarbonReader is very slow for many files as blockletDataMap lists files of folder while loading each segment. This optimization lists once across segment loads. This closes #2441 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e580d64e Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e580d64e Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e580d64e Branch: refs/heads/external-format Commit: e580d64ef5353ed033343d854da7e02539cdbeb4 Parents: 6351c3a Author: rahul <rahul.ku...@knoldus.in> Authored: Wed Jul 4 19:31:51 2018 +0530 Committer: Venkata Ramana G <ramana.gollam...@huawei.com> Committed: Wed Aug 1 16:40:29 2018 +0530 ---------------------------------------------------------------------- .../carbondata/core/datamap/TableDataMap.java | 5 +-- .../core/datamap/dev/DataMapFactory.java | 16 +++++++++ .../indexstore/BlockletDataMapIndexStore.java | 29 ++++++++++++++--- .../indexstore/BlockletDataMapIndexWrapper.java | 9 +++++- .../blockletindex/BlockletDataMapFactory.java | 34 ++++++++++++++++++++ .../core/util/BlockletDataMapUtil.java | 6 ++-- .../TestBlockletDataMapFactory.java | 2 +- .../partition/TestAlterPartitionTable.scala | 5 +++ 8 files changed, 95 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/e580d64e/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java index f6da73e..aed8c60 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -19,6 +19,7 @@ package org.apache.carbondata.core.datamap; 
import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -89,15 +90,15 @@ public final class TableDataMap extends OperationEventListener { List<PartitionSpec> partitions) throws IOException { List<ExtendedBlocklet> blocklets = new ArrayList<>(); SegmentProperties segmentProperties; + Map<Segment, List<DataMap>> dataMaps = dataMapFactory.getDataMaps(segments); for (Segment segment : segments) { List<Blocklet> pruneBlocklets = new ArrayList<>(); // if filter is not passed then return all the blocklets if (filterExp == null) { pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions); } else { - List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment); segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment); - for (DataMap dataMap : dataMaps) { + for (DataMap dataMap : dataMaps.get(segment)) { pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions)); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e580d64e/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java index ab0f8ea..67f82b2 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java @@ -17,8 +17,10 @@ package org.apache.carbondata.core.datamap.dev; import java.io.IOException; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException; @@ 
-26,6 +28,7 @@ import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.DataMapLevel; import org.apache.carbondata.core.datamap.DataMapMeta; import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.features.TableOperation; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; @@ -67,6 +70,19 @@ public abstract class DataMapFactory<T extends DataMap> { */ public abstract DataMapBuilder createBuilder(Segment segment, String shardName, SegmentProperties segmentProperties) throws IOException; + + /** + * Get the datamap for all segments + */ + public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments) + throws IOException { + Map<Segment, List<CoarseGrainDataMap>> dataMaps = new HashMap<>(); + for (Segment segment : segments) { + dataMaps.put(segment, (List<CoarseGrainDataMap>) this.getDataMaps(segment)); + } + return dataMaps; + } + /** * Get the datamap for segmentid */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/e580d64e/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java index 3a8aa52..fa84f30 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java @@ -18,6 +18,7 @@ package org.apache.carbondata.core.indexstore; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ 
-73,6 +74,11 @@ public class BlockletDataMapIndexStore @Override public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper) throws IOException { + return get(identifierWrapper, null); + } + + private BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper, + Map<String, Map<String, BlockMetaInfo>> segInfoCache) throws IOException { TableBlockIndexUniqueIdentifier identifier = identifierWrapper.getTableBlockIndexUniqueIdentifier(); String lruCacheKey = identifier.getUniqueTableSegmentIdentifier(); @@ -84,8 +90,16 @@ public class BlockletDataMapIndexStore SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); Set<String> filesRead = new HashSet<>(); String segmentFilePath = identifier.getIndexFilePath(); - Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping = BlockletDataMapUtil - .createCarbonDataFileBlockMetaInfoMapping(segmentFilePath); + if (segInfoCache == null) { + segInfoCache = new HashMap<String, Map<String, BlockMetaInfo>>(); + } + Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping = + segInfoCache.get(segmentFilePath); + if (carbonDataFileBlockMetaInfoMapping == null) { + carbonDataFileBlockMetaInfoMapping = + BlockletDataMapUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath); + segInfoCache.put(segmentFilePath, carbonDataFileBlockMetaInfoMapping); + } // if the identifier is not a merge file we can directly load the datamaps if (identifier.getMergeIndexFileName() == null) { Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil @@ -95,7 +109,8 @@ public class BlockletDataMapIndexStore loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap, identifierWrapper.getCarbonTable(), identifierWrapper.isAddTableBlockToUnsafe()); dataMaps.add(blockletDataMap); - blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps); + blockletDataMapIndexWrapper = + new BlockletDataMapIndexWrapper(identifier.getSegmentId(), 
dataMaps); } else { // if the identifier is a merge file then collect the index files and load the datamaps List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = @@ -114,7 +129,8 @@ public class BlockletDataMapIndexStore dataMaps.add(blockletDataMap); } } - blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps); + blockletDataMapIndexWrapper = + new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps); } lruCache.put(identifier.getUniqueTableSegmentIdentifier(), blockletDataMapIndexWrapper, blockletDataMapIndexWrapper.getMemorySize()); @@ -133,6 +149,9 @@ public class BlockletDataMapIndexStore @Override public List<BlockletDataMapIndexWrapper> getAll( List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiers) throws IOException { + Map<String, Map<String, BlockMetaInfo>> segInfoCache + = new HashMap<String, Map<String, BlockMetaInfo>>(); + List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers = new ArrayList<>(tableSegmentUniqueIdentifiers.size()); List<TableBlockIndexUniqueIdentifierWrapper> missedIdentifiersWrapper = new ArrayList<>(); @@ -151,7 +170,7 @@ public class BlockletDataMapIndexStore } if (missedIdentifiersWrapper.size() > 0) { for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : missedIdentifiersWrapper) { - blockletDataMapIndexWrapper = get(identifierWrapper); + blockletDataMapIndexWrapper = get(identifierWrapper, segInfoCache); blockletDataMapIndexWrappers.add(blockletDataMapIndexWrapper); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e580d64e/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java index 52f2432..b0fb13e 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java @@ -30,12 +30,15 @@ public class BlockletDataMapIndexWrapper implements Cacheable, Serializable { private List<BlockDataMap> dataMaps; + private String segmentId; + // size of the wrapper. basically the total size of the datamaps this wrapper is holding private long wrapperSize; - public BlockletDataMapIndexWrapper(List<BlockDataMap> dataMaps) { + public BlockletDataMapIndexWrapper(String segmentId,List<BlockDataMap> dataMaps) { this.dataMaps = dataMaps; this.wrapperSize = 0L; + this.segmentId = segmentId; // add the size of each and every datamap in this wrapper for (BlockDataMap dataMap : dataMaps) { this.wrapperSize += dataMap.getMemorySize(); @@ -57,4 +60,8 @@ public class BlockletDataMapIndexWrapper implements Cacheable, Serializable { public List<BlockDataMap> getDataMaps() { return dataMaps; } + + public String getSegmentId() { + return segmentId; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/e580d64e/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index 4dd78ee..61d93f7 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -120,6 +120,40 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory throw new UnsupportedOperationException("not implemented"); } + /** + * Get the datamap for all segments + */ + public Map<Segment, 
List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments) + throws IOException { + List<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers = + new ArrayList<>(); + Map<Segment, List<CoarseGrainDataMap>> dataMaps = new HashMap<>(); + Map<String, Segment> segmentMap = new HashMap<>(); + for (Segment segment : segments) { + segmentMap.put(segment.getSegmentNo(), segment); + Set<TableBlockIndexUniqueIdentifier> identifiers = + getTableBlockIndexUniqueIdentifiers(segment); + + for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) { + tableBlockIndexUniqueIdentifierWrappers.add( + new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, + this.getCarbonTable())); + } + } + List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers = + cache.getAll(tableBlockIndexUniqueIdentifierWrappers); + for (BlockletDataMapIndexWrapper wrapper : blockletDataMapIndexWrappers) { + Segment segment = segmentMap.get(wrapper.getSegmentId()); + List<CoarseGrainDataMap> datamapList = dataMaps.get(segment); + if (null == datamapList) { + datamapList = new ArrayList<CoarseGrainDataMap>(); + } + datamapList.addAll(wrapper.getDataMaps()); + dataMaps.put(segment, datamapList); + } + return dataMaps; + } + @Override public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException { List<CoarseGrainDataMap> dataMaps = new ArrayList<>(); Set<TableBlockIndexUniqueIdentifier> identifiers = http://git-wip-us.apache.org/repos/asf/carbondata/blob/e580d64e/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java index db41e73..68ce1fb 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java +++ 
b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java @@ -115,7 +115,7 @@ public class BlockletDataMapUtil { CarbonTable.updateTableByTableInfo(carbonTable, carbonTable.getTableInfo()); } String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath(); - if (null == blockMetaInfoMap.get(blockPath)) { + if (null != fileNameToMetaInfoMapping && null == blockMetaInfoMap.get(blockPath)) { BlockMetaInfo blockMetaInfo = createBlockMetaInfo(fileNameToMetaInfoMapping, blockPath); // if blockMetaInfo is null that means the file has been deleted from the file system. // This can happen in case IUD scenarios where after deleting or updating the data the @@ -123,6 +123,8 @@ public class BlockletDataMapUtil { if (null != blockMetaInfo) { blockMetaInfoMap.put(blockPath, blockMetaInfo); } + } else { + blockMetaInfoMap.put(blockPath, new BlockMetaInfo(new String[] {},0)); } } return blockMetaInfoMap; @@ -151,7 +153,7 @@ public class BlockletDataMapUtil { String[] location = file.getLocations(); long len = file.getSize(); BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len); - fileNameToMetaInfoMapping.put(file.getPath().toString(), blockMetaInfo); + fileNameToMetaInfoMapping.put(file.getPath(), blockMetaInfo); } } return fileNameToMetaInfoMapping; http://git-wip-us.apache.org/repos/asf/carbondata/blob/e580d64e/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java index d2a6f18..a3acfab 100644 --- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java +++ 
b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java @@ -103,7 +103,7 @@ public class TestBlockletDataMapFactory { BlockletDataMapIndexWrapper.class); method.setAccessible(true); method.invoke(blockletDataMapFactory, tableBlockIndexUniqueIdentifierWrapper, - new BlockletDataMapIndexWrapper(dataMaps)); + new BlockletDataMapIndexWrapper(tableBlockIndexUniqueIdentifier.getSegmentId(), dataMaps)); BlockletDataMapIndexWrapper result = cache.getIfPresent(tableBlockIndexUniqueIdentifierWrapper); assert null != result; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e580d64e/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala index 882630a..af17252 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala @@ -43,6 +43,9 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd") CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + // deactivating the merge-index for old partition implimentation because we are not supporting + // merge-index for the same currently. 
+ .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,"false") /** * list_table_area_origin * list_table_area @@ -891,6 +894,8 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd") CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, + CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT) } def dropTable {