[CARBONDATA-2625] Avoid listing files multiple times while loading BlockletDataMap

CarbonReader is very slow when a table has many files, because BlockletDataMap
lists the files of the segment folder while loading each segment. This
optimization lists the files only once and reuses the listing across segment
loads.
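
As a rough sketch of the idea (the class and method names below are invented
for illustration and are not the actual CarbonData API), the fix amounts to
listing each segment folder once and serving every later request for the same
folder from a cache:

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: list a segment folder the first time it is asked
// for and reuse that listing for every index file loaded from the folder.
class SegmentFileListingCache {
  private final Map<String, List<String>> listingByFolder = new HashMap<>();

  List<String> listFiles(String segmentFolder) {
    return listingByFolder.computeIfAbsent(segmentFolder, this::listFromFileSystem);
  }

  private List<String> listFromFileSystem(String segmentFolder) {
    // placeholder for the real file-system listing call
    return Collections.emptyList();
  }
}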

This closes #2441


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e580d64e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e580d64e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e580d64e

Branch: refs/heads/external-format
Commit: e580d64ef5353ed033343d854da7e02539cdbeb4
Parents: 6351c3a
Author: rahul <rahul.ku...@knoldus.in>
Authored: Wed Jul 4 19:31:51 2018 +0530
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Wed Aug 1 16:40:29 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/datamap/TableDataMap.java   |  5 +--
 .../core/datamap/dev/DataMapFactory.java        | 16 +++++++++
 .../indexstore/BlockletDataMapIndexStore.java   | 29 ++++++++++++++---
 .../indexstore/BlockletDataMapIndexWrapper.java |  9 +++++-
 .../blockletindex/BlockletDataMapFactory.java   | 34 ++++++++++++++++++++
 .../core/util/BlockletDataMapUtil.java          |  6 ++--
 .../TestBlockletDataMapFactory.java             |  2 +-
 .../partition/TestAlterPartitionTable.scala     |  5 +++
 8 files changed, 95 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e580d64e/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index f6da73e..aed8c60 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datamap;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -89,15 +90,15 @@ public final class TableDataMap extends OperationEventListener {
       List<PartitionSpec> partitions) throws IOException {
     List<ExtendedBlocklet> blocklets = new ArrayList<>();
     SegmentProperties segmentProperties;
+    Map<Segment, List<DataMap>> dataMaps = dataMapFactory.getDataMaps(segments);
     for (Segment segment : segments) {
       List<Blocklet> pruneBlocklets = new ArrayList<>();
       // if filter is not passed then return all the blocklets
       if (filterExp == null) {
         pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions);
       } else {
-        List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment);
         segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
-        for (DataMap dataMap : dataMaps) {
+        for (DataMap dataMap : dataMaps.get(segment)) {
           pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions));
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e580d64e/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index ab0f8ea..67f82b2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -17,8 +17,10 @@
 package org.apache.carbondata.core.datamap.dev;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
@@ -26,6 +28,7 @@ import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -67,6 +70,19 @@ public abstract class DataMapFactory<T extends DataMap> {
    */
   public abstract DataMapBuilder createBuilder(Segment segment, String shardName,
       SegmentProperties segmentProperties) throws IOException;
+
+  /**
+   * Get the datamap for all segments
+   */
+  public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments)
+      throws IOException {
+    Map<Segment, List<CoarseGrainDataMap>> dataMaps = new HashMap<>();
+    for (Segment segment : segments) {
+      dataMaps.put(segment, (List<CoarseGrainDataMap>) this.getDataMaps(segment));
+    }
+    return dataMaps;
+  }
+
   /**
    * Get the datamap for segmentid
    */
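
The default implementation above just loops over the existing per-segment
getDataMaps(Segment), so a factory that can batch the work (such as
BlockletDataMapFactory further below) only needs to override the list variant.
A minimal, self-contained sketch of that pattern, with illustrative names
rather than the CarbonData types:

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: a per-key lookup plus a default bulk lookup that loops,
// which a subclass may override to resolve all keys in a single pass.
abstract class BulkLookup<K, V> {
  // analogous to getDataMaps(Segment)
  abstract List<V> load(K key) throws IOException;

  // analogous to the new getDataMaps(List<Segment>) default method
  Map<K, List<V>> loadAll(List<K> keys) throws IOException {
    Map<K, List<V>> result = new HashMap<>();
    for (K key : keys) {
      result.put(key, load(key));
    }
    return result;
  }
}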

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e580d64e/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index 3a8aa52..fa84f30 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.indexstore;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -73,6 +74,11 @@ public class BlockletDataMapIndexStore
   @Override
   public BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper)
       throws IOException {
+    return get(identifierWrapper, null);
+  }
+
+  private BlockletDataMapIndexWrapper get(TableBlockIndexUniqueIdentifierWrapper identifierWrapper,
+      Map<String, Map<String, BlockMetaInfo>> segInfoCache) throws IOException {
     TableBlockIndexUniqueIdentifier identifier =
         identifierWrapper.getTableBlockIndexUniqueIdentifier();
     String lruCacheKey = identifier.getUniqueTableSegmentIdentifier();
@@ -84,8 +90,16 @@ public class BlockletDataMapIndexStore
         SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
         Set<String> filesRead = new HashSet<>();
         String segmentFilePath = identifier.getIndexFilePath();
-        Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping = BlockletDataMapUtil
-            .createCarbonDataFileBlockMetaInfoMapping(segmentFilePath);
+        if (segInfoCache == null) {
+          segInfoCache = new HashMap<String, Map<String, BlockMetaInfo>>();
+        }
+        Map<String, BlockMetaInfo> carbonDataFileBlockMetaInfoMapping =
+            segInfoCache.get(segmentFilePath);
+        if (carbonDataFileBlockMetaInfoMapping == null) {
+          carbonDataFileBlockMetaInfoMapping =
+              BlockletDataMapUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath);
+          segInfoCache.put(segmentFilePath, carbonDataFileBlockMetaInfoMapping);
+        }
         // if the identifier is not a merge file we can directly load the datamaps
         if (identifier.getMergeIndexFileName() == null) {
           Map<String, BlockMetaInfo> blockMetaInfoMap = BlockletDataMapUtil
@@ -95,7 +109,8 @@ public class BlockletDataMapIndexStore
               loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap,
                   identifierWrapper.getCarbonTable(), identifierWrapper.isAddTableBlockToUnsafe());
           dataMaps.add(blockletDataMap);
-          blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
+          blockletDataMapIndexWrapper =
+              new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps);
         } else {
           // if the identifier is a merge file then collect the index files and load the datamaps
           List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
@@ -114,7 +129,8 @@ public class BlockletDataMapIndexStore
               dataMaps.add(blockletDataMap);
             }
           }
-          blockletDataMapIndexWrapper = new BlockletDataMapIndexWrapper(dataMaps);
+          blockletDataMapIndexWrapper =
+              new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps);
         }
         lruCache.put(identifier.getUniqueTableSegmentIdentifier(), blockletDataMapIndexWrapper,
             blockletDataMapIndexWrapper.getMemorySize());
@@ -133,6 +149,9 @@ public class BlockletDataMapIndexStore
   @Override public List<BlockletDataMapIndexWrapper> getAll(
       List<TableBlockIndexUniqueIdentifierWrapper> tableSegmentUniqueIdentifiers)
       throws IOException {
+    Map<String, Map<String, BlockMetaInfo>> segInfoCache
+        = new HashMap<String, Map<String, BlockMetaInfo>>();
+
     List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
         new ArrayList<>(tableSegmentUniqueIdentifiers.size());
     List<TableBlockIndexUniqueIdentifierWrapper> missedIdentifiersWrapper = new ArrayList<>();
@@ -151,7 +170,7 @@ public class BlockletDataMapIndexStore
       }
       if (missedIdentifiersWrapper.size() > 0) {
         for (TableBlockIndexUniqueIdentifierWrapper identifierWrapper : missedIdentifiersWrapper) {
-          blockletDataMapIndexWrapper = get(identifierWrapper);
+          blockletDataMapIndexWrapper = get(identifierWrapper, segInfoCache);
           blockletDataMapIndexWrappers.add(blockletDataMapIndexWrapper);
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e580d64e/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
index 52f2432..b0fb13e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
@@ -30,12 +30,15 @@ public class BlockletDataMapIndexWrapper implements Cacheable, Serializable {
 
   private List<BlockDataMap> dataMaps;
 
+  private String segmentId;
+
   // size of the wrapper. basically the total size of the datamaps this wrapper is holding
   private long wrapperSize;
 
-  public BlockletDataMapIndexWrapper(List<BlockDataMap> dataMaps) {
+  public BlockletDataMapIndexWrapper(String segmentId,List<BlockDataMap> dataMaps) {
     this.dataMaps = dataMaps;
     this.wrapperSize = 0L;
+    this.segmentId = segmentId;
     // add the size of each and every datamap in this wrapper
     for (BlockDataMap dataMap : dataMaps) {
       this.wrapperSize += dataMap.getMemorySize();
@@ -57,4 +60,8 @@ public class BlockletDataMapIndexWrapper implements Cacheable, Serializable {
   public List<BlockDataMap> getDataMaps() {
     return dataMaps;
   }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e580d64e/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 4dd78ee..61d93f7 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -120,6 +120,40 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     throw new UnsupportedOperationException("not implemented");
   }
 
+  /**
+   * Get the datamap for all segments
+   */
+  public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments)
+      throws IOException {
+    List<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers =
+        new ArrayList<>();
+    Map<Segment, List<CoarseGrainDataMap>> dataMaps = new HashMap<>();
+    Map<String, Segment> segmentMap = new HashMap<>();
+    for (Segment segment : segments) {
+      segmentMap.put(segment.getSegmentNo(), segment);
+      Set<TableBlockIndexUniqueIdentifier> identifiers =
+          getTableBlockIndexUniqueIdentifiers(segment);
+
+      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) {
+        tableBlockIndexUniqueIdentifierWrappers.add(
+            new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
+                this.getCarbonTable()));
+      }
+    }
+    List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
+        cache.getAll(tableBlockIndexUniqueIdentifierWrappers);
+    for (BlockletDataMapIndexWrapper wrapper : blockletDataMapIndexWrappers) {
+      Segment segment = segmentMap.get(wrapper.getSegmentId());
+      List<CoarseGrainDataMap> datamapList = dataMaps.get(segment);
+      if (null == datamapList) {
+        datamapList = new ArrayList<CoarseGrainDataMap>();
+      }
+      datamapList.addAll(wrapper.getDataMaps());
+      dataMaps.put(segment, datamapList);
+    }
+    return dataMaps;
+  }
+
   @Override public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
     List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
     Set<TableBlockIndexUniqueIdentifier> identifiers =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e580d64e/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index db41e73..68ce1fb 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -115,7 +115,7 @@ public class BlockletDataMapUtil {
         CarbonTable.updateTableByTableInfo(carbonTable, carbonTable.getTableInfo());
       }
       String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
-      if (null == blockMetaInfoMap.get(blockPath)) {
+      if (null != fileNameToMetaInfoMapping && null == blockMetaInfoMap.get(blockPath)) {
         BlockMetaInfo blockMetaInfo = createBlockMetaInfo(fileNameToMetaInfoMapping, blockPath);
         // if blockMetaInfo is null that means the file has been deleted from the file system.
         // This can happen in case IUD scenarios where after deleting or updating the data the
@@ -123,6 +123,8 @@ public class BlockletDataMapUtil {
         if (null != blockMetaInfo) {
           blockMetaInfoMap.put(blockPath, blockMetaInfo);
         }
+      } else {
+        blockMetaInfoMap.put(blockPath, new BlockMetaInfo(new String[] {},0));
       }
     }
     return blockMetaInfoMap;
@@ -151,7 +153,7 @@ public class BlockletDataMapUtil {
         String[] location = file.getLocations();
         long len = file.getSize();
         BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
-        fileNameToMetaInfoMapping.put(file.getPath().toString(), blockMetaInfo);
+        fileNameToMetaInfoMapping.put(file.getPath(), blockMetaInfo);
       }
     }
     return fileNameToMetaInfoMapping;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e580d64e/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
index d2a6f18..a3acfab 100644
--- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
@@ -103,7 +103,7 @@ public class TestBlockletDataMapFactory {
             BlockletDataMapIndexWrapper.class);
     method.setAccessible(true);
     method.invoke(blockletDataMapFactory, tableBlockIndexUniqueIdentifierWrapper,
-        new BlockletDataMapIndexWrapper(dataMaps));
+        new BlockletDataMapIndexWrapper(tableBlockIndexUniqueIdentifier.getSegmentId(), dataMaps));
     BlockletDataMapIndexWrapper result = cache.getIfPresent(tableBlockIndexUniqueIdentifierWrapper);
     assert null != result;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e580d64e/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index 882630a..af17252 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -43,6 +43,9 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
       .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+      // deactivating the merge-index for the old partition implementation because we are not
+      // currently supporting merge-index for it.
+        .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,"false")
     /**
      * list_table_area_origin
      * list_table_area
@@ -891,6 +894,8 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+        CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)
   }
 
   def dropTable {
