http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/BtreeBuilder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/BtreeBuilder.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/BtreeBuilder.java
new file mode 100644
index 0000000..3e2b013
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/BtreeBuilder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore;
+
+/**
+ * Below interface will be used to build the index
+ * in some data structure
+ */
+public interface BtreeBuilder {
+
+  /**
+   * Below method will be used to store the leaf collection in some data structure
+   */
+  void build(BTreeBuilderInfo blocksBuilderInfos);
+
+  /**
+   * below method to get the first data block
+   *
+   * @return data block
+   */
+  DataRefNode get();
+}
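
For context, a minimal usage sketch (not part of this patch) of how a concrete builder such as BlockletBTreeBuilder (added later in this commit) is driven; indexBuilderInfo is assumed to be a BTreeBuilderInfo prepared from the file footers:

    BtreeBuilder builder = new BlockletBTreeBuilder();
    builder.build(indexBuilderInfo);   // index the blocklets described by the footers
    DataRefNode node = builder.get();  // first data block of the built tree
    while (node != null) {
      // consume the block, then move to its sibling
      node = node.getNextDataRefNode();
    }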

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
new file mode 100644
index 0000000..9b27c3d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+
+/**
+ * Interface for a data block reference
+ */
+public interface DataRefNode {
+
+  /**
+   * Method to get the next block. This can be used while scanning,
+   * when an iterator over the blocks is needed.
+   *
+   * @return next block
+   */
+  DataRefNode getNextDataRefNode();
+
+  /**
+   * to get the number of key tuples present in the block
+   *
+   * @return number of keys in the block
+   */
+  int nodeSize();
+
+  /**
+   * Method to get the block index. This can be used when multiple
+   * threads scan a group of blocks, so that some of the blocks can be
+   * assigned to one thread and some to another.
+   *
+   * @return block number
+   */
+  long nodeNumber();
+
+  /**
+   * This method will be used to get the max value of all the columns;
+   * this can be used in case of a filter query
+   *
+   * @return max value of each column
+   */
+  byte[][] getColumnsMaxValue();
+
+  /**
+   * This method will be used to get the min value of all the columns;
+   * this can be used in case of a filter query
+   *
+   * @return min value of each column
+   */
+  byte[][] getColumnsMinValue();
+
+  /**
+   * Below method will be used to get the dimension chunks
+   *
+   * @param fileReader   file reader to read the chunks from file
+   * @param blockIndexes range indexes of the blocks to be read,
+   *                     e.g. {{0,10},{11,12},{13,13}}:
+   *                     here column blocks 0 to 10 and 11 to 12 will each be
+   *                     read in one IO operation, and the 13th column block
+   *                     will be read separately. This helps reduce IO by
+   *                     reading a bigger chunk of data in one IO.
+   * @return dimension data chunks
+   */
+  DimensionColumnDataChunk[] getDimensionChunks(FileHolder fileReader, int[][] blockIndexes)
+      throws IOException;
+
+  /**
+   * Below method will be used to get the dimension chunk
+   *
+   * @param fileReader   file reader to read the chunk from file
+   * @param blockIndexes block index of the chunk to be read
+   * @return dimension data chunk
+   */
+  DimensionColumnDataChunk getDimensionChunk(FileHolder fileReader, int blockIndexes)
+      throws IOException;
+
+  /**
+   * Below method will be used to get the measure chunks
+   *
+   * @param fileReader   file reader to read the chunks from file
+   * @param blockIndexes range indexes of the blocks to be read,
+   *                     e.g. {{0,10},{11,12},{13,13}}:
+   *                     here column blocks 0 to 10 and 11 to 12 will each be
+   *                     read in one IO operation, and the 13th column block
+   *                     will be read separately. This helps reduce IO by
+   *                     reading a bigger chunk of data in one IO.
+   * @return measure column data chunks
+   */
+  MeasureColumnDataChunk[] getMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
+      throws IOException;
+
+  /**
+   * Below method will be used to read the measure chunk
+   *
+   * @param fileReader file reader to read the chunk from file
+   * @param blockIndex block index of the chunk to be read
+   * @return measure data chunk
+   */
+  MeasureColumnDataChunk getMeasureChunk(FileHolder fileReader, int blockIndex) throws IOException;
+
+  /**
+   * @param deleteDeltaDataCache blocklet-level delete delta data cache to set
+   */
+  void setDeleteDeltaDataCache(BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache);
+
+  /**
+   * @return the blocklet-level delete delta data cache
+   */
+  BlockletLevelDeleteDeltaDataCache getDeleteDeltaDataCache();
+}
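
To illustrate the grouped-read contract documented on getDimensionChunks, a hedged sketch; node and fileReader are assumed to be an existing DataRefNode and FileHolder, and IOException handling is omitted:

    // columns 0-10 and 11-12 are each fetched in a single IO; column 13 separately
    int[][] blockIndexes = new int[][] { { 0, 10 }, { 11, 12 }, { 13, 13 } };
    DimensionColumnDataChunk[] dimensionChunks =
        node.getDimensionChunks(fileReader, blockIndexes);
    // a single chunk can also be read on its own
    MeasureColumnDataChunk firstMeasure = node.getMeasureChunk(fileReader, 0);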

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNodeFinder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNodeFinder.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNodeFinder.java
new file mode 100644
index 0000000..8dd6b60
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNodeFinder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore;
+
+/**
+ * Below interface is used to search for a block
+ */
+public interface DataRefNodeFinder {
+
+  /**
+   * Below method will be used to get the first tentative block that matches
+   * the search key
+   *
+   * @param dataBlocks complete data blocks present
+   * @param searchKey  key to search for
+   * @return data block
+   */
+  DataRefNode findFirstDataBlock(DataRefNode dataBlocks, IndexKey searchKey);
+
+  /**
+   * Below method will be used to get the last tentative block that matches
+   * the search key
+   *
+   * @param dataBlocks complete data blocks present
+   * @param searchKey  key to search for
+   * @return data block
+   */
+  DataRefNode findLastDataBlock(DataRefNode dataBlocks, IndexKey searchKey);
+}
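
A usage sketch under assumed variables (finder, rootNode, searchKey): the two methods bound the candidate block range, which can then be scanned through the DataRefNode chain:

    DataRefNode first = finder.findFirstDataBlock(rootNode, searchKey);
    DataRefNode last = finder.findLastDataBlock(rootNode, searchKey);
    DataRefNode current = first;
    while (current != null && current.nodeNumber() <= last.nodeNumber()) {
      // scan the block for matching rows ...
      current = current.getNextDataRefNode();
    }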

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/FileHolder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/FileHolder.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/FileHolder.java
new file mode 100644
index 0000000..95ec433
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/FileHolder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore;
+
+import java.io.IOException;
+
+public interface FileHolder {
+  /**
+   * This method will be used to read a byte array from the file based on the offset
+   * and length (number of bytes) to be read
+   *
+   * @param filePath fully qualified file path
+   * @param offset   reading start position
+   * @param length   number of bytes to be read
+   * @return read byte array
+   */
+  byte[] readByteArray(String filePath, long offset, int length) throws IOException;
+
+  /**
+   * This method will be used to read a byte array from the file based on the
+   * length (number of bytes) to be read
+   *
+   * @param filePath fully qualified file path
+   * @param length   number of bytes to be read
+   * @return read byte array
+   */
+  byte[] readByteArray(String filePath, int length) throws IOException;
+
+  /**
+   * This method will be used to read an int from the file at position (offset);
+   * the length will always be 4 because the int byte size is 4
+   *
+   * @param filePath fully qualified file path
+   * @param offset   reading start position
+   * @return read int
+   */
+  int readInt(String filePath, long offset) throws IOException;
+
+  /**
+   * This method will be used to read a long from the file at position (offset);
+   * the length will always be 8 because the long byte size is 8
+   *
+   * @param filePath fully qualified file path
+   * @param offset   reading start position
+   * @return read long
+   */
+  long readLong(String filePath, long offset) throws IOException;
+
+  /**
+   * This method will be used to read an int from the file; the length will
+   * always be 4 because the int byte size is 4
+   *
+   * @param filePath fully qualified file path
+   * @return read int
+   */
+  int readInt(String filePath) throws IOException;
+
+  /**
+   * This method will be used to read a long value from the file at position
+   * (offset); the length will always be 8 because the value byte size is 8
+   *
+   * @param filePath fully qualified file path
+   * @param offset   reading start position
+   * @return read long
+   */
+  long readDouble(String filePath, long offset) throws IOException;
+
+  /**
+   * This method will be used to close all the streams currently present in the cache
+   */
+  void finish() throws IOException;
+}
\ No newline at end of file
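
A hedged sketch of the read pattern this interface supports; fileReader is an assumed concrete FileHolder, filePath and fileLength are illustrative, and IOException handling is omitted:

    byte[] header = fileReader.readByteArray(filePath, 0L, 64); // 64 bytes from offset 0
    long footerOffset = fileReader.readLong(filePath, fileLength - 8);
    int version = fileReader.readInt(filePath, footerOffset);
    fileReader.finish(); // close all cached streams when done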

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/IndexKey.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/IndexKey.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/IndexKey.java
new file mode 100644
index 0000000..3c28298
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/IndexKey.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore;
+
+/**
+ * Index class to store the index of the segment blocklet infos
+ */
+public class IndexKey {
+
+  /**
+   * key which is generated from key generator
+   */
+  private byte[] dictionaryKeys;
+
+  /**
+   * key which was not generated using key generator
+   * <Index of FirstKey (2 bytes)><Index of SecondKey (2 bytes)><Index of NKey (2 bytes)>
+   * <First Key ByteArray><2nd Key ByteArray><N Key ByteArray>
+   */
+  private byte[] noDictionaryKeys;
+
+  public IndexKey(byte[] dictionaryKeys, byte[] noDictionaryKeys) {
+    this.dictionaryKeys = dictionaryKeys;
+    this.noDictionaryKeys = noDictionaryKeys;
+    if (null == dictionaryKeys) {
+      this.dictionaryKeys = new byte[0];
+    }
+    if (null == noDictionaryKeys) {
+      this.noDictionaryKeys = new byte[0];
+    }
+  }
+
+  /**
+   * @return the dictionaryKeys
+   */
+  public byte[] getDictionaryKeys() {
+    return dictionaryKeys;
+  }
+
+  /**
+   * @return the noDictionaryKeys
+   */
+  public byte[] getNoDictionaryKeys() {
+    return noDictionaryKeys;
+  }
+}
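
A hedged illustration of the no-dictionary layout documented above: a 2-byte offset per key followed by the concatenated key bytes (the exact offset semantics are an assumption of this sketch):

    byte[] key1 = "abc".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    byte[] key2 = "de".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate(4 + key1.length + key2.length);
    buffer.putShort((short) 4);                  // offset of the first key's bytes
    buffer.putShort((short) (4 + key1.length));  // offset of the second key's bytes
    buffer.put(key1).put(key2);
    // null dictionary keys are normalized to an empty array by the constructor
    IndexKey searchKey = new IndexKey(null, buffer.array());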

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/MeasureDataWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/MeasureDataWrapper.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/MeasureDataWrapper.java
new file mode 100644
index 0000000..ff6ed34
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/MeasureDataWrapper.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore;
+
+import org.apache.carbondata.core.datastore.dataholder.CarbonReadDataHolder;
+
+/**
+ * MeasureDataWrapper interface: wraps the read data holders of the measure columns.
+ */
+public interface MeasureDataWrapper {
+  CarbonReadDataHolder[] getValues();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/NodeMeasureDataStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/NodeMeasureDataStore.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/NodeMeasureDataStore.java
new file mode 100644
index 0000000..c4e5d15
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/NodeMeasureDataStore.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore;
+
+import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
+
+public interface NodeMeasureDataStore {
+  /**
+   * This method will be used to get the writable measure data array.
+   * The writable measure data array will hold the below information:
+   * <size of measure data array><measure data array>
+   * total length will be 4 bytes for size + measure data array length
+   *
+   * @return writable array (compressed or normal)
+   */
+  byte[][] getWritableMeasureDataArray(CarbonWriteDataHolder[] dataHolderArray);
+}
\ No newline at end of file
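
A sketch of the writable layout described in the javadoc, a 4-byte length prefix followed by the (compressed or normal) measure bytes; the payload here is illustrative:

    byte[] measureData = new byte[] { 1, 2, 3 };  // one measure's serialized bytes
    java.nio.ByteBuffer writable = java.nio.ByteBuffer.allocate(4 + measureData.length);
    writable.putInt(measureData.length);  // <size of measure data array>
    writable.put(measureData);            // <measure data array>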

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
new file mode 100644
index 0000000..6c779a9
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.datastore.block.AbstractIndex;
+import org.apache.carbondata.core.datastore.block.SegmentTaskIndex;
+import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil;
+
+/**
+ * Class to handle loading, unloading, clearing and storing of the table
+ * blocks
+ */
+public class SegmentTaskIndexStore
+    implements Cache<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SegmentTaskIndexStore.class.getName());
+  /**
+   * carbon store path
+   */
+  protected String carbonStorePath;
+  /**
+   * CarbonLRU cache
+   */
+  protected CarbonLRUCache lruCache;
+
+  /**
+   * map of block info to lock object. While loading the btree this will be filled
+   * and entries removed after loading the tree for that particular block info; this
+   * is useful while loading the tree concurrently, so only a block-level lock is
+   * applied and another block can be loaded concurrently
+   */
+  private Map<String, Object> segmentLockMap;
+
+  /**
+   * constructor to initialize the SegmentTaskIndexStore
+   *
+   * @param carbonStorePath
+   * @param lruCache
+   */
+  public SegmentTaskIndexStore(String carbonStorePath, CarbonLRUCache lruCache) {
+    this.carbonStorePath = carbonStorePath;
+    this.lruCache = lruCache;
+    segmentLockMap = new ConcurrentHashMap<String, Object>();
+  }
+
+  @Override
+  public SegmentTaskIndexWrapper get(TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier)
+      throws IOException {
+    SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
+    try {
+      segmentTaskIndexWrapper =
+          loadAndGetTaskIdToSegmentsMap(tableSegmentUniqueIdentifier.getSegmentToTableBlocksInfos(),
+              tableSegmentUniqueIdentifier.getAbsoluteTableIdentifier(),
+              tableSegmentUniqueIdentifier);
+    } catch (IndexBuilderException e) {
+      throw new IOException(e.getMessage(), e);
+    } catch (Throwable e) {
+      throw new IOException("Problem in loading segment block.", e);
+    }
+    return segmentTaskIndexWrapper;
+  }
+
+  @Override public List<SegmentTaskIndexWrapper> getAll(
+      List<TableSegmentUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
+    List<SegmentTaskIndexWrapper> segmentTaskIndexWrappers =
+        new ArrayList<>(tableSegmentUniqueIdentifiers.size());
+    try {
+      for (TableSegmentUniqueIdentifier segmentUniqueIdentifier : tableSegmentUniqueIdentifiers) {
+        segmentTaskIndexWrappers.add(get(segmentUniqueIdentifier));
+      }
+    } catch (Throwable e) {
+      for (SegmentTaskIndexWrapper segmentTaskIndexWrapper : segmentTaskIndexWrappers) {
+        segmentTaskIndexWrapper.clear();
+      }
+      throw new IOException("Problem in loading segment blocks.", e);
+    }
+    return segmentTaskIndexWrappers;
+  }
+
+  /**
+   * returns the SegmentTaskIndexWrapper
+   *
+   * @param tableSegmentUniqueIdentifier
+   * @return
+   */
+  @Override public SegmentTaskIndexWrapper getIfPresent(
+      TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier) {
+    SegmentTaskIndexWrapper segmentTaskIndexWrapper = (SegmentTaskIndexWrapper) lruCache
+        .get(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+    if (null != segmentTaskIndexWrapper) {
+      segmentTaskIndexWrapper.incrementAccessCount();
+    }
+    return segmentTaskIndexWrapper;
+  }
+
+  /**
+   * method to invalidate the segment cache for a segment
+   *
+   * @param tableSegmentUniqueIdentifier
+   */
+  @Override public void invalidate(TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier) {
+    lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+  }
+
+  /**
+   * returns block timestamp value from the given task
+   * @param taskKey
+   * @param listOfUpdatedFactFiles
+   * @return
+   */
+  private String getTimeStampValueFromBlock(String taskKey, List<String> listOfUpdatedFactFiles) {
+    for (String blockName : listOfUpdatedFactFiles) {
+      if (taskKey.equals(CarbonTablePath.DataFileUtil.getTaskNo(blockName))) {
+        blockName =
+            blockName.substring(blockName.lastIndexOf('-') + 1, blockName.lastIndexOf('.'));
+        return blockName;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Below method will be used to load the segments of a table.
+   * One segment may have multiple tasks, so the table segment will be loaded
+   * based on task id, and this will return the map of task id to table segment
+   * map
+   *
+   * @param segmentToTableBlocksInfos segment id to block info
+   * @param absoluteTableIdentifier   absolute table identifier
+   * @return map of task id to segment mapping
+   * @throws IOException
+   */
+  private SegmentTaskIndexWrapper loadAndGetTaskIdToSegmentsMap(
+      Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos,
+      AbsoluteTableIdentifier absoluteTableIdentifier,
+      TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier) throws IOException {
+    // task id to segment map
+    Iterator<Map.Entry<String, List<TableBlockInfo>>> iteratorOverSegmentBlocksInfos =
+        segmentToTableBlocksInfos.entrySet().iterator();
+    Map<TaskBucketHolder, AbstractIndex> taskIdToSegmentIndexMap = null;
+    SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
+    SegmentUpdateStatusManager updateStatusManager =
+        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+    String segmentId = null;
+    TaskBucketHolder taskBucketHolder = null;
+    try {
+      while (iteratorOverSegmentBlocksInfos.hasNext()) {
+        // segment id to table block mapping
+        Map.Entry<String, List<TableBlockInfo>> next = iteratorOverSegmentBlocksInfos.next();
+        // group task id to table block info mapping for the segment
+        Map<TaskBucketHolder, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
+            mappedAndGetTaskIdToTableBlockInfo(segmentToTableBlocksInfos);
+        segmentId = next.getKey();
+        // get the existing map of task id to table segment map
+        UpdateVO updateVO = updateStatusManager.getInvalidTimestampRange(segmentId);
+        // check if segment is already loaded; if segment is already loaded
+        // no need to load the segment block
+        String lruCacheKey = tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier();
+        segmentTaskIndexWrapper = (SegmentTaskIndexWrapper) lruCache.get(lruCacheKey);
+        if (segmentTaskIndexWrapper == null || tableSegmentUniqueIdentifier.isSegmentUpdated()) {
+          // get the segment loader lock object; this is to avoid the
+          // same segment getting loaded multiple times
+          // in case of concurrent query
+          Object segmentLoderLockObject = segmentLockMap.get(lruCacheKey);
+          if (null == segmentLoderLockObject) {
+            segmentLoderLockObject = addAndGetSegmentLock(lruCacheKey);
+          }
+          // acquire lock to load the segment
+          synchronized (segmentLoderLockObject) {
+            segmentTaskIndexWrapper = (SegmentTaskIndexWrapper) lruCache.get(lruCacheKey);
+            if (null == segmentTaskIndexWrapper || tableSegmentUniqueIdentifier
+                .isSegmentUpdated()) {
+              // if the segment is updated then get the existing block task id map details
+              // so that the same can be updated after loading the btree.
+              if (tableSegmentUniqueIdentifier.isSegmentUpdated()
+                  && null != segmentTaskIndexWrapper) {
+                taskIdToSegmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+              } else {
+                // creating a map of task id to table segment
+                taskIdToSegmentIndexMap = new HashMap<TaskBucketHolder, AbstractIndex>();
+                segmentTaskIndexWrapper = new SegmentTaskIndexWrapper(taskIdToSegmentIndexMap);
+                segmentTaskIndexWrapper.incrementAccessCount();
+              }
+              Iterator<Map.Entry<TaskBucketHolder, List<TableBlockInfo>>> iterator =
+                  taskIdToTableBlockInfoMap.entrySet().iterator();
+              long requiredSize =
+                  calculateRequiredSize(taskIdToTableBlockInfoMap, absoluteTableIdentifier);
+              segmentTaskIndexWrapper
+                  .setMemorySize(requiredSize + segmentTaskIndexWrapper.getMemorySize());
+              boolean isAddedToLruCache =
+                  lruCache.put(lruCacheKey, segmentTaskIndexWrapper, requiredSize);
+              if (isAddedToLruCache) {
+                while (iterator.hasNext()) {
+                  Map.Entry<TaskBucketHolder, List<TableBlockInfo>> taskToBlockInfoList =
+                      iterator.next();
+                  taskBucketHolder = taskToBlockInfoList.getKey();
+                  taskIdToSegmentIndexMap.put(taskBucketHolder,
+                      loadBlocks(taskBucketHolder, taskToBlockInfoList.getValue(),
+                          absoluteTableIdentifier));
+                }
+              } else {
+                throw new IndexBuilderException(
+                    "Cannot load the segment. Not enough space available.");
+              }
+
+              // set the latest timestamp.
+              segmentTaskIndexWrapper
+                  .setRefreshedTimeStamp(updateVO.getCreatedOrUpdatedTimeStamp());
+              // tableSegmentMapTemp.put(next.getKey(), taskIdToSegmentIndexMap);
+              // removing from segment lock map, as once the segment is loaded,
+              // a concurrent query coming for the same segment
+              // will wait on the lock; after that the segment will already be
+              // loaded, so the lock is not required. That is why the lock
+              // object is removed, as it won't be useful
+              segmentLockMap.remove(lruCacheKey);
+            } else {
+              segmentTaskIndexWrapper.incrementAccessCount();
+            }
+          }
+        } else {
+          segmentTaskIndexWrapper.incrementAccessCount();
+        }
+      }
+    } catch (IndexBuilderException e) {
+      LOGGER.error("Problem while loading the segment");
+      throw e;
+    }
+    return segmentTaskIndexWrapper;
+  }
+
+  private long calculateRequiredSize(
+      Map<TaskBucketHolder, List<TableBlockInfo>> taskIdToTableBlockInfoMap,
+      AbsoluteTableIdentifier absoluteTableIdentifier) {
+    Iterator<Map.Entry<TaskBucketHolder, List<TableBlockInfo>>> iterator =
+        taskIdToTableBlockInfoMap.entrySet().iterator();
+    TaskBucketHolder taskBucketHolder;
+    long driverBTreeSize = 0;
+    while (iterator.hasNext()) {
+      Map.Entry<TaskBucketHolder, List<TableBlockInfo>> taskToBlockInfoList = iterator.next();
+      taskBucketHolder = taskToBlockInfoList.getKey();
+      driverBTreeSize += CarbonUtil
+          .calculateDriverBTreeSize(taskBucketHolder.taskNo, taskBucketHolder.bucketNumber,
+              taskToBlockInfoList.getValue(), absoluteTableIdentifier);
+    }
+    return driverBTreeSize;
+  }
+
+  /**
+   * Below method will be used to get the mapping of task id to all the table
+   * block infos belonging to that task id
+   *
+   * @param segmentToTableBlocksInfos segment id to table blocks info map
+   * @return task id to table block info mapping
+   */
+  private Map<TaskBucketHolder, List<TableBlockInfo>> mappedAndGetTaskIdToTableBlockInfo(
+      Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos) {
+    Map<TaskBucketHolder, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
+        new ConcurrentHashMap<>();
+    Iterator<Entry<String, List<TableBlockInfo>>> iterator =
+        segmentToTableBlocksInfos.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Entry<String, List<TableBlockInfo>> next = iterator.next();
+      List<TableBlockInfo> value = next.getValue();
+      for (TableBlockInfo blockInfo : value) {
+        String taskNo = DataFileUtil.getTaskNo(blockInfo.getFilePath());
+        String bucketNo = DataFileUtil.getBucketNo(blockInfo.getFilePath());
+        TaskBucketHolder bucketHolder = new TaskBucketHolder(taskNo, bucketNo);
+        List<TableBlockInfo> list = taskIdToTableBlockInfoMap.get(bucketHolder);
+        if (null == list) {
+          list = new ArrayList<TableBlockInfo>();
+          taskIdToTableBlockInfoMap.put(bucketHolder, list);
+        }
+        list.add(blockInfo);
+      }
+
+    }
+    return taskIdToTableBlockInfoMap;
+  }
+
+  /**
+   * Below method will be used to get the segment level lock object
+   *
+   * @param segmentId
+   * @return lock object
+   */
+  private synchronized Object addAndGetSegmentLock(String segmentId) {
+    // get the segment lock object if it is present then return
+    // otherwise add the new lock and return
+    Object segmentLoderLockObject = segmentLockMap.get(segmentId);
+    if (null == segmentLoderLockObject) {
+      segmentLoderLockObject = new Object();
+      segmentLockMap.put(segmentId, segmentLoderLockObject);
+    }
+    return segmentLoderLockObject;
+  }
+
+  /**
+   * Below method will be used to load the blocks
+   *
+   * @param taskBucketHolder   task number and bucket number the blocks belong to
+   * @param tableBlockInfoList table block infos of one task
+   * @param tableIdentifier    absolute table identifier
+   * @return loaded segment
+   * @throws IOException
+   */
+  private AbstractIndex loadBlocks(TaskBucketHolder taskBucketHolder,
+      List<TableBlockInfo> tableBlockInfoList, AbsoluteTableIdentifier tableIdentifier)
+      throws IOException {
+    // all the blocks of one task id will be loaded together,
+    // so creating a list which will have the data file metadata of one task
+    List<DataFileFooter> footerList = CarbonUtil
+        .readCarbonIndexFile(taskBucketHolder.taskNo, taskBucketHolder.bucketNumber,
+            tableBlockInfoList, tableIdentifier);
+    AbstractIndex segment = new SegmentTaskIndex();
+    // the file path of only the first block is passed, as the table block info
+    // paths of the same task id will all be the same
+    segment.buildIndex(footerList);
+    return segment;
+  }
+
+  /**
+   * The method clears the access count of table segments
+   *
+   * @param tableSegmentUniqueIdentifiers
+   */
+  @Override
+  public void clearAccessCount(List<TableSegmentUniqueIdentifier> tableSegmentUniqueIdentifiers) {
+    for (TableSegmentUniqueIdentifier segmentUniqueIdentifier : tableSegmentUniqueIdentifiers) {
+      SegmentTaskIndexWrapper cacheable = (SegmentTaskIndexWrapper) lruCache
+          .get(segmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
+      cacheable.clear();
+    }
+  }
+
+  public static class TaskBucketHolder implements Serializable {
+
+    public String taskNo;
+
+    public String bucketNumber;
+
+    public TaskBucketHolder(String taskNo, String bucketNumber) {
+      this.taskNo = taskNo;
+      this.bucketNumber = bucketNumber;
+    }
+
+    @Override public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      TaskBucketHolder that = (TaskBucketHolder) o;
+
+      if (taskNo != null ? !taskNo.equals(that.taskNo) : that.taskNo != null) return false;
+      return bucketNumber != null ?
+          bucketNumber.equals(that.bucketNumber) :
+          that.bucketNumber == null;
+
+    }
+
+    @Override public int hashCode() {
+      int result = taskNo != null ? taskNo.hashCode() : 0;
+      result = 31 * result + (bucketNumber != null ? bucketNumber.hashCode() : 0);
+      return result;
+    }
+  }
+}
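
A usage sketch with assumed variables (store, tableIdentifier, segmentToBlockInfos) and IOException handling omitted: get() loads and pins a segment's task-level indexes, and clear() releases the pin so the LRU cache may evict:

    TableSegmentUniqueIdentifier identifier =
        new TableSegmentUniqueIdentifier(tableIdentifier, segmentToBlockInfos, "0");
    SegmentTaskIndexWrapper wrapper = store.get(identifier); // increments access count
    try {
      java.util.Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> taskMap =
          wrapper.getTaskIdToTableSegmentMap();
      // plan one scan unit per TaskBucketHolder ...
    } finally {
      wrapper.clear(); // decrements access count
    }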

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/TableSegmentUniqueIdentifier.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/TableSegmentUniqueIdentifier.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/TableSegmentUniqueIdentifier.java
new file mode 100644
index 0000000..30407b4
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/TableSegmentUniqueIdentifier.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+
+/**
+ * Class holds the absoluteTableIdentifier and segmentId to uniquely identify a segment
+ */
+public class TableSegmentUniqueIdentifier {
+  /**
+   * table fully qualified identifier
+   */
+  private AbsoluteTableIdentifier absoluteTableIdentifier;
+
+  /**
+   * segment to tableBlockInfo map
+   */
+  Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos;
+
+  private String segmentId;
+  private boolean isSegmentUpdated;
+
+  /**
+   * Constructor to initialize the class instance
+   * @param absoluteTableIdentifier
+   * @param segmentId
+   */
+  public TableSegmentUniqueIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier,
+      String segmentId) {
+    this.absoluteTableIdentifier = absoluteTableIdentifier;
+    this.segmentId = segmentId;
+  }
+
+  public TableSegmentUniqueIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier,
+      Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos, String segmentId) {
+    this.absoluteTableIdentifier = absoluteTableIdentifier;
+    this.segmentToTableBlocksInfos = segmentToTableBlocksInfos;
+    this.segmentId = segmentId;
+  }
+
+  /**
+   * returns AbsoluteTableIdentifier
+   * @return
+   */
+  public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
+    return absoluteTableIdentifier;
+  }
+
+  public void setAbsoluteTableIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier) {
+    this.absoluteTableIdentifier = absoluteTableIdentifier;
+  }
+
+  /**
+   *  returns the segment to tableBlockInfo map
+   * @return
+   */
+  public Map<String, List<TableBlockInfo>> getSegmentToTableBlocksInfos() {
+    return segmentToTableBlocksInfos;
+  }
+
+  /**
+   * set the segment to tableBlockInfo map
+   * @param segmentToTableBlocksInfos
+   */
+  public void setSegmentToTableBlocksInfos(
+      Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos) {
+    this.segmentToTableBlocksInfos = segmentToTableBlocksInfos;
+  }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  /**
+   * method returns the key to uniquely identify a table segment
+   *
+   * @return
+   */
+  public String getUniqueTableSegmentIdentifier() {
+    CarbonTableIdentifier carbonTableIdentifier =
+        absoluteTableIdentifier.getCarbonTableIdentifier();
+    return carbonTableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR
+        + carbonTableIdentifier.getTableName() + CarbonCommonConstants.UNDERSCORE
+        + carbonTableIdentifier.getTableId() + CarbonCommonConstants.FILE_SEPARATOR + segmentId;
+  }
+
+  public void setIsSegmentUpdated(boolean isSegmentUpdated) {
+    this.isSegmentUpdated = isSegmentUpdated;
+  }
+
+  public boolean isSegmentUpdated() {
+    return isSegmentUpdated;
+  }
+
+  /**
+   * equals method to compare two objects having same
+   * absoluteIdentifier and segmentId
+   * @param o
+   * @return
+   */
+  @Override public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TableSegmentUniqueIdentifier that = (TableSegmentUniqueIdentifier) o;
+
+    if (!absoluteTableIdentifier.equals(that.absoluteTableIdentifier)) return false;
+    return segmentId.equals(that.segmentId);
+
+  }
+
+  /**
+   * Returns hashcode for the TableSegmentIdentifier
+   * @return
+   */
+  @Override public int hashCode() {
+    int result = absoluteTableIdentifier.hashCode();
+    result = 31 * result + segmentId.hashCode();
+    return result;
+  }
+}
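
A short sketch (tableIdentifier assumed): identifiers for the same table and segment are interchangeable map keys, while the string form is what the LRU cache stores ("<db>/<table>_<tableId>/<segmentId>"):

    TableSegmentUniqueIdentifier a = new TableSegmentUniqueIdentifier(tableIdentifier, "2");
    TableSegmentUniqueIdentifier b = new TableSegmentUniqueIdentifier(tableIdentifier, "2");
    assert a.equals(b) && a.hashCode() == b.hashCode();
    String lruKey = a.getUniqueTableSegmentIdentifier();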

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
new file mode 100644
index 0000000..601df64
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/AbstractIndex.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.block;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+
+public abstract class AbstractIndex implements Cacheable {
+
+  /**
+   * VO class which will hold the RS (restructure) information of the block
+   */
+  protected SegmentProperties segmentProperties;
+
+  /**
+   * data block
+   */
+  protected DataRefNode dataRefNode;
+
+  /**
+   * total number of rows present in the block
+   */
+  protected long totalNumberOfRows;
+
+  /**
+   * atomic integer to maintain the access count for a column access
+   */
+  protected AtomicInteger accessCount = new AtomicInteger();
+
+  /**
+   * Table block meta size.
+   */
+  protected long memorySize;
+
+  /**
+   * @return the segmentProperties
+   */
+  public SegmentProperties getSegmentProperties() {
+    return segmentProperties;
+  }
+
+  /**
+   * @return the dataBlock
+   */
+  public DataRefNode getDataRefNode() {
+    return dataRefNode;
+  }
+
+  @Override public long getFileTimeStamp() {
+    return 0;
+  }
+
+  /**
+   * Below method will be used to load the data block
+   *
+   * @param footerList footer list
+   */
+  public abstract void buildIndex(List<DataFileFooter> footerList);
+
+  /**
+   * the method returns the access count
+   *
+   * @return
+   */
+  @Override public int getAccessCount() {
+    return accessCount.get();
+  }
+
+  /**
+   * The method returns table block size
+   *
+   * @return
+   */
+  @Override public long getMemorySize() {
+    return this.memorySize;
+  }
+
+  /**
+   * The method is used to increment the access count
+   */
+  public void incrementAccessCount() {
+    accessCount.incrementAndGet();
+  }
+
+  /**
+   * This method will release the objects and set default value for primitive types
+   */
+  public void clear() {
+    decrementAccessCount();
+  }
+
+  /**
+   * This method will decrement the access count for a column by 1
+   * whenever a column usage is complete
+   */
+  private void decrementAccessCount() {
+    if (accessCount.get() > 0) {
+      accessCount.decrementAndGet();
+    }
+  }
+
+  /**
+   * the method is used to set the memory size of the b-tree
+   * @param memorySize
+   */
+  public void setMemorySize(long memorySize) {
+    this.memorySize = memorySize;
+  }
+}
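
A sketch of the pin/unpin protocol the access count implements (index is an assumed concrete AbstractIndex): every consumer increments the count before use and clears it afterwards, so the LRU cache only evicts indexes with no active readers:

    index.incrementAccessCount();
    try {
      DataRefNode root = index.getDataRefNode();
      // ... traverse the b+ tree ...
    } finally {
      index.clear(); // decrements the access count
    }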

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/block/BlockIndex.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/BlockIndex.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/BlockIndex.java
new file mode 100644
index 0000000..91019af
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/BlockIndex.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.block;
+
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.BTreeBuilderInfo;
+import org.apache.carbondata.core.datastore.BtreeBuilder;
+import org.apache.carbondata.core.datastore.impl.btree.BlockletBTreeBuilder;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+
+/**
+ * Class which is responsible for loading the b+ tree block. This class will
+ * persist all the details of a table block
+ */
+public class BlockIndex extends AbstractIndex {
+
+  /**
+   * Below method will be used to load the data block
+   *
+   */
+  public void buildIndex(List<DataFileFooter> footerList) {
+    // create a metadata details
+    // this will be useful in query handling
+    segmentProperties = new SegmentProperties(footerList.get(0).getColumnInTable(),
+        footerList.get(0).getSegmentInfo().getColumnCardinality());
+    // create a segment builder info
+    BTreeBuilderInfo indexBuilderInfo =
+        new BTreeBuilderInfo(footerList, segmentProperties.getDimensionColumnsValueSize());
+    BtreeBuilder blocksBuilder = new BlockletBTreeBuilder();
+    // load the metadata
+    blocksBuilder.build(indexBuilderInfo);
+    dataRefNode = blocksBuilder.get();
+    totalNumberOfRows = footerList.get(0).getNumberOfRows();
+  }
+}
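
A minimal sketch of building a block-level index; footerList is assumed to be the List<DataFileFooter> already read from the file's metadata:

    BlockIndex blockIndex = new BlockIndex();
    blockIndex.buildIndex(footerList);               // builds the blocklet b+ tree
    DataRefNode root = blockIndex.getDataRefNode();  // first node of the tree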

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/block/BlockInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/BlockInfo.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/BlockInfo.java
new file mode 100644
index 0000000..97604e7
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/BlockInfo.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.block;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * Below class will be used to store table block info.
+ * In blocklet distribution the same block is divided
+ * into parts, but in case of block loading, blocklets belonging to the
+ * same block will be loaded together. This class will be used to store table
+ * block info, and its equals and hash code methods are used to identify
+ * blocklets belonging to the same block
+ */
+public class BlockInfo {
+
+  /**
+   * table block info, stores all the details
+   * about the block
+   */
+  private TableBlockInfo info;
+  /**
+   * unique blockName
+   */
+  private String blockUniqueName;
+
+  /**
+   * Constructor
+   *
+   * @param info
+   */
+  public BlockInfo(TableBlockInfo info) {
+    this.info = info;
+    init();
+  }
+
+  /**
+   * init the block unique name
+   */
+  private void init() {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append(this.info.getSegmentId());
+    stringBuilder.append(CarbonCommonConstants.FILE_SEPARATOR);
+    stringBuilder.append(CarbonTablePath.getCarbonDataFileName(this.info.getFilePath()));
+    this.blockUniqueName = stringBuilder.toString();
+  }
+
+  /**
+   * @return table Block info
+   */
+  public TableBlockInfo getTableBlockInfo() {
+    return info;
+  }
+
+  /**
+   * To set the table block info
+   *
+   * @param info
+   */
+  public void setTableBlockInfo(TableBlockInfo info) {
+    this.info = info;
+  }
+
+  /**
+   * method to get the hash code
+   */
+  @Override public int hashCode() {
+    int result = info.getFilePath().hashCode();
+    result = 31 * result + (int) (info.getBlockOffset() ^ (info.getBlockOffset() >>> 32));
+    result = 31 * result + (int) (info.getBlockLength() ^ (info.getBlockLength() >>> 32));
+    result = 31 * result + info.getSegmentId().hashCode();
+    return result;
+  }
+
+  /**
+   * To check the equality
+   *
+   * @param obj
+   */
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof BlockInfo)) {
+      return false;
+    }
+    BlockInfo other = (BlockInfo) obj;
+    if (!info.getSegmentId().equals(other.info.getSegmentId())) {
+      return false;
+    }
+    if (info.getBlockOffset() != other.info.getBlockOffset()) {
+      return false;
+    }
+    if (info.getBlockLength() != other.info.getBlockLength()) {
+      return false;
+    }
+
+    if (info.getFilePath() == null && other.info.getFilePath() != null) {
+      return false;
+    } else if (info.getFilePath() != null && other.info.getFilePath() == null) {
+      return false;
+    } else if (!info.getFilePath().equals(other.info.getFilePath())) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * returns the unique block name
+   * @return
+   */
+  public String getBlockUniqueName() {
+    return blockUniqueName;
+  }
+}
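
A sketch of the deduplication the equals/hashCode contract enables (splits is an assumed List<TableBlockInfo> of blocklet-level splits): wrappers of blocklets from the same physical block collapse to one entry, so the block is loaded only once:

    java.util.Set<BlockInfo> uniqueBlocks = new java.util.HashSet<BlockInfo>();
    for (TableBlockInfo tableBlockInfo : splits) {
      uniqueBlocks.add(new BlockInfo(tableBlockInfo));
    }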

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/block/BlockletInfos.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/BlockletInfos.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/BlockletInfos.java
new file mode 100644
index 0000000..1b17fd6
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/BlockletInfos.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.block;
+
+import java.io.Serializable;
+
+/**
+ * The class holds the block's blocklet info
+ */
+public class BlockletInfos implements Serializable {
+  /**
+   * no of blockLets
+   */
+  private int noOfBlockLets = 0;
+
+  /**
+   * start blocklet number
+   */
+  private int startBlockletNumber;
+  /**
+   * number of blocklets to be scanned
+   */
+  private int numberOfBlockletToScan;
+
+  /**
+   * default constructor
+   */
+  public BlockletInfos() {
+  }
+
+  /**
+   * constructor to initialize the BlockletInfos
+   * @param noOfBlockLets
+   * @param startBlockletNumber
+   * @param numberOfBlockletToScan
+   */
+  public BlockletInfos(int noOfBlockLets, int startBlockletNumber, int numberOfBlockletToScan) {
+    this.noOfBlockLets = noOfBlockLets;
+    this.startBlockletNumber = startBlockletNumber;
+    this.numberOfBlockletToScan = numberOfBlockletToScan;
+  }
+
+  /**
+   * returns the number of blockLets
+   *
+   * @return
+   */
+  public int getNoOfBlockLets() {
+    return noOfBlockLets;
+  }
+
+  /**
+   * sets the number of blockLets
+   *
+   * @param noOfBlockLets
+   */
+  public void setNoOfBlockLets(int noOfBlockLets) {
+    this.noOfBlockLets = noOfBlockLets;
+  }
+
+  /**
+   * returns start blocklet number
+   *
+   * @return
+   */
+  public int getStartBlockletNumber() {
+    return startBlockletNumber;
+  }
+
+  /**
+   * set start blocklet number
+   *
+   * @param startBlockletNumber
+   */
+  public void setStartBlockletNumber(int startBlockletNumber) {
+    this.startBlockletNumber = startBlockletNumber;
+  }
+
+  /**
+   * returns the number of blocklets to be scanned
+   *
+   * @return
+   */
+  public int getNumberOfBlockletToScan() {
+    return numberOfBlockletToScan;
+  }
+
+  /**
+   * sets the number of blocklets to be scanned
+   *
+   * @param numberOfBlockletToScan
+   */
+  public void setNumberOfBlockletToScan(int numberOfBlockletToScan) {
+    this.numberOfBlockletToScan = numberOfBlockletToScan;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/block/Distributable.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/Distributable.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/Distributable.java
new file mode 100644
index 0000000..aeef72e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/Distributable.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.block;
+
+import java.io.IOException;
+
+/**
+ * Interface to get the locations of a node. Used for task distribution based
+ * on data locality.
+ */
+public interface Distributable extends Comparable<Distributable> {
+
+  String[] getLocations() throws IOException;
+}
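
A usage illustration (not part of the patch): a minimal sketch of a
Distributable implementation for locality-aware scheduling. The FileSplitBlock
class and its host list are hypothetical; only the Distributable contract
above comes from this commit.

  import java.io.IOException;
  import java.util.Arrays;

  import org.apache.carbondata.core.datastore.block.Distributable;

  // hypothetical block that knows which hosts hold its replicas
  public class FileSplitBlock implements Distributable {

    private final String path;
    private final String[] hosts;

    public FileSplitBlock(String path, String[] hosts) {
      this.path = path;
      this.hosts = hosts;
    }

    @Override public String[] getLocations() throws IOException {
      // a scheduler can prefer launching the scan task on one of these hosts
      return Arrays.copyOf(hosts, hosts.length);
    }

    @Override public int compareTo(Distributable other) {
      // ordering by path keeps task assignment deterministic
      return path.compareTo(((FileSplitBlock) other).path);
    }
  }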

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
new file mode 100644
index 0000000..7496b15
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
@@ -0,0 +1,754 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.block;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
+import org.apache.carbondata.core.keygenerator.columnar.impl.MultiDimKeyVarLengthVariableSplitGenerator;
+import org.apache.carbondata.core.keygenerator.mdkey.MultiDimKeyVarLengthGenerator;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+/**
+ * This class contains all the restructuring information of the block, which
+ * will be used during query execution to handle restructured data.
+ */
+public class SegmentProperties {
+
+  /**
+   * key generator of the block which was used to generate the mdkey
+   * for normal dimensions
+   */
+  private KeyGenerator dimensionKeyGenerator;
+
+  /**
+   * list of dimensions present in the block
+   */
+  private List<CarbonDimension> dimensions;
+
+  /**
+   * list of complex dimensions present in the block
+   */
+  private List<CarbonDimension> complexDimensions;
+
+  /**
+   * list of measures present in the block
+   */
+  private List<CarbonMeasure> measures;
+
+  /**
+   * cardinality of the dimension columns participating in the key generator
+   */
+  private int[] dimColumnsCardinality;
+
+  /**
+   * cardinality of complex dimension
+   */
+  private int[] complexDimColumnCardinality;
+
+  /**
+   * mapping of dimension column to block index in a file; this will be used
+   * for reading the blocks from the file
+   */
+  private Map<Integer, Integer> dimensionOrdinalToBlockMapping;
+
+  /**
+   * a block can have multiple columns. This will have the block index as key
+   * and all dimensions present in that block as value
+   */
+  private Map<Integer, Set<Integer>> blockTodimensionOrdinalMapping;
+
+  /**
+   * mapping of measure column to block index in a file; this will be used
+   * while reading the block from the file
+   */
+  private Map<Integer, Integer> measuresOrdinalToBlockMapping;
+
+  /**
+   * size of each dimension column value in a block. This can be used when we
+   * need to copy a cell value to create a tuple. For a no-dictionary column
+   * this value will be -1; for a dictionary column the size of the value
+   * will be fixed.
+   */
+  private int[] eachDimColumnValueSize;
+
+  /**
+   * size of each complex dimension column value in a block. For a
+   * no-dictionary column this value will be -1; for a dictionary column the
+   * size of the value will be fixed.
+   */
+  private int[] eachComplexDimColumnValueSize;
+
+  /**
+   * mapping of column group id to its key generator. For example, if the
+   * dimensions with ordinals 2, 3 and 4 belong to column group id 0, the map
+   * will hold an entry for 0 with a key generator covering those dimensions
+   */
+  private Map<Integer, KeyGenerator> columnGroupAndItsKeygenartor;
+
+  /**
+   * within a column group the key generator's dimension index will not be
+   * the same as the dimension ordinal. This holds, per column group, the
+   * mapping from dimension ordinal to its mdkey index
+   */
+  private Map<Integer, Map<Integer, Integer>> columnGroupOrdinalToMdkeymapping;
+
+  /**
+   * this will be used to split the fixed length key. It holds all the
+   * information about how the key was created and how to split it based on
+   * the column groups
+   */
+  private ColumnarSplitter fixedLengthKeySplitter;
+
+  /**
+   * number of no-dictionary dimensions. This will be used during query
+   * execution for creating the start and end keys; it is stored here so that
+   * it need not be recalculated on every query
+   */
+  private int numberOfNoDictionaryDimension;
+
+  /**
+   * column group model
+   */
+  private ColumnGroupModel colGroupModel;
+
+  public SegmentProperties(List<ColumnSchema> columnsInTable, int[] columnCardinality) {
+    dimensions = new ArrayList<CarbonDimension>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    complexDimensions =
+        new ArrayList<CarbonDimension>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    measures = new ArrayList<CarbonMeasure>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    fillDimensionAndMeasureDetails(columnsInTable, columnCardinality);
+    dimensionOrdinalToBlockMapping =
+        new HashMap<Integer, Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    blockTodimensionOrdinalMapping =
+        new HashMap<Integer, Set<Integer>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    measuresOrdinalToBlockMapping =
+        new HashMap<Integer, Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    intialiseColGroups();
+    fillOrdinalToBlockMappingForDimension();
+    fillOrdinalToBlockIndexMappingForMeasureColumns();
+    fillColumnGroupAndItsCardinality(columnCardinality);
+    fillKeyGeneratorDetails();
+  }
+
+  /**
+   * fills the column groups,
+   * e.g. {{1},{2,3,4},{5},{6},{7,8,9}}
+   */
+  private void intialiseColGroups() {
+    List<List<Integer>> colGrpList = new ArrayList<List<Integer>>();
+    List<Integer> group = new ArrayList<Integer>();
+    for (int i = 0; i < dimensions.size(); i++) {
+      CarbonDimension dimension = dimensions.get(i);
+      if (!dimension.hasEncoding(Encoding.DICTIONARY)) {
+        continue;
+      }
+      group.add(dimension.getOrdinal());
+      if (i < dimensions.size() - 1) {
+        int currGroupOrdinal = dimension.columnGroupId();
+        int nextGroupOrdinal = dimensions.get(i + 1).columnGroupId();
+        if (!(currGroupOrdinal == nextGroupOrdinal && currGroupOrdinal != -1)) {
+          colGrpList.add(group);
+          group = new ArrayList<Integer>();
+        }
+      } else {
+        colGrpList.add(group);
+      }
+    }
+    int[][] colGroups = new int[colGrpList.size()][];
+    for (int i = 0; i < colGroups.length; i++) {
+      colGroups[i] = new int[colGrpList.get(i).size()];
+      for (int j = 0; j < colGroups[i].length; j++) {
+        colGroups[i][j] = colGrpList.get(i).get(j);
+      }
+    }
+    this.colGroupModel = CarbonUtil.getColGroupModel(colGroups);
+  }
+
+  /**
+   * Below method fills the mapping of each dimension to its file block
+   * index. All the columns of a column group will point to the same block.
+   */
+  private void fillOrdinalToBlockMappingForDimension() {
+    int blockOrdinal = -1;
+    CarbonDimension dimension = null;
+    int index = 0;
+    int prvcolumnGroupId = -1;
+    while (index < dimensions.size()) {
+      dimension = dimensions.get(index);
+      // if the column group id is same as the previous one then the block
+      // index will be the same
+      if (dimension.isColumnar() || dimension.columnGroupId() != prvcolumnGroupId) {
+        blockOrdinal++;
+      }
+      dimensionOrdinalToBlockMapping.put(dimension.getOrdinal(), blockOrdinal);
+      prvcolumnGroupId = dimension.columnGroupId();
+      index++;
+    }
+    index = 0;
+    // complex dimensions will be stored at last
+    while (index < complexDimensions.size()) {
+      dimension = complexDimensions.get(index);
+      dimensionOrdinalToBlockMapping.put(dimension.getOrdinal(), ++blockOrdinal);
+      blockOrdinal = fillComplexDimensionChildBlockIndex(blockOrdinal, dimension);
+      index++;
+    }
+    fillBlockToDimensionOrdinalMapping();
+  }
+
+  /**
+   * fills the mapping of block index to the set of dimension ordinals
+   * present in that block
+   */
+  private void fillBlockToDimensionOrdinalMapping() {
+    Set<Entry<Integer, Integer>> blocks = dimensionOrdinalToBlockMapping.entrySet();
+    Iterator<Entry<Integer, Integer>> blockItr = blocks.iterator();
+    while (blockItr.hasNext()) {
+      Entry<Integer, Integer> block = blockItr.next();
+      Set<Integer> dimensionOrdinals = blockTodimensionOrdinalMapping.get(block.getValue());
+      if (dimensionOrdinals == null) {
+        dimensionOrdinals = new HashSet<Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+        blockTodimensionOrdinalMapping.put(block.getValue(), dimensionOrdinals);
+      }
+      dimensionOrdinals.add(block.getKey());
+    }
+  }
+
+  /**
+   * Below method will be used to add the block index for the children of a
+   * complex dimension. It is a recursive method which walks the children and
+   * assigns each a block index.
+   *
+   * @param blockOrdinal start block ordinal
+   * @param dimension    parent dimension
+   * @return last block index
+   */
+  private int fillComplexDimensionChildBlockIndex(int blockOrdinal, CarbonDimension dimension) {
+    for (int i = 0; i < dimension.numberOfChild(); i++) {
+      dimensionOrdinalToBlockMapping
+          .put(dimension.getListOfChildDimensions().get(i).getOrdinal(), ++blockOrdinal);
+      if (dimension.getListOfChildDimensions().get(i).numberOfChild() > 0) {
+        blockOrdinal = fillComplexDimensionChildBlockIndex(blockOrdinal,
+            dimension.getListOfChildDimensions().get(i));
+      }
+    }
+    return blockOrdinal;
+  }
+
+  /**
+   * Below method will be used to fill the mapping of measure ordinal to its
+   * block index in the file
+   */
+  private void fillOrdinalToBlockIndexMappingForMeasureColumns() {
+    int blockOrdinal = 0;
+    int index = 0;
+    while (index < measures.size()) {
+      measuresOrdinalToBlockMapping.put(measures.get(index).getOrdinal(), blockOrdinal);
+      blockOrdinal++;
+      index++;
+    }
+  }
+
+  /**
+   * Below method will fill the dimension and measure details of the block.
+   *
+   * @param columnsInTable
+   * @param columnCardinality
+   */
+  private void fillDimensionAndMeasureDetails(List<ColumnSchema> columnsInTable,
+      int[] columnCardinality) {
+    ColumnSchema columnSchema = null;
+    // ordinal will be required to read the data from file block
+    int dimensonOrdinal = 0;
+    int measureOrdinal = -1;
+    // table ordinal is actually a schema ordinal. This is required as the
+    // cardinality array stored in the segment info contains -1 if that
+    // particular column is not participating in the mdkey
+    int tableOrdinal = -1;
+    // creating a list as we do not know how many dimensions did not
+    // participate in the mdkey
+    List<Integer> cardinalityIndexForNormalDimensionColumn =
+        new ArrayList<Integer>(columnsInTable.size());
+    // creating a list as we do not know how many complex dimensions did not
+    // participate in the mdkey
+    List<Integer> cardinalityIndexForComplexDimensionColumn =
+        new ArrayList<Integer>(columnsInTable.size());
+    boolean isComplexDimensionStarted = false;
+    CarbonDimension carbonDimension = null;
+    // to store the position of the dimension in the surrogate key array
+    // which is participating in the mdkey
+    int keyOrdinal = 0;
+    int previousColumnGroup = -1;
+    // to store the ordinal within the column group
+    int columnGroupOrdinal = 0;
+    int counter = 0;
+    int complexTypeOrdinal = 0;
+    while (counter < columnsInTable.size()) {
+      columnSchema = columnsInTable.get(counter);
+      if (columnSchema.isDimensionColumn()) {
+        tableOrdinal++;
+        // not adding the cardinality of the no-dictionary column as it is
+        // not part of the mdkey
+        if (CarbonUtil.hasEncoding(columnSchema.getEncodingList(), Encoding.DICTIONARY)
+            && !isComplexDimensionStarted && columnSchema.getNumberOfChild() == 0) {
+          cardinalityIndexForNormalDimensionColumn.add(tableOrdinal);
+          if (columnSchema.isColumnar()) {
+            // if it is a columnar dimension participating in the mdkey then
+            // add the key ordinal and dimension ordinal
+            carbonDimension =
+                new CarbonDimension(columnSchema, dimensonOrdinal++, keyOrdinal++, -1, -1);
+          } else {
+            // if not columnar then it is a column group dimension
+
+            // below code handles the first dimension of a column group; in
+            // this case the ordinal within the column group will be 0
+            if (previousColumnGroup != columnSchema.getColumnGroupId()) {
+              columnGroupOrdinal = 0;
+              carbonDimension = new CarbonDimension(columnSchema, dimensonOrdinal++, keyOrdinal++,
+                  columnGroupOrdinal++, -1);
+            }
+            // if the previous dimension's column group id is same as the
+            // current one then it belongs to the same row group
+            else {
+              carbonDimension = new CarbonDimension(columnSchema, dimensonOrdinal++, keyOrdinal++,
+                  columnGroupOrdinal++, -1);
+            }
+            previousColumnGroup = columnSchema.getColumnGroupId();
+          }
+        }
+        // as complex types are stored at last, once a complex type has
+        // started all the remaining dimensions will be added to it
+        else if (isComplexDimensionStarted || CarbonUtil.hasDataType(columnSchema.getDataType(),
+            new DataType[] { DataType.ARRAY, DataType.STRUCT })) {
+          cardinalityIndexForComplexDimensionColumn.add(tableOrdinal);
+          carbonDimension =
+              new CarbonDimension(columnSchema, dimensonOrdinal++, -1, -1, complexTypeOrdinal++);
+          carbonDimension.initializeChildDimensionsList(columnSchema.getNumberOfChild());
+          complexDimensions.add(carbonDimension);
+          isComplexDimensionStarted = true;
+          int previouseOrdinal = dimensonOrdinal;
+          dimensonOrdinal =
+              readAllComplexTypeChildrens(dimensonOrdinal, columnSchema.getNumberOfChild(),
+                  columnsInTable, carbonDimension, complexTypeOrdinal);
+          int numberOfChildrenDimensionAdded = dimensonOrdinal - previouseOrdinal;
+          for (int i = 0; i < numberOfChildrenDimensionAdded; i++) {
+            cardinalityIndexForComplexDimensionColumn.add(++tableOrdinal);
+          }
+          counter = dimensonOrdinal;
+          complexTypeOrdinal = carbonDimension.getListOfChildDimensions()
+              .get(carbonDimension.getListOfChildDimensions().size() - 1).getComplexTypeOrdinal();
+          complexTypeOrdinal++;
+          continue;
+        } else {
+          // for a no-dictionary dimension
+          carbonDimension = new CarbonDimension(columnSchema, dimensonOrdinal++, -1, -1, -1);
+          numberOfNoDictionaryDimension++;
+        }
+        dimensions.add(carbonDimension);
+      } else {
+        measures.add(new CarbonMeasure(columnSchema, ++measureOrdinal));
+      }
+      counter++;
+    }
+    dimColumnsCardinality = new int[cardinalityIndexForNormalDimensionColumn.size()];
+    complexDimColumnCardinality = new int[cardinalityIndexForComplexDimensionColumn.size()];
+    int index = 0;
+    // filling the cardinality of the dimension columns to create the key
+    // generator
+    for (Integer cardinalityArrayIndex : cardinalityIndexForNormalDimensionColumn) {
+      dimColumnsCardinality[index++] = columnCardinality[cardinalityArrayIndex];
+    }
+    index = 0;
+    // filling the cardinality of the complex dimension columns to create the
+    // key generator
+    for (Integer cardinalityArrayIndex : cardinalityIndexForComplexDimensionColumn) {
+      complexDimColumnCardinality[index++] = columnCardinality[cardinalityArrayIndex];
+    }
+  }
+
+  /**
+   * Reads all primitive/complex children and sets them as the list of child
+   * carbon dimensions of the parent dimension
+   *
+   * @param dimensionOrdinal
+   * @param childCount
+   * @param listOfColumns
+   * @param parentDimension
+   * @param complexDimensionOrdianl
+   * @return the next dimension ordinal
+   */
+  private int readAllComplexTypeChildrens(int dimensionOrdinal, int childCount,
+      List<ColumnSchema> listOfColumns, CarbonDimension parentDimension,
+      int complexDimensionOrdianl) {
+    for (int i = 0; i < childCount; i++) {
+      ColumnSchema columnSchema = listOfColumns.get(dimensionOrdinal);
+      if (columnSchema.isDimensionColumn()) {
+        if (columnSchema.getNumberOfChild() > 0) {
+          CarbonDimension complexDimension =
+              new CarbonDimension(columnSchema, dimensionOrdinal++, -1, -1,
+                  complexDimensionOrdianl++);
+          complexDimension.initializeChildDimensionsList(columnSchema.getNumberOfChild());
+          parentDimension.getListOfChildDimensions().add(complexDimension);
+          dimensionOrdinal =
+              readAllComplexTypeChildrens(dimensionOrdinal, columnSchema.getNumberOfChild(),
+                  listOfColumns, complexDimension, complexDimensionOrdianl);
+        } else {
+          parentDimension.getListOfChildDimensions().add(
+              new CarbonDimension(columnSchema, dimensionOrdinal++, -1, -1,
+                  complexDimensionOrdianl++));
+        }
+      }
+    }
+    return dimensionOrdinal;
+  }
+
+  /**
+   * Below method will fill the key generator details for both types of key
+   * generators. This will be required during both query execution and data
+   * loading.
+   */
+  private void fillKeyGeneratorDetails() {
+    // create a dimension partitioner list. This list holds information about
+    // how dimension values are stored: in a group or individually
+    List<Integer> dimensionPartitionList =
+        new ArrayList<Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    List<Boolean> isDictionaryColumn =
+        new ArrayList<Boolean>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    int prvcolumnGroupId = -1;
+    int counter = 0;
+    while (counter < dimensions.size()) {
+      CarbonDimension carbonDimension = dimensions.get(counter);
+      // if the dimension is not part of the mdkey then no need to add it
+      if (!carbonDimension.getEncoder().contains(Encoding.DICTIONARY)) {
+        isDictionaryColumn.add(false);
+        counter++;
+        continue;
+      }
+      // a columnar column is stored individually, so add one
+      if (carbonDimension.isColumnar()) {
+        dimensionPartitionList.add(1);
+        isDictionaryColumn.add(true);
+      }
+      // if in a group then we need to count how many columns belong to the
+      // group
+      if (!carbonDimension.isColumnar() && carbonDimension.columnGroupId() == prvcolumnGroupId) {
+        // incrementing the previous value of the list as it is in the same
+        // column group
+        dimensionPartitionList.set(dimensionPartitionList.size() - 1,
+            dimensionPartitionList.get(dimensionPartitionList.size() - 1) + 1);
+      } else if (!carbonDimension.isColumnar()) {
+        dimensionPartitionList.add(1);
+        isDictionaryColumn.add(true);
+      }
+      prvcolumnGroupId = carbonDimension.columnGroupId();
+      counter++;
+    }
+    // get the partitioner
+    int[] dimensionPartitions = ArrayUtils
+        .toPrimitive(dimensionPartitionList.toArray(new Integer[dimensionPartitionList.size()]));
+    // get the bit length of each column
+    int[] bitLength = CarbonUtil.getDimensionBitLength(dimColumnsCardinality, dimensionPartitions);
+    // create a key generator
+    this.dimensionKeyGenerator = new MultiDimKeyVarLengthGenerator(bitLength);
+    this.fixedLengthKeySplitter =
+        new MultiDimKeyVarLengthVariableSplitGenerator(bitLength, dimensionPartitions);
+    // get the size of each value in the file block
+    int[] dictionayDimColumnValueSize = fixedLengthKeySplitter.getBlockKeySize();
+    int index = -1;
+    this.eachDimColumnValueSize = new int[isDictionaryColumn.size()];
+    for (int i = 0; i < eachDimColumnValueSize.length; i++) {
+      if (!isDictionaryColumn.get(i)) {
+        eachDimColumnValueSize[i] = -1;
+        continue;
+      }
+      eachDimColumnValueSize[i] = dictionayDimColumnValueSize[++index];
+    }
+    if (complexDimensions.size() > 0) {
+      int[] complexDimesionParition = new int[complexDimColumnCardinality.length];
+      // as complex dimensions are stored in columnar format, add one each
+      Arrays.fill(complexDimesionParition, 1);
+      bitLength =
+          CarbonUtil.getDimensionBitLength(complexDimColumnCardinality, complexDimesionParition);
+      for (int i = 0; i < bitLength.length; i++) {
+        if (complexDimColumnCardinality[i] == 0) {
+          bitLength[i] = 64;
+        }
+      }
+      ColumnarSplitter keySplitter =
+          new MultiDimKeyVarLengthVariableSplitGenerator(bitLength, complexDimesionParition);
+      eachComplexDimColumnValueSize = keySplitter.getBlockKeySize();
+    } else {
+      eachComplexDimColumnValueSize = new int[0];
+    }
+  }
+
+  /**
+   * Below method will be used to create a mapping of column group id to the
+   * cardinality of the dimensions present in that column group. This mapping
+   * will be used during query execution to create a mask key for the column
+   * group dimensions, which is needed by aggregation and filter queries as
+   * column group dimensions are stored at the bit level
+   */
+  private void fillColumnGroupAndItsCardinality(int[] cardinality) {
+    // mapping of the column group and its ordinals
+    Map<Integer, List<Integer>> columnGroupAndOrdinalMapping =
+        new HashMap<Integer, List<Integer>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    // to store a column group
+    List<Integer> currentColumnGroup = null;
+    // current index
+    int index = 0;
+    // previous column group id, to check whether all the columns of a column
+    // group have been selected
+    int prvColumnGroupId = -1;
+    while (index < dimensions.size()) {
+      // if the dimension is not columnar and its group id is same as the
+      // previous one, then add its ordinal as it belongs to the same column
+      // group
+      if (!dimensions.get(index).isColumnar()
+          && dimensions.get(index).columnGroupId() == prvColumnGroupId
+          && null != currentColumnGroup) {
+        currentColumnGroup.add(index);
+      }
+      // if the column is not columnar then a new column group has started,
+      // so create a new list for the group and add the ordinal
+      else if (!dimensions.get(index).isColumnar()) {
+        currentColumnGroup = new ArrayList<Integer>();
+        columnGroupAndOrdinalMapping.put(dimensions.get(index).columnGroupId(), currentColumnGroup);
+        currentColumnGroup.add(index);
+      }
+      // update the column group id every time; this is required to group the
+      // columns of the same column group
+      prvColumnGroupId = dimensions.get(index).columnGroupId();
+      index++;
+    }
+    // initializing the map
+    this.columnGroupAndItsKeygenartor =
+        new HashMap<Integer, KeyGenerator>(columnGroupAndOrdinalMapping.size());
+    this.columnGroupOrdinalToMdkeymapping = new HashMap<>(columnGroupAndOrdinalMapping.size());
+    int[] columnGroupCardinality = null;
+    index = 0;
+    Iterator<Entry<Integer, List<Integer>>> iterator =
+        columnGroupAndOrdinalMapping.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Entry<Integer, List<Integer>> next = iterator.next();
+      List<Integer> currentGroupOrdinal = next.getValue();
+      Map<Integer, Integer> colGrpOrdinalMdkeyMapping = new HashMap<>(currentGroupOrdinal.size());
+      // create the cardinality array
+      columnGroupCardinality = new int[currentGroupOrdinal.size()];
+      for (int i = 0; i < columnGroupCardinality.length; i++) {
+        // fill the cardinality
+        columnGroupCardinality[i] = cardinality[currentGroupOrdinal.get(i)];
+        colGrpOrdinalMdkeyMapping.put(currentGroupOrdinal.get(i), i);
+      }
+      this.columnGroupAndItsKeygenartor.put(next.getKey(), new MultiDimKeyVarLengthGenerator(
+          CarbonUtil.getDimensionBitLength(columnGroupCardinality,
+              new int[] { columnGroupCardinality.length })));
+      this.columnGroupOrdinalToMdkeymapping.put(next.getKey(), colGrpOrdinalMdkeyMapping);
+    }
+  }
+
+  /**
+   * Below method is to get the value size of each dimension column. As this
+   * method will be used only once, we merge the normal and complex dimension
+   * arrays here instead of storing the merged array as a class variable,
+   * which would waste space. Complex dimensions are stored at last, so first
+   * copy the normal dimension sizes and then the complex ones.
+   *
+   * @return each dimension value size
+   */
+  public int[] getDimensionColumnsValueSize() {
+    int[] dimensionValueSize =
+        new int[eachDimColumnValueSize.length + eachComplexDimColumnValueSize.length];
+    System
+        .arraycopy(eachDimColumnValueSize, 0, dimensionValueSize, 0, eachDimColumnValueSize.length);
+    System.arraycopy(eachComplexDimColumnValueSize, 0, dimensionValueSize,
+        eachDimColumnValueSize.length, eachComplexDimColumnValueSize.length);
+    return dimensionValueSize;
+  }
+
+  /**
+   * @return the dimensionKeyGenerator
+   */
+  public KeyGenerator getDimensionKeyGenerator() {
+    return dimensionKeyGenerator;
+  }
+
+  /**
+   * @return the dimensions
+   */
+  public List<CarbonDimension> getDimensions() {
+    return dimensions;
+  }
+
+  /**
+   * @return the complexDimensions
+   */
+  public List<CarbonDimension> getComplexDimensions() {
+    return complexDimensions;
+  }
+
+  /**
+   * @return the measures
+   */
+  public List<CarbonMeasure> getMeasures() {
+    return measures;
+  }
+
+  /**
+   * @return the dimColumnsCardinality
+   */
+  public int[] getDimColumnsCardinality() {
+    return dimColumnsCardinality;
+  }
+
+  /**
+   * @return the complexDimColumnCardinality
+   */
+  public int[] getComplexDimColumnCardinality() {
+    return complexDimColumnCardinality;
+  }
+
+  /**
+   * @return the dimensionOrdinalToBlockMapping
+   */
+  public Map<Integer, Integer> getDimensionOrdinalToBlockMapping() {
+    return dimensionOrdinalToBlockMapping;
+  }
+
+  /**
+   * @return the measuresOrdinalToBlockMapping
+   */
+  public Map<Integer, Integer> getMeasuresOrdinalToBlockMapping() {
+    return measuresOrdinalToBlockMapping;
+  }
+
+  /**
+   * @return the eachDimColumnValueSize
+   */
+  public int[] getEachDimColumnValueSize() {
+    return eachDimColumnValueSize;
+  }
+
+  /**
+   * @return the eachComplexDimColumnValueSize
+   */
+  public int[] getEachComplexDimColumnValueSize() {
+    return eachComplexDimColumnValueSize;
+  }
+
+  /**
+   * @return the fixedLengthKeySplitter
+   */
+  public ColumnarSplitter getFixedLengthKeySplitter() {
+    return fixedLengthKeySplitter;
+  }
+
+  /**
+   * @return the columnGroupAndItsKeygenartor
+   */
+  public Map<Integer, KeyGenerator> getColumnGroupAndItsKeygenartor() {
+    return columnGroupAndItsKeygenartor;
+  }
+
+  /**
+   * @return the numberOfNoDictionaryDimension
+   */
+  public int getNumberOfNoDictionaryDimension() {
+    return numberOfNoDictionaryDimension;
+  }
+
+  /**
+   * @return the column groups
+   */
+  public int[][] getColumnGroups() {
+    return colGroupModel.getColumnGroup();
+  }
+
+  /**
+   * @return colGroupModel
+   */
+  public ColumnGroupModel getColumnGroupModel() {
+    return this.colGroupModel;
+  }
+
+  /**
+   * gets the mdkey ordinal for the given dimension ordinal of the given
+   * column group
+   *
+   * @param colGrpId
+   * @param ordinal
+   * @return the mdkey ordinal
+   */
+  public int getColumnGroupMdKeyOrdinal(int colGrpId, int ordinal) {
+    return columnGroupOrdinalToMdkeymapping.get(colGrpId).get(ordinal);
+  }
+
+  /**
+   * It returns the number of columns available in the given column group
+   *
+   * @param colGrpId
+   * @return number of columns in the given column group
+   */
+  public int getNoOfColumnsInColumnGroup(int colGrpId) {
+    return columnGroupOrdinalToMdkeymapping.get(colGrpId).size();
+  }
+
+  /**
+   * @param blockIndex
+   * @return all dimension ordinals present in the given block index
+   */
+  public Set<Integer> getDimensionOrdinalForBlock(int blockIndex) {
+    return blockTodimensionOrdinalMapping.get(blockIndex);
+  }
+
+  /**
+   * @return the block index to dimension ordinal mapping
+   */
+  public Map<Integer, Set<Integer>> getBlockTodimensionOrdinalMapping() {
+    return blockTodimensionOrdinalMapping;
+  }
+
+}
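
A usage illustration (not part of the patch): a sketch of how a scan could
use the ordinal-to-block mapping above to decide which physical column blocks
must be read for a projection. The ProjectionToBlocks helper and the
projected ordinals are hypothetical; only the SegmentProperties getter comes
from this commit.

  import java.util.Map;
  import java.util.Set;
  import java.util.TreeSet;

  import org.apache.carbondata.core.datastore.block.SegmentProperties;

  public final class ProjectionToBlocks {

    // returns the distinct file-block indexes needed to materialize the
    // given dimension ordinals (grouped dimensions share one block)
    static Set<Integer> blocksToRead(SegmentProperties props, int[] projectedDimOrdinals) {
      Map<Integer, Integer> ordinalToBlock = props.getDimensionOrdinalToBlockMapping();
      Set<Integer> blocks = new TreeSet<Integer>();
      for (int ordinal : projectedDimOrdinals) {
        blocks.add(ordinalToBlock.get(ordinal));
      }
      return blocks;
    }
  }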

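fillKeyGeneratorDetails sizes each dictionary column by the number of bits
its cardinality requires, which is why eachDimColumnValueSize is fixed for
dictionary columns and -1 for no-dictionary ones. Below is a simplified
stand-in for the idea behind CarbonUtil.getDimensionBitLength, shown only to
illustrate the computation, not the actual implementation:

  public final class BitLengthSketch {

    // bits needed to hold surrogate keys in the range [1, cardinality];
    // simplified stand-in, not CarbonUtil.getDimensionBitLength itself
    static int bitsFor(int cardinality) {
      int bits = 0;
      while ((1L << bits) <= cardinality) {
        bits++;
      }
      return bits;
    }

    public static void main(String[] args) {
      int[] cardinality = { 9, 100, 100000 };
      for (int c : cardinality) {
        System.out.println(c + " distinct values -> " + bitsFor(c) + " bits");
      }
      // prints 4, 7 and 17 bits respectively
    }
  }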
