Github user mohammadshahidkhan commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/454#discussion_r94019181
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java ---
    @@ -16,113 +16,142 @@
      * specific language governing permissions and limitations
      * under the License.
      */
    +
     package org.apache.carbondata.core.carbon.datastore;
     
     import java.util.ArrayList;
     import java.util.Arrays;
    -import java.util.Iterator;
     import java.util.List;
    -import java.util.Map;
     import java.util.concurrent.Callable;
    -import java.util.concurrent.ConcurrentHashMap;
    -import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.CopyOnWriteArrayList;
     import java.util.concurrent.ExecutorService;
     import java.util.concurrent.Executors;
     import java.util.concurrent.Future;
     import java.util.concurrent.TimeUnit;
     
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.cache.CarbonLRUCache;
     import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
    +import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
     import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
     import org.apache.carbondata.core.carbon.datastore.block.BlockIndex;
     import org.apache.carbondata.core.carbon.datastore.block.BlockInfo;
     import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
    +import org.apache.carbondata.core.carbon.datastore.block.TableBlockUniqueIdentifier;
     import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
    -import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
     import org.apache.carbondata.core.constants.CarbonCommonConstants;
     import org.apache.carbondata.core.util.CarbonProperties;
    -import org.apache.carbondata.core.util.CarbonUtil;
     import org.apache.carbondata.core.util.CarbonUtilException;
     
     /**
    - * Singleton Class to handle loading, unloading,clearing,storing of the table
    - * blocks
    + * This class is used to load the B-Tree in Executor LRU Cache
      */
    -public class BlockIndexStore {
    -
    -  /**
    -   * singleton instance
    -   */
    -  private static final BlockIndexStore CARBONTABLEBLOCKSINSTANCE = new BlockIndexStore();
    -
    -  /**
    -   * map to hold the table and its list of blocks
    -   */
    -  private Map<AbsoluteTableIdentifier, Map<BlockInfo, AbstractIndex>> tableBlocksMap;
    -
    -  /**
    -   * map to maintain segment id to block info map, this map will be used to
    -   * while removing the block from memory when segment is compacted or deleted
    -   */
    -  private Map<AbsoluteTableIdentifier, Map<String, List<BlockInfo>>> segmentIdToBlockListMap;
    -
    -  /**
    -   * map of block info to lock object map, while loading the btree this will be filled
    -   * and removed after loading the tree for that particular block info, this will be useful
    -   * while loading the tree concurrently so only block level lock will be applied another
    -   * block can be loaded concurrently
    -   */
    -  private Map<BlockInfo, Object> blockInfoLock;
    +public class BlockIndexStore<K, V> extends AbstractBlockIndexStoreCache<K, V> {
     
       /**
    -   * table and its lock object to this will be useful in case of concurrent
    -   * query scenario when more than one query comes for same table and in that
    -   * case it will ensure that only one query will able to load the blocks
    +   * LOGGER instance
        */
    -  private Map<AbsoluteTableIdentifier, Object> tableLockMap;
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(BlockIndexStore.class.getName());
    +  public BlockIndexStore(String carbonStorePath, CarbonLRUCache lruCache) {
    +    super(carbonStorePath, lruCache);
    +  }
     
       /**
    -   * block info to future task mapping
    -   * useful when blocklet distribution is enabled and
    -   * same block is loaded by multiple thread
    +   * The method loads the block meta in B-tree lru cache and returns the block meta.
    +   *
    +   * @param tableBlockUniqueIdentifier Uniquely identifies the block
    +   * @return returns the blocks B-Tree meta
    +   * @throws CarbonUtilException
        */
    -  private Map<BlockInfo, Future<AbstractIndex>> mapOfBlockInfoToFuture;
    +  @Override public AbstractIndex get(TableBlockUniqueIdentifier tableBlockUniqueIdentifier)
    +      throws CarbonUtilException {
    +    TableBlockInfo tableBlockInfo = tableBlockUniqueIdentifier.getTableBlockInfo();
    +    BlockInfo blockInfo = new BlockInfo(tableBlockInfo);
    +    String lruCacheKey =
    +        getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo);
    +    AbstractIndex tableBlock = (AbstractIndex) lruCache.get(lruCacheKey);
     
    -  private BlockIndexStore() {
    -    tableBlocksMap = new ConcurrentHashMap<AbsoluteTableIdentifier, Map<BlockInfo, AbstractIndex>>(
    -        CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
    -    tableLockMap = new ConcurrentHashMap<AbsoluteTableIdentifier, Object>(
    -        CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
    -    blockInfoLock = new ConcurrentHashMap<BlockInfo, Object>();
    -    segmentIdToBlockListMap = new ConcurrentHashMap<>();
    -    mapOfBlockInfoToFuture = new ConcurrentHashMap<>();
    +    // if block is not loaded
    +    if (null == tableBlock) {
    +      // check any lock object is present in
    +      // block info lock map
    +      Object blockInfoLockObject = blockInfoLock.get(blockInfo);
    +      // if lock object is not present then acquire
    +      // the lock in block info lock and add a lock object in the map for
    +      // particular block info, added double checking mechanism to add the lock
    +      // object so in case of concurrent query we for same block info only one lock
    +      // object will be added
    +      if (null == blockInfoLockObject) {
    +        synchronized (blockInfoLock) {
    +          // again checking the block info lock, to check whether lock object is present
    +          // or not if now also not present then add a lock object
    +          blockInfoLockObject = blockInfoLock.get(blockInfo);
    +          if (null == blockInfoLockObject) {
    +            blockInfoLockObject = new Object();
    +            blockInfoLock.put(blockInfo, blockInfoLockObject);
    +          }
    +        }
    +      }
    +      //acquire the lock for particular block info
    +      synchronized (blockInfoLockObject) {
    +        // check again whether block is present or not to avoid the
    +        // same block is loaded
    +        //more than once in case of concurrent query
    +        tableBlock = (AbstractIndex) lruCache.get(
    +            getLruCacheKey(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(), blockInfo));
    +        // if still block is not present then load the block
    +        if (null == tableBlock) {
    +          tableBlock = loadBlock(tableBlockUniqueIdentifier);
    +          fillSegmentIdToBlockListMap(tableBlockUniqueIdentifier.getAbsoluteTableIdentifier(),
    +              blockInfo);
    +        }
    +      }
    +    } else {
    +      tableBlock.incrementAccessCount();
    +    }
    +    return tableBlock;
       }
     
       /**
    -   * Return the instance of this class
    -   *
    -   * @return singleton instance
    +   * @param absoluteTableIdentifier
    +   * @param blockInfo
        */
    -  public static BlockIndexStore getInstance() {
    -    return CARBONTABLEBLOCKSINSTANCE;
    +  private void fillSegmentIdToBlockListMap(AbsoluteTableIdentifier absoluteTableIdentifier,
    +      BlockInfo blockInfo) {
    +    TableSegmentUniqueIdentifier segmentIdentifier =
    +        new TableSegmentUniqueIdentifier(absoluteTableIdentifier,
    +            blockInfo.getTableBlockInfo().getSegmentId());
    +    List<BlockInfo> blockInfos =
    +        segmentIdToBlockListMap.get(segmentIdentifier.getUniqueTableSegmentIdentifier());
    +    if (null == blockInfos) {
    +      synchronized (segmentIdentifier) {
    +        blockInfos =
    +            segmentIdToBlockListMap.get(segmentIdentifier.getUniqueTableSegmentIdentifier());
    +        if (null == blockInfos) {
    +          blockInfos = new CopyOnWriteArrayList<>();
    --- End diff --
    
    Because two threads can write to the same object at the same time.
    Please see the else part:
    
        else {
          blockInfos.add(blockInfo);
        }
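    
    To make the concern concrete, below is a minimal sketch of the pattern (not the actual CarbonData code; String keys and values stand in for the segment identifier and BlockInfo types, and the map itself is used as the lock object for brevity). The double-checked locking only guards creation of the list; once the list is published in the map, add() is called without holding any lock, so a thread-safe CopyOnWriteArrayList is needed rather than a plain ArrayList:
    
        // Simplified sketch of the pattern being discussed (hypothetical types/names).
        import java.util.List;
        import java.util.Map;
        import java.util.concurrent.ConcurrentHashMap;
        import java.util.concurrent.CopyOnWriteArrayList;
    
        public class SegmentBlockRegistrySketch {
    
          private final Map<String, List<String>> segmentIdToBlockListMap =
              new ConcurrentHashMap<>();
    
          public void register(String segmentKey, String blockInfo) {
            List<String> blockInfos = segmentIdToBlockListMap.get(segmentKey);
            if (null == blockInfos) {
              synchronized (segmentIdToBlockListMap) {
                // double checking: only one thread creates and publishes the list
                blockInfos = segmentIdToBlockListMap.get(segmentKey);
                if (null == blockInfos) {
                  blockInfos = new CopyOnWriteArrayList<>();
                  blockInfos.add(blockInfo);
                  segmentIdToBlockListMap.put(segmentKey, blockInfos);
                } else {
                  blockInfos.add(blockInfo);
                }
              }
            } else {
              // this add() runs without any lock, so two threads can write to the
              // same list at the same time; that is why the list must be a
              // thread-safe CopyOnWriteArrayList rather than a plain ArrayList
              blockInfos.add(blockInfo);
            }
          }
        }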

