http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
new file mode 100644
index 0000000..7875f92
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.result.iterator;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.BatchResult;
+
+/**
+ * Iterator for detail queries. All rows of a detail query cannot be kept in memory at once, so
+ * the result is exposed batch by batch: each call to {@link #next()} returns the batch that was
+ * submitted to the executor earlier and, when more data remains, submits the following one.
+ */
+public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator<BatchResult> {
+
+  /**
+   * Guards the block iterator so that only one submitted task advances it at a time.
+   */
+  private final Object lock = new Object();
+
+  /**
+   * Handle of the batch that has been submitted but not yet handed to the caller.
+   */
+  private Future<BatchResult> future;
+
+  public DetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
+      ExecutorService execService) {
+    super(infos, queryModel, execService);
+  }
+
+  /**
+   * Waits for the pending batch, schedules the following one when {@link #hasNext()} reports
+   * more data, and otherwise finishes the file reader.
+   *
+   * @return the batch produced by the previously submitted task
+   * @throws RuntimeException wrapping any failure raised while waiting for or scheduling a batch
+   */
+  @Override public BatchResult next() {
+    BatchResult result;
+    long startTime = System.currentTimeMillis();
+    try {
+      if (future == null) {
+        future = execute();
+      }
+      result = future.get();
+      nextBatch = false;
+      if (hasNext()) {
+        nextBatch = true;
+        future = execute();
+      } else {
+        fileReader.finish();
+      }
+      totalScanTime += System.currentTimeMillis() - startTime;
+    } catch (Exception ex) {
+      if (ex instanceof InterruptedException) {
+        // the interrupt status is cleared when InterruptedException is thrown; restore it so
+        // code further up the stack can still observe the interruption
+        Thread.currentThread().interrupt();
+      }
+      try {
+        fileReader.finish();
+      } catch (Exception finishFailure) {
+        // the original failure is the one to report; keep the clean-up failure attached to it
+        // (a throwable must not be added as its own suppressed exception)
+        if (finishFailure != ex) {
+          ex.addSuppressed(finishFailure);
+        }
+      }
+      throw new RuntimeException(ex);
+    }
+    return result;
+  }
+
+  /**
+   * Submits a task that advances the block iterator under {@code lock} and wraps the rows it
+   * yields in a {@link BatchResult}. The batch is returned without rows being set when no block
+   * iterator is available.
+   *
+   * @return handle of the submitted task
+   */
+  private Future<BatchResult> execute() {
+    return execService.submit(new Callable<BatchResult>() {
+      @Override public BatchResult call() {
+        BatchResult batchResult = new BatchResult();
+        synchronized (lock) {
+          updateDataBlockIterator();
+          if (dataBlockIterator != null) {
+            batchResult.setRows(dataBlockIterator.next());
+          }
+        }
+        return batchResult;
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
new file mode 100644
index 0000000..2f6a8b7
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.result.iterator;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+
+/**
+ * This is a wrapper iterator over the detail raw query iterator.
+ * This iterator will handle the processing of the raw rows.
+ * This will handle the batch results and will iterate on the batches and give 
single row.
+ */
+public class RawResultIterator extends CarbonIterator<Object[]> {
+
+  private final SegmentProperties sourceSegProperties;
+
+  private final SegmentProperties destinationSegProperties;
+  /**
+   * Iterator of the Batch raw result.
+   */
+  private CarbonIterator<BatchResult> detailRawQueryResultIterator;
+
+  /**
+   * Counter to maintain the row counter.
+   */
+  private int counter = 0;
+
+  private Object[] currentConveretedRawRow = null;
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RawResultIterator.class.getName());
+
+  /**
+   * batch of the result.
+   */
+  private BatchResult batch;
+
+  public RawResultIterator(CarbonIterator<BatchResult> 
detailRawQueryResultIterator,
+      SegmentProperties sourceSegProperties, SegmentProperties 
destinationSegProperties) {
+    this.detailRawQueryResultIterator = detailRawQueryResultIterator;
+    this.sourceSegProperties = sourceSegProperties;
+    this.destinationSegProperties = destinationSegProperties;
+  }
+
+  @Override public boolean hasNext() {
+
+    if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
+      if (detailRawQueryResultIterator.hasNext()) {
+        batch = null;
+        batch = detailRawQueryResultIterator.next();
+        counter = 0; // batch changed so reset the counter.
+      } else {
+        return false;
+      }
+    }
+
+    if (!checkIfBatchIsProcessedCompletely(batch)) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override public Object[] next() {
+    if (null == batch) { // for 1st time
+      batch = detailRawQueryResultIterator.next();
+    }
+    if (!checkIfBatchIsProcessedCompletely(batch)) {
+      try {
+        if(null != currentConveretedRawRow){
+          counter++;
+          Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
+          currentConveretedRawRow = null;
+          return currentConveretedRawRowTemp;
+        }
+        return convertRow(batch.getRawRow(counter++));
+      } catch (KeyGenException e) {
+        LOGGER.error(e.getMessage());
+        return null;
+      }
+    } else { // completed one batch.
+      batch = null;
+      batch = detailRawQueryResultIterator.next();
+      counter = 0;
+    }
+    try {
+      if(null != currentConveretedRawRow){
+        counter++;
+        Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
+        currentConveretedRawRow = null;
+        return currentConveretedRawRowTemp;
+      }
+
+      return convertRow(batch.getRawRow(counter++));
+    } catch (KeyGenException e) {
+      LOGGER.error(e.getMessage());
+      return null;
+    }
+
+  }
+
+  /**
+   * for fetching the row with out incrementing counter.
+   * @return
+   */
+  public Object[] fetchConverted() throws KeyGenException {
+    if(null != currentConveretedRawRow){
+      return currentConveretedRawRow;
+    }
+    if(hasNext())
+    {
+      Object[] rawRow = batch.getRawRow(counter);
+      currentConveretedRawRow = convertRow(rawRow);
+      return currentConveretedRawRow;
+    }
+    else
+    {
+      return null;
+    }
+  }
+
+  private Object[] convertRow(Object[] rawRow) throws KeyGenException {
+    byte[] dims = ((ByteArrayWrapper) rawRow[0]).getDictionaryKey();
+    long[] keyArray = 
sourceSegProperties.getDimensionKeyGenerator().getKeyArray(dims);
+    byte[] covertedBytes =
+        
destinationSegProperties.getDimensionKeyGenerator().generateKey(keyArray);
+    ((ByteArrayWrapper) rawRow[0]).setDictionaryKey(covertedBytes);
+    return rawRow;
+  }
+
+  /**
+   * To check if the batch is processed completely
+   * @param batch
+   * @return
+   */
+  private boolean checkIfBatchIsProcessedCompletely(BatchResult batch){
+    if(counter < batch.getSize())
+    {
+      return false;
+    }
+    else{
+      return true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorChunkRowIterator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorChunkRowIterator.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorChunkRowIterator.java
new file mode 100644
index 0000000..b412655
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorChunkRowIterator.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.scan.result.iterator;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+
+/**
+ * Row-by-row view over the columnar batches produced by a detail query result iterator.
+ */
+public class VectorChunkRowIterator extends CarbonIterator<Object[]> {
+
+  /**
+   * iterator over chunk result
+   */
+  private AbstractDetailQueryResultIterator<?> iterator;
+
+  /**
+   * current chunk
+   */
+  private CarbonColumnarBatch columnarBatch;
+
+  /**
+   * number of rows filled into the current chunk
+   */
+  private int batchSize;
+
+  /**
+   * index of the next row to return from the current chunk
+   */
+  private int currentIndex;
+
+  /**
+   * Creates the iterator and, when the underlying iterator has data, fills the first batch.
+   *
+   * @param iterator      source of the columnar batches
+   * @param columnarBatch batch instance that is refilled for every chunk
+   */
+  public VectorChunkRowIterator(AbstractDetailQueryResultIterator<?> iterator,
+      CarbonColumnarBatch columnarBatch) {
+    this.iterator = iterator;
+    this.columnarBatch = columnarBatch;
+    if (iterator.hasNext()) {
+      iterator.processNextBatch(columnarBatch);
+      batchSize = columnarBatch.getActualSize();
+      currentIndex = 0;
+    }
+  }
+
+  /**
+   * Returns {@code true} if the iteration has more elements. (In other words,
+   * returns {@code true} if {@link #next} would return an element rather than
+   * throwing an exception.) When the current batch is used up, following batches are loaded
+   * until one with at least one row is found.
+   *
+   * @return {@code true} if the iteration has more elements
+   */
+  @Override public boolean hasNext() {
+    if (currentIndex < batchSize) {
+      return true;
+    }
+    while (iterator.hasNext()) {
+      columnarBatch.reset();
+      iterator.processNextBatch(columnarBatch);
+      batchSize = columnarBatch.getActualSize();
+      currentIndex = 0;
+      if (batchSize > 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns the next element in the iteration.
+   *
+   * @return the next element in the iteration, one value per column vector
+   * @throws java.util.NoSuchElementException if the iteration has no more elements
+   */
+  @Override public Object[] next() {
+    // without this check an exhausted iterator would read past the filled part of the batch
+    if (!hasNext()) {
+      throw new java.util.NoSuchElementException("no more rows in the columnar batches");
+    }
+    Object[] row = new Object[columnarBatch.columnVectors.length];
+    for (int i = 0; i < row.length; i++) {
+      row[i] = columnarBatch.columnVectors[i].getData(currentIndex);
+    }
+    currentIndex++;
+    return row;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
new file mode 100644
index 0000000..00116c5
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.result.iterator;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+
+/**
+ * Detail query result iterator that reads the data in vector batch format. Results are not
+ * returned from {@link #next()}; callers pass a batch to
+ * {@link #processNextBatch(CarbonColumnarBatch)} and it is filled in place.
+ */
+public class VectorDetailQueryResultIterator extends AbstractDetailQueryResultIterator<Object> {
+
+  /**
+   * Guards the block iterator while a batch is being filled.
+   */
+  private final Object lock = new Object();
+
+  public VectorDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
+      ExecutorService execService) {
+    super(infos, queryModel, execService);
+  }
+
+  /**
+   * Not supported by this iterator.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public Object next() {
+    throw new UnsupportedOperationException("call processNextBatch instead");
+  }
+
+  /**
+   * Advances the block iterator and lets it fill the given batch. The batch is left untouched
+   * when no block iterator is available.
+   *
+   * @param columnarBatch batch to fill
+   */
+  public void processNextBatch(CarbonColumnarBatch columnarBatch) {
+    synchronized (lock) {
+      updateDataBlockIterator();
+      if (dataBlockIterator != null) {
+        dataBlockIterator.processNextBatch(columnarBatch);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
new file mode 100644
index 0000000..7d29b0f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.result.vector;
+
+import org.apache.spark.sql.types.Decimal;
+
+/**
+ * Write/read abstraction over one column of a {@link CarbonColumnarBatch}.
+ * Within this package the fillers of MeasureDataVectorProcessor write values through the
+ * {@code put*} methods, VectorChunkRowIterator reads them back through {@link #getData(int)},
+ * and {@link CarbonColumnarBatch#reset()} calls {@link #reset()} on every vector.
+ * NOTE(review): the per-method contracts below are derived from those callers; confirm them
+ * against the implementations.
+ */
+public interface CarbonColumnVector {
+
+  /**
+   * Puts a short value at the given position.
+   *
+   * @param rowId position inside the vector (the measure fillers pass a running vector offset)
+   * @param value value to store
+   */
+  void putShort(int rowId, short value);
+
+  /**
+   * Puts an int value at the given position.
+   *
+   * @param rowId position inside the vector
+   * @param value value to store
+   */
+  void putInt(int rowId, int value);
+
+  /**
+   * Puts a long value at the given position.
+   *
+   * @param rowId position inside the vector
+   * @param value value to store
+   */
+  void putLong(int rowId, long value);
+
+  /**
+   * Puts a decimal value at the given position.
+   *
+   * @param rowId     position inside the vector
+   * @param value     value to store
+   * @param precision precision of the column (the decimal measure filler passes the precision
+   *                  of the queried measure)
+   */
+  void putDecimal(int rowId, Decimal value, int precision);
+
+  /**
+   * Puts a double value at the given position.
+   *
+   * @param rowId position inside the vector
+   * @param value value to store
+   */
+  void putDouble(int rowId, double value);
+
+  /**
+   * Puts a byte array at the given position.
+   *
+   * @param rowId position inside the vector
+   * @param value bytes to store
+   */
+  void putBytes(int rowId, byte[] value);
+
+  /**
+   * Puts part of a byte array at the given position.
+   * NOTE(review): presumably stores {@code length} bytes of {@code value} starting at
+   * {@code offset} - confirm against the implementations.
+   *
+   * @param rowId  position inside the vector
+   * @param offset start index inside {@code value}
+   * @param length number of bytes
+   * @param value  source bytes
+   */
+  void putBytes(int rowId, int offset, int length, byte[] value);
+
+  /**
+   * Marks the given position as null. The measure fillers call this for every row whose bit is
+   * set in the null bit set of the data chunk.
+   *
+   * @param rowId position inside the vector
+   */
+  void putNull(int rowId);
+
+  /**
+   * NOTE(review): presumably reports whether {@link #putNull(int)} was called for the position
+   * since the last reset - confirm against the implementations.
+   *
+   * @param rowId position inside the vector
+   * @return whether the position holds a null
+   */
+  boolean isNull(int rowId);
+
+  /**
+   * Puts an arbitrary object at the given position.
+   * NOTE(review): which object types are accepted is not visible here - confirm against the
+   * implementations.
+   *
+   * @param rowId position inside the vector
+   * @param obj   value to store
+   */
+  void putObject(int rowId, Object obj);
+
+  /**
+   * Reads the value at the given position. VectorChunkRowIterator uses it to assemble a row
+   * from all column vectors of a batch.
+   *
+   * @param rowId position inside the vector
+   * @return the stored value as an object
+   */
+  Object getData(int rowId);
+
+  /**
+   * Resets the vector; called by {@link CarbonColumnarBatch#reset()} for every column vector of
+   * the batch.
+   */
+  void reset();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
new file mode 100644
index 0000000..faeffde
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.result.vector;
+
+/**
+ * Holder for one batch of column vectors together with its bookkeeping: the configured batch
+ * size, the number of rows actually present and a row counter maintained by the callers.
+ */
+public class CarbonColumnarBatch {
+
+  /**
+   * one vector per column of the batch
+   */
+  public CarbonColumnVector[] columnVectors;
+
+  private int batchSize;
+
+  private int actualSize;
+
+  private int rowCounter;
+
+  /**
+   * @param columnVectors vectors that make up the batch, one per column
+   * @param batchSize     configured size of the batch
+   */
+  public CarbonColumnarBatch(CarbonColumnVector[] columnVectors, int batchSize) {
+    this.batchSize = batchSize;
+    this.columnVectors = columnVectors;
+  }
+
+  /**
+   * @return the batch size given at construction time
+   */
+  public int getBatchSize() {
+    return this.batchSize;
+  }
+
+  /**
+   * @return the actual size last set through {@link #setActualSize(int)}, 0 after a reset
+   */
+  public int getActualSize() {
+    return this.actualSize;
+  }
+
+  public void setActualSize(int actualSize) {
+    this.actualSize = actualSize;
+  }
+
+  /**
+   * Clears both counters and then resets every column vector.
+   */
+  public void reset() {
+    this.actualSize = 0;
+    this.rowCounter = 0;
+    for (CarbonColumnVector columnVector : this.columnVectors) {
+      columnVector.reset();
+    }
+  }
+
+  /**
+   * @return the row counter last set through {@link #setRowCounter(int)}, 0 after a reset
+   */
+  public int getRowCounter() {
+    return this.rowCounter;
+  }
+
+  public void setRowCounter(int rowCounter) {
+    this.rowCounter = rowCounter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
new file mode 100644
index 0000000..852abe9
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.result.vector;
+
+import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.model.QueryDimension;
+import org.apache.carbondata.core.scan.model.QueryMeasure;
+
+/**
+ * Describes how one column is copied into a {@link CarbonColumnVector}: the source range
+ * ({@code offset}, {@code size}), the target vector and the position inside it
+ * ({@code vectorOffset}), and the column it belongs to. Instances are ordered by
+ * {@code ordinal}.
+ */
+public class ColumnVectorInfo implements Comparable<ColumnVectorInfo> {
+  public int offset;
+  public int size;
+  public CarbonColumnVector vector;
+  public int vectorOffset;
+  public QueryDimension dimension;
+  public QueryMeasure measure;
+  public int ordinal;
+  public DirectDictionaryGenerator directDictionaryGenerator;
+  public MeasureDataVectorProcessor.MeasureVectorFiller measureVectorFiller;
+  public GenericQueryType genericQueryType;
+
+  /**
+   * Orders by {@code ordinal}.
+   *
+   * @param o the instance to compare with
+   * @return a negative value, zero or a positive value when this ordinal is smaller than, equal
+   *         to or greater than the other one
+   */
+  @Override public int compareTo(ColumnVectorInfo o) {
+    // Integer.compare cannot overflow, unlike subtracting the two ordinals
+    return Integer.compare(ordinal, o.ordinal);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
new file mode 100644
index 0000000..3c25efc
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.result.vector;
+
+import java.math.BigDecimal;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.metadata.DataType;
+
+import org.apache.spark.sql.types.Decimal;
+
+public class MeasureDataVectorProcessor {
+
+  /**
+   * Copies measure values of one data chunk into the column vector described by a
+   * {@link ColumnVectorInfo}. The implementations in this class read {@code info.size} entries
+   * starting at {@code info.offset} and write them into {@code info.vector} starting at
+   * {@code info.vectorOffset}; rows flagged in the null bit set of the chunk are written as
+   * null.
+   */
+  public interface MeasureVectorFiller {
+
+    /**
+     * Fills the vector from the chunk rows {@code info.offset} up to (excluding)
+     * {@code info.offset + info.size}.
+     *
+     * @param dataChunk measure chunk to read from
+     * @param info      source range, target vector and target offset
+     */
+    void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo 
info);
+
+    /**
+     * Same as above, except that the chunk row to read is looked up as {@code rowMapping[i]}
+     * for every index {@code i} in the range described by {@code info}.
+     *
+     * @param rowMapping chunk row index for every position of the range
+     * @param dataChunk  measure chunk to read from
+     * @param info       source range, target vector and target offset
+     */
+    void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk 
dataChunk,
+        ColumnVectorInfo info);
+  }
+
+  /**
+   * Filler that reads the measure as a long, narrows it to int and writes it with
+   * {@code putInt}.
+   */
+  public static class IntegralMeasureVectorFiller implements MeasureVectorFiller {
+
+    /**
+     * Copies the chunk rows of the range described by {@code info} into the vector as ints.
+     */
+    @Override
+    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) {
+      final int start = info.offset;
+      final int end = start + info.size;
+      int target = info.vectorOffset;
+      final CarbonColumnVector column = info.vector;
+      final BitSet nulls = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int row = start; row < end; row++, target++) {
+        if (!nulls.get(row)) {
+          long stored = dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(row);
+          column.putInt(target, (int) stored);
+        } else {
+          column.putNull(target);
+        }
+      }
+    }
+
+    /**
+     * Copies the chunk rows selected through {@code rowMapping} into the vector as ints.
+     */
+    @Override
+    public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk,
+        ColumnVectorInfo info) {
+      final int start = info.offset;
+      final int end = start + info.size;
+      int target = info.vectorOffset;
+      final CarbonColumnVector column = info.vector;
+      final BitSet nulls = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int idx = start; idx < end; idx++, target++) {
+        final int mappedRow = rowMapping[idx];
+        if (!nulls.get(mappedRow)) {
+          long stored = dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(mappedRow);
+          column.putInt(target, (int) stored);
+        } else {
+          column.putNull(target);
+        }
+      }
+    }
+  }
+
+  /**
+   * Filler that reads the measure as a long, narrows it to short and writes it with
+   * {@code putShort}.
+   */
+  public static class ShortMeasureVectorFiller implements MeasureVectorFiller {
+
+    /**
+     * Copies the chunk rows of the range described by {@code info} into the vector as shorts.
+     */
+    @Override
+    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) {
+      final int start = info.offset;
+      final int end = start + info.size;
+      int target = info.vectorOffset;
+      final CarbonColumnVector column = info.vector;
+      final BitSet nulls = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int row = start; row < end; row++, target++) {
+        if (!nulls.get(row)) {
+          long stored = dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(row);
+          column.putShort(target, (short) stored);
+        } else {
+          column.putNull(target);
+        }
+      }
+    }
+
+    /**
+     * Copies the chunk rows selected through {@code rowMapping} into the vector as shorts.
+     */
+    @Override
+    public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk,
+        ColumnVectorInfo info) {
+      final int start = info.offset;
+      final int end = start + info.size;
+      int target = info.vectorOffset;
+      final CarbonColumnVector column = info.vector;
+      final BitSet nulls = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int idx = start; idx < end; idx++, target++) {
+        final int mappedRow = rowMapping[idx];
+        if (!nulls.get(mappedRow)) {
+          long stored = dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(mappedRow);
+          column.putShort(target, (short) stored);
+        } else {
+          column.putNull(target);
+        }
+      }
+    }
+  }
+
+  /**
+   * Filler that reads the measure as a long and writes it unchanged with {@code putLong}.
+   */
+  public static class LongMeasureVectorFiller implements MeasureVectorFiller {
+
+    /**
+     * Copies the chunk rows of the range described by {@code info} into the vector as longs.
+     */
+    @Override
+    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) {
+      final int start = info.offset;
+      final int end = start + info.size;
+      int target = info.vectorOffset;
+      final CarbonColumnVector column = info.vector;
+      final BitSet nulls = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int row = start; row < end; row++, target++) {
+        if (!nulls.get(row)) {
+          long stored = dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(row);
+          column.putLong(target, stored);
+        } else {
+          column.putNull(target);
+        }
+      }
+    }
+
+    /**
+     * Copies the chunk rows selected through {@code rowMapping} into the vector as longs.
+     */
+    @Override
+    public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk,
+        ColumnVectorInfo info) {
+      final int start = info.offset;
+      final int end = start + info.size;
+      int target = info.vectorOffset;
+      final CarbonColumnVector column = info.vector;
+      final BitSet nulls = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int idx = start; idx < end; idx++, target++) {
+        final int mappedRow = rowMapping[idx];
+        if (!nulls.get(mappedRow)) {
+          long stored = dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(mappedRow);
+          column.putLong(target, stored);
+        } else {
+          column.putNull(target);
+        }
+      }
+    }
+  }
+
+  /**
+   * Filler that reads the measure as a {@link BigDecimal}, converts it to a Spark
+   * {@link Decimal} and writes it with {@code putDecimal} using the precision of the queried
+   * measure.
+   */
+  public static class DecimalMeasureVectorFiller implements MeasureVectorFiller {
+
+    /**
+     * Copies the chunk rows of the range described by {@code info} into the vector as decimals.
+     */
+    @Override
+    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) {
+      final int start = info.offset;
+      final int end = start + info.size;
+      int target = info.vectorOffset;
+      final CarbonColumnVector column = info.vector;
+      final int measurePrecision = info.measure.getMeasure().getPrecision();
+      final BitSet nulls = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int row = start; row < end; row++, target++) {
+        if (!nulls.get(row)) {
+          BigDecimal stored =
+              dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(row);
+          column.putDecimal(target, Decimal.apply(stored), measurePrecision);
+        } else {
+          column.putNull(target);
+        }
+      }
+    }
+
+    /**
+     * Copies the chunk rows selected through {@code rowMapping} into the vector as decimals.
+     */
+    @Override
+    public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk,
+        ColumnVectorInfo info) {
+      final int start = info.offset;
+      final int end = start + info.size;
+      int target = info.vectorOffset;
+      final CarbonColumnVector column = info.vector;
+      final int measurePrecision = info.measure.getMeasure().getPrecision();
+      final BitSet nulls = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int idx = start; idx < end; idx++, target++) {
+        final int mappedRow = rowMapping[idx];
+        if (!nulls.get(mappedRow)) {
+          BigDecimal stored =
+              dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(mappedRow);
+          column.putDecimal(target, Decimal.apply(stored), measurePrecision);
+        } else {
+          column.putNull(target);
+        }
+      }
+    }
+  }
+
+  /**
+   * Filler that reads the measure as a double and writes it with {@code putDouble}. The factory
+   * in this class returns it for every data type without a dedicated filler.
+   */
+  public static class DefaultMeasureVectorFiller implements MeasureVectorFiller {
+
+    /**
+     * Copies the chunk rows of the range described by {@code info} into the vector as doubles.
+     */
+    @Override
+    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) {
+      final int start = info.offset;
+      final int end = start + info.size;
+      int target = info.vectorOffset;
+      final CarbonColumnVector column = info.vector;
+      final BitSet nulls = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int row = start; row < end; row++, target++) {
+        if (!nulls.get(row)) {
+          double stored = dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(row);
+          column.putDouble(target, stored);
+        } else {
+          column.putNull(target);
+        }
+      }
+    }
+
+    /**
+     * Copies the chunk rows selected through {@code rowMapping} into the vector as doubles.
+     */
+    @Override
+    public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk,
+        ColumnVectorInfo info) {
+      final int start = info.offset;
+      final int end = start + info.size;
+      int target = info.vectorOffset;
+      final CarbonColumnVector column = info.vector;
+      final BitSet nulls = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int idx = start; idx < end; idx++, target++) {
+        final int mappedRow = rowMapping[idx];
+        if (!nulls.get(mappedRow)) {
+          double stored =
+              dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(mappedRow);
+          column.putDouble(target, stored);
+        } else {
+          column.putNull(target);
+        }
+      }
+    }
+  }
+
+  /**
+   * Chooses the {@link MeasureVectorFiller} matching the data type of a measure.
+   */
+  public static class MeasureVectorFillerFactory {
+
+    /**
+     * @param dataType data type of the measure
+     * @return a new filler: dedicated ones for SHORT, INT, LONG and DECIMAL, the default
+     *         (double based) filler for every other data type
+     */
+    public static MeasureVectorFiller getMeasureVectorFiller(DataType dataType) {
+      final MeasureVectorFiller filler;
+      switch (dataType) {
+        case SHORT:
+          filler = new ShortMeasureVectorFiller();
+          break;
+        case INT:
+          filler = new IntegralMeasureVectorFiller();
+          break;
+        case LONG:
+          filler = new LongMeasureVectorFiller();
+          break;
+        case DECIMAL:
+          filler = new DecimalMeasureVectorFiller();
+          break;
+        default:
+          filler = new DefaultMeasureVectorFiller();
+          break;
+      }
+      return filler;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
new file mode 100644
index 0000000..7ae1a41
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.result.vector.impl;
+
+import java.util.Arrays;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.metadata.DataType;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Array backed {@link CarbonColumnVector} holding one batch of values of a single column.
+ * Only the backing array that matches the vector's data type is allocated in the
+ * constructor; null rows are tracked separately in a bit set.
+ */
+public class CarbonColumnVectorImpl implements CarbonColumnVector {
+
+  /** backing store for data types that have no dedicated array */
+  private Object[] data;
+
+  /** backing store for SHORT vectors */
+  private short[] shorts;
+
+  /** backing store for INT vectors */
+  private int[] ints;
+
+  /** backing store for LONG vectors */
+  private long[] longs;
+
+  /** backing store for DECIMAL vectors */
+  private Decimal[] decimals;
+
+  /** backing store for STRING vectors, kept as raw bytes */
+  private byte[][] bytes;
+
+  /** backing store for DOUBLE vectors */
+  private double[] doubles;
+
+  /** a set bit marks the row as null */
+  private BitSet nullBytes;
+
+  private DataType dataType;
+
+  /**
+   * @param batchSize number of rows the vector can hold
+   * @param dataType  data type of the column; decides which backing array is allocated
+   */
+  public CarbonColumnVectorImpl(int batchSize, DataType dataType) {
+    nullBytes = new BitSet(batchSize);
+    this.dataType = dataType;
+    switch (dataType) {
+      case SHORT:
+        shorts = new short[batchSize];
+        break;
+      case INT:
+        ints = new int[batchSize];
+        break;
+      case LONG:
+        longs = new long[batchSize];
+        break;
+      case DOUBLE:
+        doubles = new double[batchSize];
+        break;
+      case STRING:
+        bytes = new byte[batchSize][];
+        break;
+      case DECIMAL:
+        decimals = new Decimal[batchSize];
+        break;
+      default:
+        data = new Object[batchSize];
+    }
+  }
+
+  /**
+   * Stores a short value. Previously this method was empty, so short values written
+   * by a filler were silently lost.
+   */
+  @Override public void putShort(int rowId, short value) {
+    shorts[rowId] = value;
+  }
+
+  @Override public void putInt(int rowId, int value) {
+    ints[rowId] = value;
+  }
+
+  @Override public void putLong(int rowId, long value) {
+    longs[rowId] = value;
+  }
+
+  /**
+   * Stores a decimal value; the precision argument is not used by this implementation.
+   */
+  @Override public void putDecimal(int rowId, Decimal value, int precision) {
+    decimals[rowId] = value;
+  }
+
+  @Override public void putDouble(int rowId, double value) {
+    doubles[rowId] = value;
+  }
+
+  /**
+   * Stores the given array itself (no copy is made).
+   */
+  @Override public void putBytes(int rowId, byte[] value) {
+    bytes[rowId] = value;
+  }
+
+  /**
+   * Stores a copy of value[offset, offset + length). Previously this method was empty,
+   * so values written through it were silently lost.
+   */
+  @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+    bytes[rowId] = Arrays.copyOfRange(value, offset, offset + length);
+  }
+
+  @Override public void putNull(int rowId) {
+    nullBytes.set(rowId);
+  }
+
+  @Override public boolean isNull(int rowId) {
+    return nullBytes.get(rowId);
+  }
+
+  @Override public void putObject(int rowId, Object obj) {
+    data[rowId] = obj;
+  }
+
+  /**
+   * Returns the value of a row, boxed where the backing store is primitive.
+   *
+   * @param rowId row position inside the batch
+   * @return null when the row is flagged as null, otherwise the stored value;
+   * STRING values are returned as UTF8String
+   */
+  @Override public Object getData(int rowId) {
+    if (nullBytes.get(rowId)) {
+      return null;
+    }
+    switch (dataType) {
+      case SHORT:
+        return shorts[rowId];
+      case INT:
+        return ints[rowId];
+      case LONG:
+        return longs[rowId];
+      case DOUBLE:
+        return doubles[rowId];
+      case STRING:
+        return UTF8String.fromBytes(bytes[rowId]);
+      case DECIMAL:
+        return decimals[rowId];
+      default:
+        return data[rowId];
+    }
+  }
+
+  /**
+   * Clears all null flags and resets every slot of the backing array so the vector
+   * can be reused for the next batch.
+   */
+  @Override public void reset() {
+    nullBytes.clear();
+    switch (dataType) {
+      case SHORT:
+        Arrays.fill(shorts, (short) 0);
+        break;
+      case INT:
+        Arrays.fill(ints, 0);
+        break;
+      case LONG:
+        Arrays.fill(longs, 0);
+        break;
+      case DOUBLE:
+        Arrays.fill(doubles, 0);
+        break;
+      case STRING:
+        Arrays.fill(bytes, null);
+        break;
+      case DECIMAL:
+        Arrays.fill(decimals, null);
+        break;
+      default:
+        Arrays.fill(data, null);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
new file mode 100644
index 0000000..32b3ac2
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.scanner;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.update.data.BlockletDeleteDeltaCacheLoader;
+import org.apache.carbondata.core.update.data.DeleteDeltaCacheLoaderIntf;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import 
org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+
+/**
+ * Base blocklet scanner. Its default scan reads every selected dimension and measure
+ * chunk of a blocklet into the scanned result without applying any filter.
+ */
+public abstract class AbstractBlockletScanner implements BlockletScanner {
+
+  /**
+   * result object which is reset and refilled on every scan
+   */
+  protected AbstractScannedResult scannedResult;
+
+  /**
+   * execution details of the block being scanned
+   */
+  protected BlockExecutionInfo blockExecutionInfo;
+
+  /**
+   * statistics holder used while scanning; it is not assigned by the constructor below
+   */
+  public QueryStatisticsModel queryStatisticsModel;
+
+  public AbstractBlockletScanner(BlockExecutionInfo tableBlockExecutionInfos) {
+    this.blockExecutionInfo = tableBlockExecutionInfos;
+  }
+
+  /**
+   * Fills the scanned result from the given chunk holder and returns it.
+   *
+   * @param blocksChunkHolder holder giving access to the data block and the file reader
+   * @return the scanner's result object after it has been refilled
+   */
+  @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
+      throws IOException, FilterUnsupportedException {
+    fillKeyValue(blocksChunkHolder);
+    return scannedResult;
+  }
+
+  /**
+   * Updates the blocklet statistics, then resets the scanned result and loads into it
+   * the row count, blocklet id, selected dimension and measure chunks and the delete
+   * delta cache of the current data block.
+   *
+   * @param blocksChunkHolder holder giving access to the data block and the file reader
+   */
+  protected void fillKeyValue(BlocksChunkHolder blocksChunkHolder) throws IOException {
+    // one more blocklet seen
+    QueryStatistic totalCounter = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
+    totalCounter.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
+        totalCounter.getCount() + 1);
+    queryStatisticsModel.getRecorder().recordStatistics(totalCounter);
+
+    // without a filter every blocklet seen is also a scanned one
+    QueryStatistic scannedCounter = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
+    scannedCounter.addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
+        scannedCounter.getCount() + 1);
+    queryStatisticsModel.getRecorder().recordStatistics(scannedCounter);
+
+    scannedResult.reset();
+    scannedResult.setNumberOfRows(blocksChunkHolder.getDataBlock().nodeSize());
+    // blocklet id = block id + file separator + node number of the data block
+    final String blockletId = blockExecutionInfo.getBlockId()
+        + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder.getDataBlock().nodeNumber();
+    scannedResult.setBlockletId(blockletId);
+    scannedResult.setDimensionChunks(blocksChunkHolder.getDataBlock().getDimensionChunks(
+        blocksChunkHolder.getFileReader(),
+        blockExecutionInfo.getAllSelectedDimensionBlocksIndexes()));
+    scannedResult.setMeasureChunks(blocksChunkHolder.getDataBlock().getMeasureChunks(
+        blocksChunkHolder.getFileReader(),
+        blockExecutionInfo.getAllSelectedMeasureBlocksIndexes()));
+
+    // load the delete delta data for this blocklet and hand the cache to the result
+    DeleteDeltaCacheLoaderIntf deltaLoader =
+        new BlockletDeleteDeltaCacheLoader(scannedResult.getBlockletId(),
+            blocksChunkHolder.getDataBlock(), blockExecutionInfo.getAbsoluteTableIdentifier());
+    deltaLoader.loadDeleteDeltaFileDataToCache();
+    scannedResult.setBlockletDeleteDeltaCache(
+        blocksChunkHolder.getDataBlock().getDeleteDeltaDataCache());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
new file mode 100644
index 0000000..9484318
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.scanner;
+
+import java.io.IOException;
+
+import 
org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+
+/**
+ * Contract for scanning a single blocklet.
+ * An implementation may apply a filter while scanning or read the data without one.
+ */
+public interface BlockletScanner {
+
+  /**
+   * Processes the data referenced by the given chunk holder and returns the scanned result.
+   *
+   * @param blocksChunkHolder block chunk holder which holds the block data
+   * @return result after processing
+   * @throws IOException                if an I/O error occurs while scanning
+   * @throws FilterUnsupportedException declared for filter based implementations
+   */
+  AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
+      throws IOException, FilterUnsupportedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
new file mode 100644
index 0000000..48c9af2
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.scan.scanner.impl;
+
+import java.io.IOException;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.update.data.BlockletDeleteDeltaCacheLoader;
+import org.apache.carbondata.core.update.data.DeleteDeltaCacheLoaderIntf;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.FileHolder;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import 
org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.scan.result.AbstractScannedResult;
+import org.apache.carbondata.core.scan.result.impl.FilterQueryScannedResult;
+import org.apache.carbondata.core.scan.scanner.AbstractBlockletScanner;
+
+/**
+ * Below class will be used for filter query processing
+ * this class will be first apply the filter then it will read the block if
+ * required and return the scanned result
+ */
+public class FilterScanner extends AbstractBlockletScanner {
+
+  /**
+   * filter tree
+   */
+  private FilterExecuter filterExecuter;
+  /**
+   * this will be used to apply min max
+   * this will be useful for dimension column which is on the right side
+   * as node finder will always give tentative blocks, if column data stored 
individually
+   * and data is in sorted order then we can check whether filter is in the 
range of min max or not
+   * if it present then only we can apply filter on complete data.
+   * this will be very useful in case of sparse data when rows are
+   * repeating.
+   */
+  private boolean isMinMaxEnabled;
+
+  private QueryStatisticsModel queryStatisticsModel;
+
+  public FilterScanner(BlockExecutionInfo blockExecutionInfo,
+      QueryStatisticsModel queryStatisticsModel) {
+    super(blockExecutionInfo);
+    scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
+    // to check whether min max is enabled or not
+    String minMaxEnableValue = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_QUERY_MIN_MAX_ENABLED,
+            CarbonCommonConstants.MIN_MAX_DEFAULT_VALUE);
+    if (null != minMaxEnableValue) {
+      isMinMaxEnabled = Boolean.parseBoolean(minMaxEnableValue);
+    }
+    // get the filter tree
+    this.filterExecuter = blockExecutionInfo.getFilterExecuterTree();
+    this.queryStatisticsModel = queryStatisticsModel;
+  }
+
+  /**
+   * Below method will be used to process the block
+   *
+   * @param blocksChunkHolder block chunk holder which holds the data
+   * @throws FilterUnsupportedException
+   */
+  @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder 
blocksChunkHolder)
+      throws IOException, FilterUnsupportedException {
+    fillScannedResult(blocksChunkHolder);
+    return scannedResult;
+  }
+
+  /**
+   * This method will process the data in below order
+   * 1. first apply min max on the filter tree and check whether any of the 
filter
+   * is fall on the range of min max, if not then return empty result
+   * 2. If filter falls on min max range then apply filter on actual
+   * data and get the filtered row index
+   * 3. if row index is empty then return the empty result
+   * 4. if row indexes is not empty then read only those blocks(measure or 
dimension)
+   * which was present in the query but not present in the filter, as while 
applying filter
+   * some of the blocks where already read and present in chunk holder so not 
need to
+   * read those blocks again, this is to avoid reading of same blocks which 
was already read
+   * 5. Set the blocks and filter indexes to result
+   *
+   * @param blocksChunkHolder
+   * @throws FilterUnsupportedException
+   */
+  private void fillScannedResult(BlocksChunkHolder blocksChunkHolder)
+      throws FilterUnsupportedException, IOException {
+    scannedResult.reset();
+    scannedResult.setBlockletId(
+        blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR 
+ blocksChunkHolder
+            .getDataBlock().nodeNumber());
+    // apply min max
+    if (isMinMaxEnabled) {
+      BitSet bitSet = this.filterExecuter
+          
.isScanRequired(blocksChunkHolder.getDataBlock().getColumnsMaxValue(),
+              blocksChunkHolder.getDataBlock().getColumnsMinValue());
+      if (bitSet.isEmpty()) {
+        scannedResult.setNumberOfRows(0);
+        scannedResult.setIndexes(new int[0]);
+        CarbonUtil.freeMemory(blocksChunkHolder.getDimensionDataChunk(),
+            blocksChunkHolder.getMeasureDataChunk());
+        return;
+      }
+    }
+    // apply filter on actual data
+    BitSet bitSet = this.filterExecuter.applyFilter(blocksChunkHolder);
+    // if indexes is empty then return with empty result
+    if (bitSet.isEmpty()) {
+      scannedResult.setNumberOfRows(0);
+      scannedResult.setIndexes(new int[0]);
+      return;
+    }
+    // valid scanned blocklet
+    QueryStatistic validScannedBlockletStatistic = 
queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM);
+    validScannedBlockletStatistic
+        .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM,
+            validScannedBlockletStatistic.getCount() + 1);
+    
queryStatisticsModel.getRecorder().recordStatistics(validScannedBlockletStatistic);
+    // get the row indexes from bot set
+    int[] indexes = new int[bitSet.cardinality()];
+    int index = 0;
+    for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
+      indexes[index++] = i;
+    }
+    // loading delete data cache in blockexecutioninfo instance
+    DeleteDeltaCacheLoaderIntf deleteCacheLoader =
+        new BlockletDeleteDeltaCacheLoader(scannedResult.getBlockletId(),
+            blocksChunkHolder.getDataBlock(), 
blockExecutionInfo.getAbsoluteTableIdentifier());
+    deleteCacheLoader.loadDeleteDeltaFileDataToCache();
+    scannedResult
+        
.setBlockletDeleteDeltaCache(blocksChunkHolder.getDataBlock().getDeleteDeltaDataCache());
+    FileHolder fileReader = blocksChunkHolder.getFileReader();
+    int[][] allSelectedDimensionBlocksIndexes =
+        blockExecutionInfo.getAllSelectedDimensionBlocksIndexes();
+    DimensionColumnDataChunk[] projectionListDimensionChunk = 
blocksChunkHolder.getDataBlock()
+        .getDimensionChunks(fileReader, allSelectedDimensionBlocksIndexes);
+
+    DimensionColumnDataChunk[] dimensionColumnDataChunk =
+        new 
DimensionColumnDataChunk[blockExecutionInfo.getTotalNumberDimensionBlock()];
+    // read dimension chunk blocks from file which is not present
+    for (int i = 0; i < dimensionColumnDataChunk.length; i++) {
+      if (null != blocksChunkHolder.getDimensionDataChunk()[i]) {
+        dimensionColumnDataChunk[i] = 
blocksChunkHolder.getDimensionDataChunk()[i];
+      }
+    }
+    for (int i = 0; i < allSelectedDimensionBlocksIndexes.length; i++) {
+      System.arraycopy(projectionListDimensionChunk, 
allSelectedDimensionBlocksIndexes[i][0],
+          dimensionColumnDataChunk, allSelectedDimensionBlocksIndexes[i][0],
+          allSelectedDimensionBlocksIndexes[i][1] + 1 - 
allSelectedDimensionBlocksIndexes[i][0]);
+    }
+    MeasureColumnDataChunk[] measureColumnDataChunk =
+        new 
MeasureColumnDataChunk[blockExecutionInfo.getTotalNumberOfMeasureBlock()];
+    int[][] allSelectedMeasureBlocksIndexes =
+        blockExecutionInfo.getAllSelectedMeasureBlocksIndexes();
+    MeasureColumnDataChunk[] projectionListMeasureChunk = 
blocksChunkHolder.getDataBlock()
+        .getMeasureChunks(fileReader, allSelectedMeasureBlocksIndexes);
+    // read the measure chunk blocks which is not present
+    for (int i = 0; i < measureColumnDataChunk.length; i++) {
+      if (null != blocksChunkHolder.getMeasureDataChunk()[i]) {
+        measureColumnDataChunk[i] = blocksChunkHolder.getMeasureDataChunk()[i];
+      }
+    }
+    for (int i = 0; i < allSelectedMeasureBlocksIndexes.length; i++) {
+      System.arraycopy(projectionListMeasureChunk, 
allSelectedMeasureBlocksIndexes[i][0],
+          measureColumnDataChunk, allSelectedMeasureBlocksIndexes[i][0],
+          allSelectedMeasureBlocksIndexes[i][1] + 1 - 
allSelectedMeasureBlocksIndexes[i][0]);
+    }
+    scannedResult.setDimensionChunks(dimensionColumnDataChunk);
+    scannedResult.setIndexes(indexes);
+    scannedResult.setMeasureChunks(measureColumnDataChunk);
+    scannedResult.setNumberOfRows(indexes.length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
new file mode 100644
index 0000000..f07c2be
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.scanner.impl;
+
+import org.apache.carbondata.core.stats.QueryStatisticsModel;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.result.impl.NonFilterQueryScannedResult;
+import org.apache.carbondata.core.scan.scanner.AbstractBlockletScanner;
+
+/**
+ * Scanner for queries without a filter.
+ * It relies on the scanning logic inherited from AbstractBlockletScanner and only
+ * supplies the result object and the statistics model.
+ */
+public class NonFilterScanner extends AbstractBlockletScanner {
+
+  /**
+   * @param blockExecutionInfo   execution info of the block to scan
+   * @param queryStatisticsModel statistics holder stored in the inherited field
+   */
+  public NonFilterScanner(BlockExecutionInfo blockExecutionInfo,
+      QueryStatisticsModel queryStatisticsModel) {
+    super(blockExecutionInfo);
+    this.queryStatisticsModel = queryStatisticsModel;
+    // non filter query, so the result object is the non filter variant
+    this.scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
new file mode 100644
index 0000000..68eb946
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.wrappers;
+
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+
+/**
+ * This class will store the dimension column data when query is executed
+ * This can be used as a key for aggregation
+ */
+public class ByteArrayWrapper implements Comparable<ByteArrayWrapper> {
+
+  /**
+   * to store key which is generated using
+   * key generator
+   */
+  protected byte[] dictionaryKey;
+
+  /**
+   * to store no dictionary column data
+   */
+  protected byte[][] complexTypesKeys;
+
+  /**
+   * to store no dictionary column data
+   */
+  protected byte[][] noDictionaryKeys;
+
+  /**
+   * contains value of implicit columns in byte array format
+   */
+  protected byte[] implicitColumnByteArray;
+
+  public ByteArrayWrapper() {
+  }
+
+  /**
+   * @return the dictionaryKey
+   */
+  public byte[] getDictionaryKey() {
+    return dictionaryKey;
+  }
+
+  /**
+   * @param dictionaryKey the dictionaryKey to set
+   */
+  public void setDictionaryKey(byte[] dictionaryKey) {
+    this.dictionaryKey = dictionaryKey;
+  }
+
+  /**
+   * @param noDictionaryKeys the noDictionaryKeys to set
+   */
+  public void setNoDictionaryKeys(byte[][] noDictionaryKeys) {
+    this.noDictionaryKeys = noDictionaryKeys;
+  }
+
+  /**
+   * to get the no dictionary column data
+   *
+   * @param index of the no dictionary key
+   * @return no dictionary key for the index
+   */
+  public byte[] getNoDictionaryKeyByIndex(int index) {
+    return this.noDictionaryKeys[index];
+  }
+
+  /**
+   * to get the no dictionary column data
+   *
+   * @param index of the no dictionary key
+   * @return no dictionary key for the index
+   */
+  public byte[] getComplexTypeByIndex(int index) {
+    return this.complexTypesKeys[index];
+  }
+
+  /**
+   * to generate the hash code
+   */
+  @Override public int hashCode() {
+    // first generate the has code of the dictionary column
+    int len = dictionaryKey.length;
+    int result = 1;
+    for (int j = 0; j < len; j++) {
+      result = 31 * result + dictionaryKey[j];
+    }
+    // then no dictionary column
+    for (byte[] directSurrogateValue : noDictionaryKeys) {
+      for (int i = 0; i < directSurrogateValue.length; i++) {
+        result = 31 * result + directSurrogateValue[i];
+      }
+    }
+    // then for complex type
+    for (byte[] complexTypeKey : complexTypesKeys) {
+      for (int i = 0; i < complexTypeKey.length; i++) {
+        result = 31 * result + complexTypeKey[i];
+      }
+    }
+    return result;
+  }
+
+  /**
+   * to validate the two
+   *
+   * @param other object
+   */
+  @Override public boolean equals(Object other) {
+    if (null == other || !(other instanceof ByteArrayWrapper)) {
+      return false;
+    }
+    boolean result = false;
+    // Comparison will be as follows
+    // first compare the no dictionary column
+    // if it is not equal then return false
+    // if it is equal then compare the complex column
+    // if it is also equal then compare dictionary column
+    byte[][] noDictionaryKeysOther = ((ByteArrayWrapper) 
other).noDictionaryKeys;
+    if (noDictionaryKeysOther.length != noDictionaryKeys.length) {
+      return false;
+    } else {
+      for (int i = 0; i < noDictionaryKeys.length; i++) {
+        result = UnsafeComparer.INSTANCE.equals(noDictionaryKeys[i], 
noDictionaryKeysOther[i]);
+        if (!result) {
+          return false;
+        }
+      }
+    }
+
+    byte[][] complexTypesKeysOther = ((ByteArrayWrapper) 
other).complexTypesKeys;
+    if (complexTypesKeysOther.length != complexTypesKeys.length) {
+      return false;
+    } else {
+      for (int i = 0; i < complexTypesKeys.length; i++) {
+        result = UnsafeComparer.INSTANCE.equals(complexTypesKeys[i], 
complexTypesKeysOther[i]);
+        if (!result) {
+          return false;
+        }
+      }
+    }
+
+    return UnsafeComparer.INSTANCE.equals(dictionaryKey, ((ByteArrayWrapper) 
other).dictionaryKey);
+  }
+
+  /**
+   * Compare method for ByteArrayWrapper class this will used to compare Two
+   * ByteArrayWrapper data object, basically it will compare two byte array
+   *
+   * @param other ArrayWrapper Object
+   */
+  @Override public int compareTo(ByteArrayWrapper other) {
+    // compare will be as follows
+    //compare dictionary column
+    // then no dictionary column
+    // then complex type column data
+    int compareTo = UnsafeComparer.INSTANCE.compareTo(dictionaryKey, 
other.dictionaryKey);
+    if (compareTo == 0) {
+      for (int i = 0; i < noDictionaryKeys.length; i++) {
+        compareTo =
+            UnsafeComparer.INSTANCE.compareTo(noDictionaryKeys[i], 
other.noDictionaryKeys[i]);
+        if (compareTo != 0) {
+          return compareTo;
+        }
+      }
+    }
+    if (compareTo == 0) {
+      for (int i = 0; i < complexTypesKeys.length; i++) {
+        compareTo =
+            UnsafeComparer.INSTANCE.compareTo(complexTypesKeys[i], 
other.complexTypesKeys[i]);
+        if (compareTo != 0) {
+          return compareTo;
+        }
+      }
+    }
+    return compareTo;
+  }
+
+  /**
+   * @return the complexTypesKeys
+   */
+  public byte[][] getComplexTypesKeys() {
+    return complexTypesKeys;
+  }
+
+  /**
+   * @param complexTypesKeys the complexTypesKeys to set
+   */
+  public void setComplexTypesKeys(byte[][] complexTypesKeys) {
+    this.complexTypesKeys = complexTypesKeys;
+  }
+
+  /**
+   * @return
+   */
+  public byte[] getImplicitColumnByteArray() {
+    return implicitColumnByteArray;
+  }
+
+  /**
+   * @param implicitColumnByteArray
+   */
+  public void setImplicitColumnByteArray(byte[] implicitColumnByteArray) {
+    this.implicitColumnByteArray = implicitColumnByteArray;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/service/CarbonCommonFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/service/CarbonCommonFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/service/CarbonCommonFactory.java
new file mode 100644
index 0000000..3c33867
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/service/CarbonCommonFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.service;
+
+import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator;
+import org.apache.carbondata.core.service.impl.DictionaryFactory;
+import org.apache.carbondata.core.service.impl.PathFactory;
+import org.apache.carbondata.core.service.ColumnUniqueIdService;
+import org.apache.carbondata.core.service.DictionaryService;
+import org.apache.carbondata.core.service.PathService;
+
+/**
+ * Static entry point for obtaining the core services (dictionary, path, column unique id).
+ *
+ * Each accessor simply delegates to the {@code getInstance()} method of the matching class in
+ * the {@code service.impl} package; this class holds no state of its own.
+ */
+public class CarbonCommonFactory {
+
+  /**
+   * @return the DictionaryService returned by {@code DictionaryFactory.getInstance()}
+   */
+  public static DictionaryService getDictionaryService() {
+    return DictionaryFactory.getInstance();
+  }
+
+  /**
+   * @return the PathService returned by {@code PathFactory.getInstance()}
+   */
+  public static PathService getPathService() {
+    return PathFactory.getInstance();
+  }
+
+  /**
+   * @return the ColumnUniqueIdService returned by {@code ColumnUniqueIdGenerator.getInstance()}
+   */
+  public static ColumnUniqueIdService getColumnUniqueIdGenerator() {
+    return ColumnUniqueIdGenerator.getInstance();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/service/impl/ColumnUniqueIdGenerator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/service/impl/ColumnUniqueIdGenerator.java
 
b/core/src/main/java/org/apache/carbondata/core/service/impl/ColumnUniqueIdGenerator.java
new file mode 100644
index 0000000..b79f9db
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/service/impl/ColumnUniqueIdGenerator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.service.impl;
+
+import java.util.UUID;
+
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.service.ColumnUniqueIdService;
+
+/**
+ * {@link ColumnUniqueIdService} implementation that produces column ids from random UUIDs.
+ *
+ * generateUniqueId does not read either of its arguments; it returns
+ * {@code UUID.randomUUID().toString()}, so the id is not derived from the database name or
+ * the column schema and repeated calls for the same column return different ids.
+ * getInstance returns the single instance created when this class is initialized.
+ */
+public class ColumnUniqueIdGenerator implements ColumnUniqueIdService {
+
+  private static ColumnUniqueIdService columnUniqueIdService = new 
ColumnUniqueIdGenerator();
+
+  @Override public String generateUniqueId(String databaseName, ColumnSchema 
columnSchema) {
+    return UUID.randomUUID().toString();
+  }
+
+  public static ColumnUniqueIdService getInstance() {
+    return columnUniqueIdService;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/service/impl/DictionaryFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/service/impl/DictionaryFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/service/impl/DictionaryFactory.java
new file mode 100644
index 0000000..d9a6d19
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/service/impl/DictionaryFactory.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.service.impl;
+
+import org.apache.carbondata.core.CarbonTableIdentifier;
+import org.apache.carbondata.core.ColumnIdentifier;
+import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReader;
+import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReaderImpl;
+import org.apache.carbondata.core.reader.CarbonDictionaryReader;
+import org.apache.carbondata.core.reader.CarbonDictionaryReaderImpl;
+import 
org.apache.carbondata.core.reader.sortindex.CarbonDictionarySortIndexReader;
+import 
org.apache.carbondata.core.reader.sortindex.CarbonDictionarySortIndexReaderImpl;
+import org.apache.carbondata.core.service.DictionaryService;
+import org.apache.carbondata.core.writer.CarbonDictionaryWriter;
+import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl;
+import 
org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter;
+import 
org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl;
+
+/**
+ * {@link DictionaryService} implementation that creates the dictionary readers and writers.
+ *
+ * Every factory method constructs a new {@code *Impl} object from the three arguments it is
+ * given; nothing is cached here. getInstance returns the single shared DictionaryFactory
+ * created when this class is initialized.
+ */
+public class DictionaryFactory implements DictionaryService {
+
+  private static DictionaryService dictService = new DictionaryFactory();
+
+  /**
+   * Creates a new CarbonDictionaryWriterImpl for the given table, column and store path.
+   *
+   * @param carbonTableIdentifier table identifier handed to the writer
+   * @param columnIdentifier      column identifier handed to the writer
+   * @param carbonStorePath       store path handed to the writer
+   * @return a newly constructed CarbonDictionaryWriterImpl
+   */
+  @Override public CarbonDictionaryWriter getDictionaryWriter(
+      CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier 
columnIdentifier,
+      String carbonStorePath) {
+    return new CarbonDictionaryWriterImpl(carbonStorePath, 
carbonTableIdentifier, columnIdentifier);
+  }
+
+  /**
+   * Creates a new CarbonDictionarySortIndexWriterImpl for the given table, column and store path.
+   *
+   * @param carbonTableIdentifier table identifier handed to the sort index writer
+   * @param columnIdentifier      column identifier handed to the sort index writer
+   * @param carbonStorePath       store path handed to the sort index writer
+   * @return a newly constructed CarbonDictionarySortIndexWriterImpl
+   */
+  @Override public CarbonDictionarySortIndexWriter 
getDictionarySortIndexWriter(
+      CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier 
columnIdentifier,
+      String carbonStorePath) {
+    return new CarbonDictionarySortIndexWriterImpl(carbonTableIdentifier, 
columnIdentifier,
+        carbonStorePath);
+  }
+
+  /**
+   * Creates a new CarbonDictionaryMetadataReaderImpl for the given table, column and store path.
+   *
+   * @param carbonTableIdentifier table identifier handed to the metadata reader
+   * @param columnIdentifier      column identifier handed to the metadata reader
+   * @param carbonStorePath       store path handed to the metadata reader
+   * @return a newly constructed CarbonDictionaryMetadataReaderImpl
+   */
+  @Override public CarbonDictionaryMetadataReader getDictionaryMetadataReader(
+      CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier 
columnIdentifier,
+      String carbonStorePath) {
+    return new CarbonDictionaryMetadataReaderImpl(carbonStorePath, 
carbonTableIdentifier,
+        columnIdentifier);
+  }
+
+  /**
+   * Creates a new CarbonDictionaryReaderImpl for the given table, column and store path.
+   *
+   * @param carbonTableIdentifier table identifier handed to the reader
+   * @param columnIdentifier      column identifier handed to the reader
+   * @param carbonStorePath       store path handed to the reader
+   * @return a newly constructed CarbonDictionaryReaderImpl
+   */
+  @Override public CarbonDictionaryReader getDictionaryReader(
+      CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier 
columnIdentifier,
+      String carbonStorePath) {
+    return new CarbonDictionaryReaderImpl(carbonStorePath, 
carbonTableIdentifier, columnIdentifier);
+  }
+
+  /**
+   * Creates a new CarbonDictionarySortIndexReaderImpl for the given table, column and store path.
+   *
+   * @param carbonTableIdentifier table identifier handed to the sort index reader
+   * @param columnIdentifier      column identifier handed to the sort index reader
+   * @param carbonStorePath       store path handed to the sort index reader
+   * @return a newly constructed CarbonDictionarySortIndexReaderImpl
+   */
+  @Override public CarbonDictionarySortIndexReader 
getDictionarySortIndexReader(
+      CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier 
columnIdentifier,
+      String carbonStorePath) {
+    return new CarbonDictionarySortIndexReaderImpl(carbonTableIdentifier, 
columnIdentifier,
+        carbonStorePath);
+  }
+
+  public static DictionaryService getInstance() {
+    return dictService;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/service/impl/PathFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/service/impl/PathFactory.java 
b/core/src/main/java/org/apache/carbondata/core/service/impl/PathFactory.java
new file mode 100644
index 0000000..9cd4b41
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/service/impl/PathFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.service.impl;
+
+import org.apache.carbondata.core.CarbonTableIdentifier;
+import org.apache.carbondata.core.path.CarbonStorePath;
+import org.apache.carbondata.core.path.CarbonTablePath;
+import org.apache.carbondata.core.service.PathService;
+
+/**
+ * {@link PathService} implementation that resolves table paths through CarbonStorePath.
+ * getInstance returns the single shared PathFactory created when this class is initialized.
+ */
+public class PathFactory implements PathService {
+
+  private static PathService pathService = new PathFactory();
+
+  /**
+   * Returns the CarbonTablePath for a table by delegating to
+   * {@code CarbonStorePath.getCarbonTablePath(storeLocation, tableIdentifier)}.
+   *
+   * @param storeLocation   store location passed through unchanged
+   * @param tableIdentifier table identifier passed through unchanged
+   * @return whatever CarbonStorePath.getCarbonTablePath returns for these arguments
+   */
+  @Override public CarbonTablePath getCarbonTablePath(
+      String storeLocation, CarbonTableIdentifier tableIdentifier) {
+    return CarbonStorePath.getCarbonTablePath(storeLocation, tableIdentifier);
+  }
+
+  public static PathService getInstance() {
+    return pathService;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/update/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/update/CarbonUpdateUtil.java 
b/core/src/main/java/org/apache/carbondata/core/update/CarbonUpdateUtil.java
index 26a4778..317d44a 100644
--- a/core/src/main/java/org/apache/carbondata/core/update/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/update/CarbonUpdateUtil.java
@@ -22,8 +22,6 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.*;
 
-import org.apache.carbondata.common.iudprocessor.iuddata.BlockMappingVO;
-import org.apache.carbondata.common.iudprocessor.iuddata.RowCountDetailsVO;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.AbsoluteTableIdentifier;
@@ -31,10 +29,12 @@ import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.path.CarbonStorePath;
 import org.apache.carbondata.core.path.CarbonTablePath;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import 
org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.datastorage.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastorage.impl.FileFactory;
 import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.update.data.BlockMappingVO;
+import org.apache.carbondata.core.update.data.RowCountDetailsVO;
 import org.apache.carbondata.core.updatestatus.SegmentStatusManager;
 import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java 
b/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java
deleted file mode 100644
index ad744c4..0000000
--- a/core/src/main/java/org/apache/carbondata/core/update/UpdateVO.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.core.update;
-
-import java.io.Serializable;
-
-/**
- * VO class for storing details related to Update operation.
- */
-public class UpdateVO implements Serializable {
-  private static final long serialVersionUID = 1L;
-
-  private Long factTimestamp;
-
-  private Long updateDeltaStartTimestamp;
-
-  private String segmentId;
-
-  public Long getLatestUpdateTimestamp() {
-    return latestUpdateTimestamp;
-  }
-
-  public void setLatestUpdateTimestamp(Long latestUpdateTimestamp) {
-    this.latestUpdateTimestamp = latestUpdateTimestamp;
-  }
-
-  private Long latestUpdateTimestamp;
-
-  public Long getFactTimestamp() {
-    return factTimestamp;
-  }
-
-  public void setFactTimestamp(Long factTimestamp) {
-    this.factTimestamp = factTimestamp;
-  }
-
-  public Long getUpdateDeltaStartTimestamp() {
-    return updateDeltaStartTimestamp;
-  }
-
-  public void setUpdateDeltaStartTimestamp(Long updateDeltaStartTimestamp) {
-    this.updateDeltaStartTimestamp = updateDeltaStartTimestamp;
-  }
-
-  @Override public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    UpdateVO updateVO = (UpdateVO) o;
-    if (factTimestamp != null ?
-        !factTimestamp.equals(updateVO.factTimestamp) :
-        updateVO.factTimestamp != null) {
-      return false;
-    }
-    if (updateDeltaStartTimestamp != null ?
-        !updateDeltaStartTimestamp.equals(updateVO.updateDeltaStartTimestamp) :
-        updateVO.updateDeltaStartTimestamp != null) {
-      return false;
-    }
-    return latestUpdateTimestamp != null ?
-        latestUpdateTimestamp.equals(updateVO.latestUpdateTimestamp) :
-        updateVO.latestUpdateTimestamp == null;
-
-  }
-
-  @Override public int hashCode() {
-    int result = factTimestamp != null ? factTimestamp.hashCode() : 0;
-    result = 31 * result + (updateDeltaStartTimestamp != null ?
-        updateDeltaStartTimestamp.hashCode() :
-        0);
-    result = 31 * result + (latestUpdateTimestamp != null ? 
latestUpdateTimestamp.hashCode() : 0);
-    return result;
-  }
-
-  /**
-   * This will return the update timestamp if its present or it will return 
the fact timestamp.
-   * @return
-   */
-  public Long getCreatedOrUpdatedTimeStamp() {
-    if (null == latestUpdateTimestamp) {
-      return factTimestamp;
-    }
-    return latestUpdateTimestamp;
-  }
-
-  public String getSegmentId() {
-    return segmentId;
-  }
-
-  public void setSegmentId(String segmentId) {
-    this.segmentId = segmentId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/update/data/BlockMappingVO.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/update/data/BlockMappingVO.java 
b/core/src/main/java/org/apache/carbondata/core/update/data/BlockMappingVO.java
new file mode 100644
index 0000000..244c883
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/update/data/BlockMappingVO.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.update.data;
+
+import java.util.Map;
+
+/**
+ * Value object that carries three maps: block to row count, segment to number of blocks,
+ * and block to RowCountDetailsVO.
+ *
+ * The first two maps are supplied through the constructor and are only readable afterwards.
+ * completeBlockRowDetailVO is not set by the constructor, so its getter returns null until
+ * setCompleteBlockRowDetailVO has been called. All maps are stored and returned by reference.
+ * NOTE(review): the String keys are presumably block / segment identifiers - confirm with callers.
+ */
+public class BlockMappingVO {
+
+  private Map<String, Long> blockRowCountMapping ;
+
+  private Map<String, Long> segmentNumberOfBlockMapping ;
+
+  private Map<String, RowCountDetailsVO> completeBlockRowDetailVO;
+
+  public void setCompleteBlockRowDetailVO(Map<String, RowCountDetailsVO> 
completeBlockRowDetailVO) {
+    this.completeBlockRowDetailVO = completeBlockRowDetailVO;
+  }
+
+  public Map<String, RowCountDetailsVO> getCompleteBlockRowDetailVO() {
+    return completeBlockRowDetailVO;
+  }
+
+  public Map<String, Long> getBlockRowCountMapping() {
+    return blockRowCountMapping;
+  }
+
+  public Map<String, Long> getSegmentNumberOfBlockMapping() {
+    return segmentNumberOfBlockMapping;
+  }
+
+  public BlockMappingVO(Map<String, Long> blockRowCountMapping,
+      Map<String, Long> segmentNumberOfBlockMapping) {
+    this.blockRowCountMapping = blockRowCountMapping;
+    this.segmentNumberOfBlockMapping = segmentNumberOfBlockMapping;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952cf517/core/src/main/java/org/apache/carbondata/core/update/data/BlockletDeleteDeltaCacheLoader.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/update/data/BlockletDeleteDeltaCacheLoader.java
 
b/core/src/main/java/org/apache/carbondata/core/update/data/BlockletDeleteDeltaCacheLoader.java
new file mode 100644
index 0000000..2015663
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/update/data/BlockletDeleteDeltaCacheLoader.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.update.data;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.datastore.DataRefNode;
+import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager;
+
+/**
+ * Loads the delete delta data for one blocklet and attaches it to that blocklet's DataRefNode
+ * as a BlockletLevelDeleteDeltaDataCache.
+ *
+ * The blocklet id, the node and the table identifier are fixed at construction time; the work
+ * happens in loadDeleteDeltaFileDataToCache.
+ */
+public class BlockletDeleteDeltaCacheLoader implements 
DeleteDeltaCacheLoaderIntf {
+  private String blockletID;
+  private DataRefNode blockletNode;
+  private AbsoluteTableIdentifier absoluteIdentifier;
+  private static final LogService LOGGER =
+      
LogServiceFactory.getLogService(BlockletDeleteDeltaCacheLoader.class.getName());
+
+  public BlockletDeleteDeltaCacheLoader(String blockletID,
+       DataRefNode blockletNode, AbsoluteTableIdentifier absoluteIdentifier) {
+    this.blockletID = blockletID;
+    this.blockletNode = blockletNode;
+    this.absoluteIdentifier= absoluteIdentifier;
+  }
+
+  /**
+   * Builds or refreshes the delete delta cache of blockletNode, using a
+   * SegmentUpdateStatusManager that is created anew on every call.
+   *
+   * If the node has no cache yet, the delete delta data is read with
+   * getDeleteDeltaDataFromAllFiles(blockletID) and wrapped in a new cache object whose
+   * timestamp comes from getTimestampForRefreshCache(blockletID, null).
+   *
+   * If the node already has a cache, getTimestampForRefreshCache is called with the cached
+   * timestamp; only when that call returns non-null is the data read again and a new cache
+   * object created, otherwise the existing cache object is kept.
+   *
+   * In both cases the resulting cache is written back with setDeleteDeltaDataCache. When
+   * reading fails, the first case therefore stores null and the second case stores the
+   * previous cache again.
+   *
+   * NOTE(review): both catch blocks swallow the exception and log only a fixed debug message,
+   * so the failure cause is lost; consider logging the exception itself.
+   */
+  public void loadDeleteDeltaFileDataToCache() {
+    SegmentUpdateStatusManager segmentUpdateStatusManager =
+        new SegmentUpdateStatusManager(absoluteIdentifier);
+    int[] deleteDeltaFileData = null;
+    BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache = null;
+    if (null == blockletNode.getDeleteDeltaDataCache()) {
+      try {
+        deleteDeltaFileData = 
segmentUpdateStatusManager.getDeleteDeltaDataFromAllFiles(blockletID);
+        deleteDeltaDataCache = new 
BlockletLevelDeleteDeltaDataCache(deleteDeltaFileData,
+            segmentUpdateStatusManager.getTimestampForRefreshCache(blockletID, 
null));
+      } catch (Exception e) {
+        LOGGER.debug("Unable to retrieve delete delta files");
+      }
+    } else {
+      deleteDeltaDataCache = blockletNode.getDeleteDeltaDataCache();
+      // a cache is already present: check it using its timestamp; a non-null result => rebuild
+      String cacheTimeStamp = segmentUpdateStatusManager
+          .getTimestampForRefreshCache(blockletID, 
deleteDeltaDataCache.getCacheTimeStamp());
+      if (null != cacheTimeStamp) {
+        try {
+          deleteDeltaFileData =
+              
segmentUpdateStatusManager.getDeleteDeltaDataFromAllFiles(blockletID);
+          deleteDeltaDataCache = new 
BlockletLevelDeleteDeltaDataCache(deleteDeltaFileData,
+              
segmentUpdateStatusManager.getTimestampForRefreshCache(blockletID, 
cacheTimeStamp));
+        } catch (Exception e) {
+          LOGGER.debug("Unable to retrieve delete delta files");
+        }
+      }
+    }
+    blockletNode.setDeleteDeltaDataCache(deleteDeltaDataCache);
+  }
+}


Reply via email to