http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentTaskIndex.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentTaskIndex.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentTaskIndex.java
new file mode 100644
index 0000000..5fa6722
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentTaskIndex.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.block;
+
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.BTreeBuilderInfo;
+import org.apache.carbondata.core.datastore.BtreeBuilder;
+import org.apache.carbondata.core.datastore.impl.btree.BlockBTreeBuilder;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+
+/**
+ * Class which is responsible for loading the b+ tree block. This class will
+ * persist all the detail of a table segment
+ */
+public class SegmentTaskIndex extends AbstractIndex {
+
+  /**
+   * Below method is store the blocks in some data structure
+   *
+   */
+  public void buildIndex(List<DataFileFooter> footerList) {
+    // create a metadata details
+    // this will be useful in query handling
+    // all the data file metadata will have common segment properties we
+    // can use first one to get create the segment properties
+    segmentProperties = new 
SegmentProperties(footerList.get(0).getColumnInTable(),
+        footerList.get(0).getSegmentInfo().getColumnCardinality());
+    // create a segment builder info
+    // in case of segment create we do not need any file path and each column 
value size
+    // as Btree will be build as per min max and start key
+    BTreeBuilderInfo btreeBuilderInfo = new BTreeBuilderInfo(footerList, null);
+    BtreeBuilder blocksBuilder = new BlockBTreeBuilder();
+    // load the metadata
+    blocksBuilder.build(btreeBuilderInfo);
+    dataRefNode = blocksBuilder.get();
+    for (DataFileFooter footer : footerList) {
+      totalNumberOfRows += footer.getNumberOfRows();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentTaskIndexWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentTaskIndexWrapper.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentTaskIndexWrapper.java
new file mode 100644
index 0000000..1467a99
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentTaskIndexWrapper.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.block;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.core.cache.Cacheable;
+import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
+import org.apache.carbondata.core.mutate.UpdateVO;
+
+/**
+ * SegmentTaskIndexWrapper class holds the  taskIdToTableSegmentMap
+ */
+public class SegmentTaskIndexWrapper implements Cacheable {
+
+  /**
+   * task_id to table segment index map
+   */
+  private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> 
taskIdToTableSegmentMap;
+  /**
+   * atomic integer to maintain the access count for a column access
+   */
+  protected AtomicInteger accessCount = new AtomicInteger();
+
+  /**
+   * Table block meta size.
+   */
+  protected AtomicLong memorySize = new AtomicLong();
+
+  private Long refreshedTimeStamp;
+  private UpdateVO invalidTaskKey;
+  public SegmentTaskIndexWrapper(
+      Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> 
taskIdToTableSegmentMap) {
+    this.taskIdToTableSegmentMap = taskIdToTableSegmentMap;
+  }
+
+  public Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> 
getTaskIdToTableSegmentMap() {
+    return taskIdToTableSegmentMap;
+  }
+
+  public void setTaskIdToTableSegmentMap(
+      Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> 
taskIdToTableSegmentMap) {
+    this.taskIdToTableSegmentMap = taskIdToTableSegmentMap;
+  }
+
+  /**
+   * return segment size
+   *
+   * @param memorySize
+   */
+  public void setMemorySize(long memorySize) {
+    this.memorySize.set(memorySize);
+  }
+
+  /**
+   * returns the timestamp
+   *
+   * @return
+   */
+  @Override public long getFileTimeStamp() {
+    return 0;
+  }
+
+  /**
+   * returns the access count
+   *
+   * @return
+   */
+  @Override public int getAccessCount() {
+    return accessCount.get();
+  }
+
+  /**
+   * returns the memory size
+   *
+   * @return
+   */
+  @Override public long getMemorySize() {
+    return memorySize.get();
+  }
+
+  /**
+   * The method is used to set the access count
+   */
+  public void incrementAccessCount() {
+    accessCount.incrementAndGet();
+  }
+
+  /**
+   * This method will release the objects and set default value for primitive 
types
+   */
+  public void clear() {
+    decrementAccessCount();
+  }
+
+  /**
+   * This method will decrement the access count for a column by 1
+   * whenever a column usage is complete
+   */
+  private void decrementAccessCount() {
+    if (accessCount.get() > 0) {
+      accessCount.decrementAndGet();
+    }
+  }
+
+  public Long getRefreshedTimeStamp() {
+    return refreshedTimeStamp;
+  }
+
+  public void setRefreshedTimeStamp(Long refreshedTimeStamp) {
+    this.refreshedTimeStamp = refreshedTimeStamp;
+  }
+
+  public void removeEntryFromCacheAndRefresh(String taskId) {
+    AbstractIndex blockEntry = 
this.getTaskIdToTableSegmentMap().remove(taskId);
+    if (null != blockEntry) {
+      memorySize.set(memorySize.get() - blockEntry.getMemorySize());
+    }
+  }
+
+  public void setLastUpdateVO(UpdateVO invalidTaskKey) {
+    this.invalidTaskKey = invalidTaskKey;
+  }
+
+  public UpdateVO getInvalidTaskKey() {
+    return invalidTaskKey;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
new file mode 100644
index 0000000..8083d8e
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.block;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil;
+
+/**
+ * class will be used to pass the block detail detail will be passed form 
driver
+ * to all the executor to load the b+ tree
+ */
+public class TableBlockInfo implements Distributable, Serializable {
+
+  /**
+   * serialization id
+   */
+  private static final long serialVersionUID = -6502868998599821172L;
+
+  /**
+   * full qualified file path of the block
+   */
+  private String filePath;
+
+  /**
+   * block offset in the file
+   */
+  private long blockOffset;
+
+  /**
+   * length of the block
+   */
+  private long blockLength;
+
+  /**
+   * id of the segment this will be used to sort the blocks
+   */
+  private String segmentId;
+
+  private String[] locations;
+
+  private ColumnarFormatVersion version;
+  /**
+   * The class holds the blockletsinfo
+   */
+  private BlockletInfos blockletInfos = new BlockletInfos();
+
+  /**
+   * map of block location and storage id
+   */
+  private Map<String, String> blockStorageIdMap =
+          new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+  public TableBlockInfo(String filePath, long blockOffset, String segmentId, 
String[] locations,
+      long blockLength, ColumnarFormatVersion version) {
+    this.filePath = FileFactory.getUpdatedFilePath(filePath);
+    this.blockOffset = blockOffset;
+    this.segmentId = segmentId;
+    this.locations = locations;
+    this.blockLength = blockLength;
+    this.version = version;
+  }
+
+  /**
+   * constructor to initialize the TbaleBlockInfo with BlockletInfos
+   *
+   * @param filePath
+   * @param blockOffset
+   * @param segmentId
+   * @param locations
+   * @param blockLength
+   * @param blockletInfos
+   */
+  public TableBlockInfo(String filePath, long blockOffset, String segmentId, 
String[] locations,
+      long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion 
version) {
+    this(filePath, blockOffset, segmentId, locations, blockLength, version);
+    this.blockletInfos = blockletInfos;
+  }
+
+  /**
+   * constructor to initialize the TableBlockInfo with blockStorageIdMap
+   *
+   * @param filePath
+   * @param blockOffset
+   * @param segmentId
+   * @param locations
+   * @param blockLength
+   * @param blockletInfos
+   * @param version
+   * @param blockStorageIdMap
+   */
+  public TableBlockInfo(String filePath, long blockOffset, String segmentId, 
String[] locations,
+      long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion 
version,
+      Map<String, String> blockStorageIdMap) {
+    this(filePath, blockOffset, segmentId, locations, blockLength, 
blockletInfos, version);
+    this.blockStorageIdMap = blockStorageIdMap;
+  }
+
+  /**
+   * @return the filePath
+   */
+  public String getFilePath() {
+    return filePath;
+  }
+
+  /**
+   * @return the blockOffset
+   */
+  public long getBlockOffset() {
+    return blockOffset;
+  }
+
+  public void setBlockOffset(long blockOffset) {
+    this.blockOffset = blockOffset;
+  }
+
+  /**
+   * @return the segmentId
+   */
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  /**
+   * @return the blockLength
+   */
+  public long getBlockLength() {
+    return blockLength;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see java.lang.Object#equals(java.lang.Object)
+   */
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof TableBlockInfo)) {
+      return false;
+    }
+    TableBlockInfo other = (TableBlockInfo) obj;
+    if (!segmentId.equals(other.segmentId)) {
+      return false;
+    }
+    if (blockOffset != other.blockOffset) {
+      return false;
+    }
+    if (blockLength != other.blockLength) {
+      return false;
+    }
+    if (filePath == null && other.filePath != null) {
+      return false;
+    } else if (filePath != null && other.filePath == null) {
+      return false;
+    } else if (!filePath.equals(other.filePath)) {
+      return false;
+    }
+    if (blockletInfos.getStartBlockletNumber() != 
other.blockletInfos.getStartBlockletNumber()) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Below method will used to compare to TableBlockInfos object this will
+   * used for sorting Comparison logic is: 1. compare segment id if segment id
+   * is same 2. compare task id if task id is same 3. compare offsets of the
+   * block
+   */
+  @Override public int compareTo(Distributable other) {
+
+    int compareResult = 0;
+    // get the segment id
+    // converr seg ID to double.
+
+    double seg1 = Double.parseDouble(segmentId);
+    double seg2 = Double.parseDouble(((TableBlockInfo) other).segmentId);
+    if (seg1 - seg2 < 0) {
+      return -1;
+    }
+    if (seg1 - seg2 > 0) {
+      return 1;
+    }
+
+    // Comparing the time task id of the file to other
+    // if both the task id of the file is same then we need to compare the
+    // offset of
+    // the file
+    if (CarbonTablePath.isCarbonDataFile(filePath)) {
+      int firstTaskId = Integer.parseInt(DataFileUtil.getTaskNo(filePath));
+      int otherTaskId = 
Integer.parseInt(DataFileUtil.getTaskNo(((TableBlockInfo) other).filePath));
+      if (firstTaskId != otherTaskId) {
+        return firstTaskId - otherTaskId;
+      }
+      // compare the part no of both block info
+      int firstPartNo = Integer.parseInt(DataFileUtil.getPartNo(filePath));
+      int SecondPartNo =
+          Integer.parseInt(DataFileUtil.getPartNo(((TableBlockInfo) 
other).filePath));
+      compareResult = firstPartNo - SecondPartNo;
+    } else {
+      compareResult = filePath.compareTo(((TableBlockInfo) 
other).getFilePath());
+    }
+    if (compareResult != 0) {
+      return compareResult;
+    }
+    //compare result is not 0 then return
+    // if part no is also same then compare the offset and length of the block
+    if (blockOffset + blockLength
+        < ((TableBlockInfo) other).blockOffset + ((TableBlockInfo) 
other).blockLength) {
+      return -1;
+    } else if (blockOffset + blockLength
+        > ((TableBlockInfo) other).blockOffset + ((TableBlockInfo) 
other).blockLength) {
+      return 1;
+    }
+    //compare the startBlockLetNumber
+    int diffStartBlockLetNumber =
+        blockletInfos.getStartBlockletNumber() - ((TableBlockInfo) 
other).blockletInfos
+            .getStartBlockletNumber();
+    if (diffStartBlockLetNumber < 0) {
+      return -1;
+    }
+    if (diffStartBlockLetNumber > 0) {
+      return 1;
+    }
+    return 0;
+  }
+
+  @Override public int hashCode() {
+    int result = filePath.hashCode();
+    result = 31 * result + (int) (blockOffset ^ (blockOffset >>> 32));
+    result = 31 * result + (int) (blockLength ^ (blockLength >>> 32));
+    result = 31 * result + segmentId.hashCode();
+    result = 31 * result + blockletInfos.getStartBlockletNumber();
+    return result;
+  }
+
+  @Override public String[] getLocations() {
+    return locations;
+  }
+
+  /**
+   * returns BlockletInfos
+   *
+   * @return
+   */
+  public BlockletInfos getBlockletInfos() {
+    return blockletInfos;
+  }
+
+  /**
+   * set the blocklestinfos
+   *
+   * @param blockletInfos
+   */
+  public void setBlockletInfos(BlockletInfos blockletInfos) {
+    this.blockletInfos = blockletInfos;
+  }
+
+  public ColumnarFormatVersion getVersion() {
+    return version;
+  }
+
+  public void setVersion(ColumnarFormatVersion version) {
+    this.version = version;
+  }
+
+  /**
+   * returns the storage location vs storage id map
+   *
+   * @return
+   */
+  public Map<String, String> getBlockStorageIdMap() {
+    return this.blockStorageIdMap;
+  }
+
+  /**
+   * method to storage location vs storage id map
+   *
+   * @param blockStorageIdMap
+   */
+  public void setBlockStorageIdMap(Map<String, String> blockStorageIdMap) {
+    this.blockStorageIdMap = blockStorageIdMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockUniqueIdentifier.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockUniqueIdentifier.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockUniqueIdentifier.java
new file mode 100644
index 0000000..4a05ff8
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockUniqueIdentifier.java
@@ -0,0 +1,72 @@
+package org.apache.carbondata.core.datastore.block;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+
+/**
+ * Class : Holds the info to uniquely identify a blocks
+ */
+public class TableBlockUniqueIdentifier {
+
+  /**
+   * table fully qualified name
+   */
+  private AbsoluteTableIdentifier absoluteTableIdentifier;
+
+  /**
+   * table block info
+   */
+  private TableBlockInfo tableBlockInfo;
+
+  public TableBlockUniqueIdentifier(AbsoluteTableIdentifier 
absoluteTableIdentifier,
+      TableBlockInfo tableBlockInfo) {
+    this.absoluteTableIdentifier = absoluteTableIdentifier;
+    this.tableBlockInfo = tableBlockInfo;
+  }
+
+  public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
+    return absoluteTableIdentifier;
+  }
+
+  public void setAbsoluteTableIdentifier(AbsoluteTableIdentifier 
absoluteTableIdentifier) {
+    this.absoluteTableIdentifier = absoluteTableIdentifier;
+  }
+
+  public TableBlockInfo getTableBlockInfo() {
+    return tableBlockInfo;
+  }
+
+  public void setTableBlockInfo(TableBlockInfo tableBlockInfo) {
+    this.tableBlockInfo = tableBlockInfo;
+  }
+
+  @Override public int hashCode() {
+    return this.absoluteTableIdentifier.hashCode() + 
this.tableBlockInfo.hashCode();
+  }
+
+  @Override public boolean equals(Object other) {
+    if (this == other) return true;
+    if (other == null || getClass() != other.getClass()) return false;
+    TableBlockUniqueIdentifier tableBlockUniqueIdentifier = 
(TableBlockUniqueIdentifier) other;
+    return 
this.absoluteTableIdentifier.equals(tableBlockUniqueIdentifier.absoluteTableIdentifier)
+        && 
this.tableBlockInfo.equals(tableBlockUniqueIdentifier.tableBlockInfo);
+  }
+
+  /**
+   * returns the String value to uniquely identify a block
+   *
+   * @return
+   */
+  public String getUniqueTableBlockName() {
+    BlockInfo blockInfo = new BlockInfo(this.tableBlockInfo);
+    CarbonTableIdentifier carbonTableIdentifier =
+        this.absoluteTableIdentifier.getCarbonTableIdentifier();
+    String uniqueTableBlockName = carbonTableIdentifier.getDatabaseName()
+        + CarbonCommonConstants.FILE_SEPARATOR + carbonTableIdentifier
+        .getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR
+        + this.tableBlockInfo.getSegmentId()
+        + CarbonCommonConstants.FILE_SEPARATOR + blockInfo.hashCode();
+    return uniqueTableBlockName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/block/TableTaskInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableTaskInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableTaskInfo.java
new file mode 100644
index 0000000..e7b534a
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableTaskInfo.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.block;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * This class is responsible for maintaining the mapping of tasks of a node.
+ */
+public class TableTaskInfo implements Distributable {
+
+  private final List<TableBlockInfo> tableBlockInfoList;
+  private final String taskId;
+  public String getTaskId() {
+    return taskId;
+  }
+
+  public List<TableBlockInfo> getTableBlockInfoList() {
+    return tableBlockInfoList;
+  }
+
+  public TableTaskInfo(String taskId, List<TableBlockInfo> tableBlockInfoList){
+    this.taskId = taskId;
+    this.tableBlockInfoList = tableBlockInfoList;
+  }
+
+  @Override public String[] getLocations() {
+    Set<String> locations = new HashSet<String>();
+    for(TableBlockInfo tableBlockInfo: tableBlockInfoList){
+      locations.addAll(Arrays.asList(tableBlockInfo.getLocations()));
+    }
+    locations.toArray(new String[locations.size()]);
+    List<String> nodes =  TableTaskInfo.maxNoNodes(tableBlockInfoList);
+    return nodes.toArray(new String[nodes.size()]);
+  }
+
+  @Override public int compareTo(Distributable o) {
+    return taskId.compareTo(((TableTaskInfo)o).getTaskId());
+  }
+
+  /**
+   * Finding which node has the maximum number of blocks for it.
+   * @param blockList
+   * @return
+   */
+  public static List<String> maxNoNodes(List<TableBlockInfo> blockList) {
+    boolean useIndex = true;
+    Integer maxOccurence = 0;
+    String maxNode = null;
+    Map<String, Integer> nodeAndOccurenceMapping = new TreeMap<>();
+
+    // populate the map of node and number of occurences of that node.
+    for (TableBlockInfo block : blockList) {
+      for (String node : block.getLocations()) {
+        Integer nodeOccurence = nodeAndOccurenceMapping.get(node);
+        if (null == nodeOccurence) {
+          nodeAndOccurenceMapping.put(node, 1);
+        } else {
+          nodeOccurence++;
+        }
+      }
+    }
+    Integer previousValueOccurence = null;
+
+    // check which node is occured maximum times.
+    for (Map.Entry<String, Integer> entry : 
nodeAndOccurenceMapping.entrySet()) {
+      // finding the maximum node.
+      if (entry.getValue() > maxOccurence) {
+        maxOccurence = entry.getValue();
+        maxNode = entry.getKey();
+      }
+      // first time scenario. initialzing the previous value.
+      if (null == previousValueOccurence) {
+        previousValueOccurence = entry.getValue();
+      } else {
+        // for the case where all the nodes have same number of blocks then
+        // we need to return complete list instead of max node.
+        if (!Objects.equals(previousValueOccurence, entry.getValue())) {
+          useIndex = false;
+        }
+      }
+    }
+
+    // if all the nodes have equal occurence then returning the complete key 
set.
+    if (useIndex) {
+      return new ArrayList<>(nodeAndOccurenceMapping.keySet());
+    }
+
+    // if any max node is found then returning the max node.
+    List<String> node =  new ArrayList<>(1);
+    node.add(maxNode);
+    return node;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
new file mode 100644
index 0000000..b19409b
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TaskBlockInfo.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.block;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+/**
+ * This class contains blocks info of each task
+ */
+public class TaskBlockInfo {
+
+  // stores TableBlockInfo list of each task
+  private Map<String, List<TableBlockInfo>> taskBlockInfoMapping;
+
+  public TaskBlockInfo(){
+
+    taskBlockInfoMapping = new 
HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  }
+
+  /**
+   * returns task set
+   * @return
+   */
+  public Set<String> getTaskSet() {
+    return taskBlockInfoMapping.keySet();
+  }
+
+
+  /**
+   * returns TableBlockInfoList of given task
+   * @return
+   */
+  public List<TableBlockInfo> getTableBlockInfoList(String task) {
+    return taskBlockInfoMapping.get(task);
+  }
+
+  /**
+   *  maps TableBlockInfoList to respective task
+   * @param task
+   * @param tableBlockInfoList
+   */
+  public void addTableBlockInfoList(String task, List<TableBlockInfo> 
tableBlockInfoList) {
+    taskBlockInfoMapping.put(task, tableBlockInfoList);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/DimensionColumnDataChunk.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/DimensionColumnDataChunk.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/DimensionColumnDataChunk.java
new file mode 100644
index 0000000..086785a
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/DimensionColumnDataChunk.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk;
+
+import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+
+/**
+ * Interface for dimension column chunk.
+ */
+public interface DimensionColumnDataChunk {
+
+  /**
+   * Below method will be used to fill the data based on offset and row id
+   *
+   * @param data   data to filed
+   * @param offset offset from which data need to be filed
+   * @return how many bytes was copied
+   */
+  int fillChunkData(byte[] data, int offset, int columnIndex, KeyStructureInfo 
restructuringInfo);
+
+  /**
+   * It uses to convert column data to dictionary integer value
+   *
+   * @param rowId
+   * @param columnIndex
+   * @param row
+   * @param restructuringInfo @return
+   */
+  int fillConvertedChunkData(int rowId, int columnIndex, int[] row,
+      KeyStructureInfo restructuringInfo);
+
+  /**
+   * Fill the data to vector
+   * @param vectorInfo
+   * @param column
+   * @param restructuringInfo
+   * @return next column index
+   */
+  int fillConvertedChunkData(ColumnVectorInfo[] vectorInfo, int column,
+      KeyStructureInfo restructuringInfo);
+
+  /**
+   * Fill the data to vector
+   * @param rowMapping
+   * @param vectorInfo
+   * @param column
+   * @param restructuringInfo
+   * @return next column index
+   */
+  int fillConvertedChunkData(int[] rowMapping, ColumnVectorInfo[] vectorInfo, 
int column,
+      KeyStructureInfo restructuringInfo);
+
+  /**
+   * Below method to get  the data based in row id
+   *
+   * @return chunk
+   */
+  byte[] getChunkData(int columnIndex);
+
+  /**
+   * @return inverted index
+   */
+  int getInvertedIndex(int index);
+
+  /**
+   * @return whether column is dictionary column or not
+   */
+  boolean isNoDicitionaryColumn();
+
+  /**
+   * @return length of each column
+   */
+  int getColumnValueSize();
+
+  /**
+   * @return whether columns where explictly sorted or not
+   */
+  boolean isExplicitSorted();
+
+  /**
+   * to compare the data
+   *
+   * @param index        row index to be compared
+   * @param compareValue value to compare
+   * @return compare result
+   */
+  int compareTo(int index, byte[] compareValue);
+
+  /**
+   * below method will be used to free the allocated memory
+   */
+  void freeMemory();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/MeasureColumnDataChunk.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/MeasureColumnDataChunk.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/MeasureColumnDataChunk.java
new file mode 100644
index 0000000..3c08f45
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/MeasureColumnDataChunk.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk;
+
+import org.apache.carbondata.core.datastore.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.metadata.blocklet.datachunk.PresenceMeta;
+
+/**
+ * Holder for measure column chunk
+ * it will have data and its attributes which will
+ * be required for processing
+ */
+public class MeasureColumnDataChunk {
+
+  /**
+   * measure chunk
+   */
+  private CarbonReadDataHolder measureDataHolder;
+
+  /**
+   * below to hold null value holds this information
+   * about the null value index this will be helpful in case of
+   * to remove the null value while aggregation
+   */
+  private PresenceMeta nullValueIndexHolder;
+
+  /**
+   * @return the measureDataHolder
+   */
+  public CarbonReadDataHolder getMeasureDataHolder() {
+    return measureDataHolder;
+  }
+
+  /**
+   * @param measureDataHolder the measureDataHolder to set
+   */
+  public void setMeasureDataHolder(CarbonReadDataHolder measureDataHolder) {
+    this.measureDataHolder = measureDataHolder;
+  }
+
+  /**
+   * @return the nullValueIndexHolder
+   */
+  public PresenceMeta getNullValueIndexHolder() {
+    return nullValueIndexHolder;
+  }
+
+  /**
+   * @param nullValueIndexHolder the nullValueIndexHolder to set
+   */
+  public void setNullValueIndexHolder(PresenceMeta nullValueIndexHolder) {
+    this.nullValueIndexHolder = nullValueIndexHolder;
+  }
+
+  public void freeMemory() {
+    this.measureDataHolder.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionDataChunk.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionDataChunk.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionDataChunk.java
new file mode 100644
index 0000000..03b2fb6
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionDataChunk.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.impl;
+
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import 
org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
+
+/**
+ * Class responsibility is to give access to dimension column data chunk store
+ */
+public abstract class AbstractDimensionDataChunk implements 
DimensionColumnDataChunk {
+
+  /**
+   * data chunks
+   */
+  protected DimensionDataChunkStore dataChunkStore;
+
+  /**
+   * @return whether columns where explicitly sorted or not
+   */
+  @Override public boolean isExplicitSorted() {
+    return dataChunkStore.isExplicitSorted();
+  }
+
+  /**
+   * Below method to get the data based in row id
+   *
+   * @param index row id of the data
+   * @return chunk
+   */
+  @Override public byte[] getChunkData(int index) {
+    return dataChunkStore.getRow(index);
+  }
+
+  /**
+   * @return inverted index
+   */
+  @Override public int getInvertedIndex(int index) {
+    return dataChunkStore.getInvertedIndex(index);
+  }
+
+  /**
+   * @return length of each column
+   */
+  @Override public int getColumnValueSize() {
+    return dataChunkStore.getColumnValueSize();
+  }
+
+  /**
+   * To compare the data
+   *
+   * @param index        row index to be compared
+   * @param compareValue value to compare
+   * @return compare result
+   */
+  @Override public int compareTo(int index, byte[] compareValue) {
+    // TODO Auto-generated method stub
+    return dataChunkStore.compareTo(index, compareValue);
+  }
+
+  /**
+   * below method will be used to free the allocated memory
+   */
+  @Override public void freeMemory() {
+    dataChunkStore.freeMemory();
+  }
+
+  /**
+   * @return column is dictionary column or not
+   */
+  @Override public boolean isNoDicitionaryColumn() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/ColumnGroupDimensionDataChunk.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/ColumnGroupDimensionDataChunk.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/ColumnGroupDimensionDataChunk.java
new file mode 100644
index 0000000..bad5679
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/ColumnGroupDimensionDataChunk.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.impl;
+
+import 
org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory.DimensionStoreType;
+import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+
+/**
+ * This class is gives access to column group dimension data chunk store
+ */
+public class ColumnGroupDimensionDataChunk extends AbstractDimensionDataChunk {
+
+  /**
+   * Constructor for this class
+   *
+   * @param dataChunk       data chunk
+   * @param chunkAttributes chunk attributes
+   */
+  public ColumnGroupDimensionDataChunk(byte[] dataChunk, int columnValueSize, 
int numberOfRows) {
+    this.dataChunkStore = DimensionChunkStoreFactory.INSTANCE
+        .getDimensionChunkStore(columnValueSize, false, numberOfRows, 
dataChunk.length,
+        DimensionStoreType.FIXEDLENGTH);
+    this.dataChunkStore.putArray(null, null, dataChunk);
+  }
+
+  /**
+   * Below method will be used to fill the data based on offset and row id
+   *
+   * @param data              data to filed
+   * @param offset            offset from which data need to be filed
+   * @param rowId             row id of the chunk
+   * @param restructuringInfo define the structure of the key
+   * @return how many bytes was copied
+   */
+  @Override public int fillChunkData(byte[] data, int offset, int rowId,
+      KeyStructureInfo restructuringInfo) {
+    byte[] row = dataChunkStore.getRow(rowId);
+    byte[] maskedKey = getMaskedKey(row, restructuringInfo);
+    System.arraycopy(maskedKey, 0, data, offset, maskedKey.length);
+    return maskedKey.length;
+  }
+
+  /**
+   * Converts to column dictionary integer value
+   *
+   * @param rowId
+   * @param columnIndex
+   * @param row
+   * @param restructuringInfo @return
+   */
+  @Override public int fillConvertedChunkData(int rowId, int columnIndex, 
int[] row,
+      KeyStructureInfo info) {
+    byte[] data = dataChunkStore.getRow(rowId);
+    long[] keyArray = info.getKeyGenerator().getKeyArray(data);
+    int[] ordinal = info.getMdkeyQueryDimensionOrdinal();
+    for (int i = 0; i < ordinal.length; i++) {
+      row[columnIndex++] = (int) keyArray[ordinal[i]];
+    }
+    return columnIndex;
+  }
+
+  /**
+   * Below method will be used to get the masked key
+   *
+   * @param data   data
+   * @param offset offset of
+   * @param info
+   * @return
+   */
+  private byte[] getMaskedKey(byte[] data, KeyStructureInfo info) {
+    byte[] maskedKey = new byte[info.getMaskByteRanges().length];
+    int counter = 0;
+    int byteRange = 0;
+    for (int i = 0; i < info.getMaskByteRanges().length; i++) {
+      byteRange = info.getMaskByteRanges()[i];
+      maskedKey[counter++] = (byte) (data[byteRange] & 
info.getMaxKey()[byteRange]);
+    }
+    return maskedKey;
+  }
+
+  /**
+   * @return inverted index
+   */
+  @Override public int getInvertedIndex(int index) {
+    throw new UnsupportedOperationException("Operation not supported in case 
of cloumn group");
+  }
+
+  /**
+   * @return whether columns where explictly sorted or not
+   */
+  @Override public boolean isExplicitSorted() {
+    return false;
+  }
+
+  /**
+   * to compare the data
+   *
+   * @param index        row index to be compared
+   * @param compareValue value to compare
+   * @return compare result
+   */
+  @Override public int compareTo(int index, byte[] compareValue) {
+    throw new UnsupportedOperationException("Operation not supported in case 
of cloumn group");
+  }
+
+  /**
+   * Fill the data to vector
+   *
+   * @param vectorInfo
+   * @param column
+   * @param restructuringInfo
+   * @return next column index
+   */
+  @Override public int fillConvertedChunkData(ColumnVectorInfo[] vectorInfo, 
int column,
+      KeyStructureInfo restructuringInfo) {
+    ColumnVectorInfo columnVectorInfo = vectorInfo[column];
+    int offset = columnVectorInfo.offset;
+    int vectorOffset = columnVectorInfo.vectorOffset;
+    int len = offset + columnVectorInfo.size;
+    int[] ordinal = restructuringInfo.getMdkeyQueryDimensionOrdinal();
+    for (int k = offset; k < len; k++) {
+      long[] keyArray = 
restructuringInfo.getKeyGenerator().getKeyArray(dataChunkStore.getRow(k));
+      int index = 0;
+      for (int i = column; i < column + ordinal.length; i++) {
+        if (vectorInfo[i].directDictionaryGenerator == null) {
+          vectorInfo[i].vector.putInt(vectorOffset, (int) 
keyArray[ordinal[index++]]);
+        } else {
+          vectorInfo[i].vector.putLong(vectorOffset, (long) 
vectorInfo[i].directDictionaryGenerator
+              .getValueFromSurrogate((int) keyArray[ordinal[index++]]));
+        }
+      }
+      vectorOffset++;
+    }
+    return column + ordinal.length;
+  }
+
+  /**
+   * Fill the data to vector
+   *
+   * @param rowMapping
+   * @param vectorInfo
+   * @param column
+   * @param restructuringInfo
+   * @return next column index
+   */
+  @Override public int fillConvertedChunkData(int[] rowMapping, 
ColumnVectorInfo[] vectorInfo,
+      int column, KeyStructureInfo restructuringInfo) {
+    ColumnVectorInfo columnVectorInfo = vectorInfo[column];
+    int offset = columnVectorInfo.offset;
+    int vectorOffset = columnVectorInfo.vectorOffset;
+    int len = offset + columnVectorInfo.size;
+    int[] ordinal = restructuringInfo.getMdkeyQueryDimensionOrdinal();
+    for (int k = offset; k < len; k++) {
+      long[] keyArray = 
restructuringInfo.getKeyGenerator().getKeyArray(dataChunkStore.getRow(k));
+      int index = 0;
+      for (int i = column; i < column + ordinal.length; i++) {
+        if (vectorInfo[i].directDictionaryGenerator == null) {
+          vectorInfo[i].vector.putInt(vectorOffset, (int) 
keyArray[ordinal[index++]]);
+        } else {
+          vectorInfo[i].vector.putLong(vectorOffset, (long) 
vectorInfo[i].directDictionaryGenerator
+              .getValueFromSurrogate((int) keyArray[ordinal[index++]]));
+        }
+      }
+      vectorOffset++;
+    }
+    return column + ordinal.length;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
new file mode 100644
index 0000000..0730607
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.impl;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import 
org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory.DimensionStoreType;
+import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+
+/**
+ * This class is gives access to fixed length dimension data chunk store
+ */
+public class FixedLengthDimensionDataChunk extends AbstractDimensionDataChunk {
+
+  /**
+   * Constructor
+   *
+   * @param dataChunk            data chunk
+   * @param invertedIndex        inverted index
+   * @param invertedIndexReverse reverse inverted index
+   * @param numberOfRows         number of rows
+   * @param columnValueSize      size of each column value
+   */
+  public FixedLengthDimensionDataChunk(byte[] dataChunk, int[] invertedIndex,
+      int[] invertedIndexReverse, int numberOfRows, int columnValueSize) {
+    long totalSize = null != invertedIndex ?
+        dataChunk.length + (2 * numberOfRows * 
CarbonCommonConstants.INT_SIZE_IN_BYTE) :
+        dataChunk.length;
+    dataChunkStore = DimensionChunkStoreFactory.INSTANCE
+        .getDimensionChunkStore(columnValueSize, null != invertedIndex, 
numberOfRows, totalSize,
+            DimensionStoreType.FIXEDLENGTH);
+    dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
+  }
+
+  /**
+   * Below method will be used to fill the data based on offset and row id
+   *
+   * @param data             data to filed
+   * @param offset           offset from which data need to be filed
+   * @param index            row id of the chunk
+   * @param keyStructureInfo define the structure of the key
+   * @return how many bytes was copied
+   */
+  @Override public int fillChunkData(byte[] data, int offset, int index,
+      KeyStructureInfo keyStructureInfo) {
+    dataChunkStore.fillRow(index, data, offset);
+    return dataChunkStore.getColumnValueSize();
+  }
+
+  /**
+   * Converts to column dictionary integer value
+   *
+   * @param rowId
+   * @param columnIndex
+   * @param row
+   * @param restructuringInfo
+   * @return
+   */
+  @Override public int fillConvertedChunkData(int rowId, int columnIndex, 
int[] row,
+      KeyStructureInfo restructuringInfo) {
+    row[columnIndex] = dataChunkStore.getSurrogate(rowId);
+    return columnIndex + 1;
+  }
+
+  /**
+   * Fill the data to vector
+   *
+   * @param vectorInfo
+   * @param column
+   * @param restructuringInfo
+   * @return next column index
+   */
+  @Override public int fillConvertedChunkData(ColumnVectorInfo[] vectorInfo, 
int column,
+      KeyStructureInfo restructuringInfo) {
+    ColumnVectorInfo columnVectorInfo = vectorInfo[column];
+    int offset = columnVectorInfo.offset;
+    int vectorOffset = columnVectorInfo.vectorOffset;
+    int len = columnVectorInfo.size + offset;
+    CarbonColumnVector vector = columnVectorInfo.vector;
+    for (int j = offset; j < len; j++) {
+      int dict = dataChunkStore.getSurrogate(j);
+      if (columnVectorInfo.directDictionaryGenerator == null) {
+        vector.putInt(vectorOffset++, dict);
+      } else {
+        Object valueFromSurrogate =
+            
columnVectorInfo.directDictionaryGenerator.getValueFromSurrogate(dict);
+        if (valueFromSurrogate == null) {
+          vector.putNull(vectorOffset++);
+        } else {
+          switch (columnVectorInfo.directDictionaryGenerator.getReturnType()) {
+            case INT:
+              vector.putInt(vectorOffset++, (int) valueFromSurrogate);
+              break;
+            case LONG:
+              vector.putLong(vectorOffset++, (long) valueFromSurrogate);
+              break;
+          }
+        }
+      }
+    }
+    return column + 1;
+  }
+
+  /**
+   * Fill the data to vector
+   *
+   * @param rowMapping
+   * @param vectorInfo
+   * @param column
+   * @param restructuringInfo
+   * @return next column index
+   */
+  @Override public int fillConvertedChunkData(int[] rowMapping, 
ColumnVectorInfo[] vectorInfo,
+      int column, KeyStructureInfo restructuringInfo) {
+    ColumnVectorInfo columnVectorInfo = vectorInfo[column];
+    int offset = columnVectorInfo.offset;
+    int vectorOffset = columnVectorInfo.vectorOffset;
+    int len = columnVectorInfo.size + offset;
+    CarbonColumnVector vector = columnVectorInfo.vector;
+    for (int j = offset; j < len; j++) {
+      int dict = dataChunkStore.getSurrogate(rowMapping[j]);
+      if (columnVectorInfo.directDictionaryGenerator == null) {
+        vector.putInt(vectorOffset++, dict);
+      } else {
+        Object valueFromSurrogate =
+            
columnVectorInfo.directDictionaryGenerator.getValueFromSurrogate(dict);
+        if (valueFromSurrogate == null) {
+          vector.putNull(vectorOffset++);
+        } else {
+          switch (columnVectorInfo.directDictionaryGenerator.getReturnType()) {
+            case INT:
+              vector.putInt(vectorOffset++, (int) valueFromSurrogate);
+              break;
+            case LONG:
+              vector.putLong(vectorOffset++, (long) valueFromSurrogate);
+              break;
+          }
+        }
+      }
+    }
+    return column + 1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
new file mode 100644
index 0000000..342c392
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.impl;
+
+import java.util.Arrays;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import 
org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory.DimensionStoreType;
+import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+
+/**
+ * This class is gives access to variable length dimension data chunk store
+ */
+public class VariableLengthDimensionDataChunk extends 
AbstractDimensionDataChunk {
+
+  /**
+   * Constructor for this class
+   *
+   * @param dataChunkStore  data chunk
+   * @param chunkAttributes chunk attributes
+   */
+  public VariableLengthDimensionDataChunk(byte[] dataChunks, int[] 
invertedIndex,
+      int[] invertedIndexReverse, int numberOfRows) {
+    long totalSize = null != invertedIndex ?
+        (dataChunks.length + (2 * numberOfRows * 
CarbonCommonConstants.INT_SIZE_IN_BYTE) + (
+            numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE)) :
+        (dataChunks.length + (numberOfRows * 
CarbonCommonConstants.INT_SIZE_IN_BYTE));
+    dataChunkStore = DimensionChunkStoreFactory.INSTANCE
+        .getDimensionChunkStore(0, null != invertedIndex, numberOfRows, 
totalSize,
+            DimensionStoreType.VARIABLELENGTH);
+    dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunks);
+  }
+
+  /**
+   * Below method will be used to fill the data based on offset and row id
+   *
+   * @param data              data to filed
+   * @param offset            offset from which data need to be filed
+   * @param index             row id of the chunk
+   * @param restructuringInfo define the structure of the key
+   * @return how many bytes was copied
+   */
+  @Override public int fillChunkData(byte[] data, int offset, int index,
+      KeyStructureInfo restructuringInfo) {
+    // no required in this case because this column chunk is not the part if
+    // mdkey
+    return 0;
+  }
+
+  /**
+   * Converts to column dictionary integer value
+   *
+   * @param rowId
+   * @param columnIndex
+   * @param row
+   * @param restructuringInfo
+   * @return
+   */
+  @Override public int fillConvertedChunkData(int rowId, int columnIndex, 
int[] row,
+      KeyStructureInfo restructuringInfo) {
+    return columnIndex + 1;
+  }
+
+  /**
+   * @return whether column is dictionary column or not
+   */
+  @Override public boolean isNoDicitionaryColumn() {
+    return true;
+  }
+
+  /**
+   * @return length of each column
+   */
+  @Override public int getColumnValueSize() {
+    return -1;
+  }
+
+  /**
+   * Fill the data to vector
+   *
+   * @param vectorInfo
+   * @param column
+   * @param restructuringInfo
+   * @return next column index
+   */
+  @Override public int fillConvertedChunkData(ColumnVectorInfo[] vectorInfo, 
int column,
+      KeyStructureInfo restructuringInfo) {
+    ColumnVectorInfo columnVectorInfo = vectorInfo[column];
+    CarbonColumnVector vector = columnVectorInfo.vector;
+    int offset = columnVectorInfo.offset;
+    int vectorOffset = columnVectorInfo.vectorOffset;
+    int len = offset + columnVectorInfo.size;
+    for (int i = offset; i < len; i++) {
+      byte[] value = dataChunkStore.getRow(i);
+      // Considering only String case now as we support only
+      // string in no dictionary case at present.
+      if (value == null || 
Arrays.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, value)) {
+        vector.putNull(vectorOffset++);
+      } else {
+        vector.putBytes(vectorOffset++, value);
+      }
+    }
+    return column + 1;
+  }
+
+  /**
+   * Fill the data to vector
+   *
+   * @param rowMapping
+   * @param vectorInfo
+   * @param column
+   * @param restructuringInfo
+   * @return next column index
+   */
+  @Override public int fillConvertedChunkData(int[] rowMapping, 
ColumnVectorInfo[] vectorInfo,
+      int column, KeyStructureInfo restructuringInfo) {
+    ColumnVectorInfo columnVectorInfo = vectorInfo[column];
+    CarbonColumnVector vector = columnVectorInfo.vector;
+    int offset = columnVectorInfo.offset;
+    int vectorOffset = columnVectorInfo.vectorOffset;
+    int len = offset + columnVectorInfo.size;
+    for (int i = offset; i < len; i++) {
+      byte[] value = dataChunkStore.getRow(rowMapping[i]);
+      // Considering only String case now as we support only
+      // string in no dictionary case at present.
+      if (value == null || 
Arrays.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, value)) {
+        vector.putNull(vectorOffset++);
+      } else {
+        vector.putBytes(vectorOffset++, value);
+      }
+    }
+    return column + 1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
new file mode 100644
index 0000000..3d4e3bf
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader;
+
+import 
org.apache.carbondata.core.datastore.chunk.reader.dimension.v1.CompressedDimensionChunkFileBasedReaderV1;
+import 
org.apache.carbondata.core.datastore.chunk.reader.dimension.v2.CompressedDimensionChunkFileBasedReaderV2;
+import 
org.apache.carbondata.core.datastore.chunk.reader.measure.v1.CompressedMeasureChunkFileBasedReaderV1;
+import 
org.apache.carbondata.core.datastore.chunk.reader.measure.v2.CompressedMeasureChunkFileBasedReaderV2;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+
+/**
+ * Factory class to get the data reader instance based on version
+ */
+public class CarbonDataReaderFactory {
+
+  /**
+   * static instance
+   */
+  private static final CarbonDataReaderFactory CARBON_DATA_READER_FACTORY =
+      new CarbonDataReaderFactory();
+
+  /**
+   * private constructor
+   */
+  private CarbonDataReaderFactory() {
+
+  }
+
+  /**
+   * To get the instance of the reader factor
+   *
+   * @return reader factory
+   */
+  public static CarbonDataReaderFactory getInstance() {
+    return CARBON_DATA_READER_FACTORY;
+  }
+
+  /**
+   * Below method will be used to get the dimension column chunk reader based 
on version number
+   *
+   * @param version             reader version
+   * @param blockletInfo        blocklet info
+   * @param eachColumnValueSize size of each dimension column
+   * @param filePath            carbon data file path
+   * @return dimension column data reader based on version number
+   */
+  public DimensionColumnChunkReader 
getDimensionColumnChunkReader(ColumnarFormatVersion version,
+      BlockletInfo blockletInfo, int[] eachColumnValueSize, String filePath) {
+    switch (version) {
+      case V1:
+        return new CompressedDimensionChunkFileBasedReaderV1(blockletInfo, 
eachColumnValueSize,
+            filePath);
+      default:
+        return new CompressedDimensionChunkFileBasedReaderV2(blockletInfo, 
eachColumnValueSize,
+            filePath);
+    }
+  }
+
+  /**
+   * Below method will be used to get the measure column chunk reader based 
version number
+   *
+   * @param version      reader version
+   * @param blockletInfo blocklet info
+   * @param filePath     carbon data file path
+   * @return measure column data reader based on version number
+   */
+  public MeasureColumnChunkReader 
getMeasureColumnChunkReader(ColumnarFormatVersion version,
+      BlockletInfo blockletInfo, String filePath) {
+    switch (version) {
+      case V1:
+        return new CompressedMeasureChunkFileBasedReaderV1(blockletInfo, 
filePath);
+      default:
+        return new CompressedMeasureChunkFileBasedReaderV2(blockletInfo, 
filePath);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
new file mode 100644
index 0000000..4981241
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+
+/**
+ * Interface for reading the data chunk
+ * Its concrete implementation can be used to read the chunk.
+ * compressed or uncompressed chunk
+ */
+public interface DimensionColumnChunkReader {
+
+  /**
+   * Below method will be used to read the chunk based on block indexes
+   *
+   * @param fileReader   file reader to read the blocks from file
+   * @param blockIndexes blocks to be read
+   * @return dimension column chunks
+   */
+  DimensionColumnDataChunk[] readDimensionChunks(FileHolder fileReader, 
int[][] blockIndexes)
+      throws IOException;
+
+  /**
+   * Below method will be used to read the chunk based on block index
+   *
+   * @param fileReader file reader to read the blocks from file
+   * @param blockIndex block to be read
+   * @return dimension column chunk
+   */
+  DimensionColumnDataChunk readDimensionChunk(FileHolder fileReader, int 
blockIndex)
+      throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
new file mode 100644
index 0000000..a706d71
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+
+/**
+ * Reader interface for reading the measure blocks from file
+ */
+public interface MeasureColumnChunkReader {
+
+  /**
+   * Method to read the blocks data based on block indexes
+   *
+   * @param fileReader   file reader to read the blocks
+   * @param blockIndexes blocks to be read
+   * @return measure data chunks
+   */
+  MeasureColumnDataChunk[] readMeasureChunks(FileHolder fileReader, int[][] 
blockIndexes)
+      throws IOException;
+
+  /**
+   * Method to read the blocks data based on block index
+   *
+   * @param fileReader file reader to read the blocks
+   * @param blockIndex block to be read
+   * @return measure data chunk
+   */
+  MeasureColumnDataChunk readMeasureChunk(FileHolder fileReader, int 
blockIndex) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
new file mode 100644
index 0000000..2de673a
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.dimension;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import 
org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * Class which will have all the common properties and behavior among all type
+ * of reader
+ */
+public abstract class AbstractChunkReader implements 
DimensionColumnChunkReader {
+
+  /**
+   * compressor will be used to uncompress the data
+   */
+  protected static final Compressor COMPRESSOR = 
CompressorFactory.getInstance().getCompressor();
+
+  /**
+   * size of the each column value
+   * for no dictionary column it will be -1
+   */
+  protected int[] eachColumnValueSize;
+
+  /**
+   * full qualified path of the data file from
+   * which data will be read
+   */
+  protected String filePath;
+
+  /**
+   * this will be used to uncompress the
+   * row id and rle chunk
+   */
+  protected NumberCompressor numberComressor;
+
+  /**
+   * number of element in each chunk
+   */
+  protected int numberOfRows;
+
+  /**
+   * Constructor to get minimum parameter to create
+   * instance of this class
+   *
+   * @param eachColumnValueSize  size of the each column value
+   * @param filePath             file from which data will be read
+   */
+  public AbstractChunkReader(final int[] eachColumnValueSize, final String 
filePath,
+      int numberOfRows) {
+    this.eachColumnValueSize = eachColumnValueSize;
+    this.filePath = filePath;
+    int numberOfElement = 0;
+    try {
+      numberOfElement = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
+              CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL));
+    } catch (NumberFormatException exception) {
+      numberOfElement = 
Integer.parseInt(CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL);
+    }
+    this.numberComressor = new NumberCompressor(numberOfElement);
+    this.numberOfRows = numberOfRows;
+  }
+
+  /**
+   * Below method will be used to create the inverted index reverse
+   * this will be used to point to actual data in the chunk
+   *
+   * @param invertedIndex inverted index
+   * @return reverse inverted index
+   */
+  protected int[] getInvertedReverseIndex(int[] invertedIndex) {
+    int[] columnIndexTemp = new int[invertedIndex.length];
+
+    for (int i = 0; i < invertedIndex.length; i++) {
+      columnIndexTemp[invertedIndex[i]] = i;
+    }
+    return columnIndexTemp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
new file mode 100644
index 0000000..4a0e0f7
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.dimension.v1;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import 
org.apache.carbondata.core.datastore.chunk.impl.ColumnGroupDimensionDataChunk;
+import 
org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import 
org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
+import 
org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReader;
+import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Compressed dimension chunk reader class
+ */
+public class CompressedDimensionChunkFileBasedReaderV1 extends 
AbstractChunkReader {
+
+  /**
+   * data chunk list which holds the information
+   * about the data block metadata
+   */
+  private final List<DataChunk> dimensionColumnChunk;
+
+  /**
+   * Constructor to get minimum parameter to create instance of this class
+   *
+   * @param blockletInfo        blocklet info
+   * @param eachColumnValueSize size of the each column value
+   * @param filePath            file from which data will be read
+   */
+  public CompressedDimensionChunkFileBasedReaderV1(final BlockletInfo 
blockletInfo,
+      final int[] eachColumnValueSize, final String filePath) {
+    super(eachColumnValueSize, filePath, blockletInfo.getNumberOfRows());
+    this.dimensionColumnChunk = blockletInfo.getDimensionColumnChunk();
+  }
+
+  /**
+   * Below method will be used to read the chunk based on block indexes
+   *
+   * @param fileReader   file reader to read the blocks from file
+   * @param blockIndexes blocks to be read
+   * @return dimension column chunks
+   */
+  @Override public DimensionColumnDataChunk[] readDimensionChunks(FileHolder 
fileReader,
+      int[][] blockIndexes) throws IOException {
+    // read the column chunk based on block index and add
+    DimensionColumnDataChunk[] dataChunks =
+        new DimensionColumnDataChunk[dimensionColumnChunk.size()];
+    for (int i = 0; i < blockIndexes.length; i++) {
+      for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
+        dataChunks[j] = readDimensionChunk(fileReader, j);
+      }
+    }
+    return dataChunks;
+  }
+
+  /**
+   * Below method will be used to read the chunk based on block index
+   *
+   * @param fileReader file reader to read the blocks from file
+   * @param blockIndex block to be read
+   * @return dimension column chunk
+   */
+  @Override public DimensionColumnDataChunk readDimensionChunk(FileHolder 
fileReader,
+      int blockIndex) throws IOException {
+    byte[] dataPage = null;
+    int[] invertedIndexes = null;
+    int[] invertedIndexesReverse = null;
+    int[] rlePage = null;
+
+    // first read the data and uncompressed it
+    dataPage = COMPRESSOR.unCompressByte(fileReader
+        .readByteArray(filePath, 
dimensionColumnChunk.get(blockIndex).getDataPageOffset(),
+            dimensionColumnChunk.get(blockIndex).getDataPageLength()));
+    // if row id block is present then read the row id chunk and uncompress it
+    if 
(CarbonUtil.hasEncoding(dimensionColumnChunk.get(blockIndex).getEncodingList(),
+        Encoding.INVERTED_INDEX)) {
+      invertedIndexes = CarbonUtil
+          
.getUnCompressColumnIndex(dimensionColumnChunk.get(blockIndex).getRowIdPageLength(),
+              fileReader.readByteArray(filePath,
+                  dimensionColumnChunk.get(blockIndex).getRowIdPageOffset(),
+                  dimensionColumnChunk.get(blockIndex).getRowIdPageLength()), 
numberComressor, 0);
+      // get the reverse index
+      invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
+    }
+    // if rle is applied then read the rle block chunk and then uncompress
+    //then actual data based on rle block
+    if (CarbonUtil
+        .hasEncoding(dimensionColumnChunk.get(blockIndex).getEncodingList(), 
Encoding.RLE)) {
+      // read and uncompress the rle block
+      rlePage = numberComressor.unCompress(fileReader
+              .readByteArray(filePath, 
dimensionColumnChunk.get(blockIndex).getRlePageOffset(),
+                  dimensionColumnChunk.get(blockIndex).getRlePageLength()), 0,
+          dimensionColumnChunk.get(blockIndex).getRlePageLength());
+      // uncompress the data with rle indexes
+      dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, 
eachColumnValueSize[blockIndex]);
+      rlePage = null;
+    }
+    // fill chunk attributes
+    DimensionColumnDataChunk columnDataChunk = null;
+    if (dimensionColumnChunk.get(blockIndex).isRowMajor()) {
+      // to store fixed length column chunk values
+      columnDataChunk = new ColumnGroupDimensionDataChunk(dataPage, 
eachColumnValueSize[blockIndex],
+          numberOfRows);
+    }
+    // if no dictionary column then first create a no dictionary column chunk
+    // and set to data chunk instance
+    else if (!CarbonUtil
+        .hasEncoding(dimensionColumnChunk.get(blockIndex).getEncodingList(), 
Encoding.DICTIONARY)) {
+      columnDataChunk =
+          new VariableLengthDimensionDataChunk(dataPage, invertedIndexes, 
invertedIndexesReverse,
+              numberOfRows);
+    } else {
+      // to store fixed length column chunk values
+      columnDataChunk =
+          new FixedLengthDimensionDataChunk(dataPage, invertedIndexes, 
invertedIndexesReverse,
+              numberOfRows, eachColumnValueSize[blockIndex]);
+    }
+    return columnDataChunk;
+  }
+
+}

Reply via email to