http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
new file mode 100644
index 0000000..5fc23b7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.filter.executer;
+
+import java.io.IOException;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+public class IncludeFilterExecuterImpl implements FilterExecuter {
+
+  protected DimColumnResolvedFilterInfo dimColumnEvaluatorInfo;
+  protected DimColumnExecuterFilterInfo dimColumnExecuterInfo;
+  protected SegmentProperties segmentProperties;
+
+  public IncludeFilterExecuterImpl(DimColumnResolvedFilterInfo dimColumnEvaluatorInfo,
+      SegmentProperties segmentProperties) {
+    this.dimColumnEvaluatorInfo = dimColumnEvaluatorInfo;
+    this.segmentProperties = segmentProperties;
+    dimColumnExecuterInfo = new DimColumnExecuterFilterInfo();
+    FilterUtil.prepareKeysFromSurrogates(dimColumnEvaluatorInfo.getFilterValues(),
+        segmentProperties, dimColumnEvaluatorInfo.getDimension(), dimColumnExecuterInfo);
+
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder) throws IOException {
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColumnEvaluatorInfo.getColumnIndex());
+    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
+        blockChunkHolder.getDataBlock().nodeSize());
+  }
+
+  protected BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    if (dimensionColumnDataChunk.isNoDicitionaryColumn()
+        && dimensionColumnDataChunk instanceof VariableLengthDimensionDataChunk) {
+      return setDirectKeyFilterIndexToBitSet(
+          (VariableLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
+    } else if (dimensionColumnDataChunk.isExplicitSorted()
+        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      return setFilterdIndexToBitSetWithColumnIndex(
+          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
+    }
+
+    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
+  }
+
+  private BitSet setDirectKeyFilterIndexToBitSet(
+      VariableLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+    for (int i = 0; i < filterValues.length; i++) {
+      byte[] filterVal = filterValues[i];
+      if (dimensionColumnDataChunk.isExplicitSorted()) {
+        for (int index = 0; index < numerOfRows; index++) {
+          if (dimensionColumnDataChunk.compareTo(index, filterVal) == 0) {
+            bitSet.set(dimensionColumnDataChunk.getInvertedIndex(index));
+          }
+        }
+      } else {
+        for (int index = 0; index < numerOfRows; index++) {
+          if (dimensionColumnDataChunk.compareTo(index, filterVal) == 0) {
+            bitSet.set(index);
+          }
+        }
+      }
+    }
+    return bitSet;
+
+  }
+
+  private BitSet setFilterdIndexToBitSetWithColumnIndex(
+      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    int start = 0;
+    int last = 0;
+    int startIndex = 0;
+    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+    for (int i = 0; i < filterValues.length; i++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[i], false);
+      if (start < 0) {
+        continue;
+      }
+      bitSet.set(dimensionColumnDataChunk.getInvertedIndex(start));
+      last = start;
+      for (int j = start + 1; j < numerOfRows; j++) {
+        if (dimensionColumnDataChunk.compareTo(j, filterValues[i]) == 0) {
+          bitSet.set(dimensionColumnDataChunk.getInvertedIndex(j));
+          last++;
+        } else {
+          break;
+        }
+      }
+      startIndex = last;
+      if (startIndex >= numerOfRows) {
+        break;
+      }
+    }
+    return bitSet;
+  }
+
+  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+      for (int k = 0; k < filterValues.length; k++) {
+        for (int j = 0; j < numerOfRows; j++) {
+          if (dimensionColumnDataChunk.compareTo(j, filterValues[k]) == 0) {
+            bitSet.set(j);
+          }
+        }
+      }
+    }
+    return bitSet;
+  }
+
+  public BitSet isScanRequired(byte[][] blkMaxVal, byte[][] blkMinVal) {
+    BitSet bitSet = new BitSet(1);
+    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+    int columnIndex = dimColumnEvaluatorInfo.getColumnIndex();
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex);
+
+    boolean isScanRequired = false;
+    for (int k = 0; k < filterValues.length; k++) {
+      // the filter value should lie within the block's min/max range, i.e.
+      // min <= filtervalue <= max,
+      // so filter - max should be negative
+      int maxCompare =
+          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blkMaxVal[blockIndex]);
+      // and filter - min should be positive
+      int minCompare =
+          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blkMinVal[blockIndex]);
+
+      // if any filter value is in range, then this block needs to be scanned
+      if (maxCompare <= 0 && minCompare >= 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+  }
+
+}
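
For illustration, a minimal self-contained sketch of the min/max pruning idea used by
isScanRequired above: a block is scanned only if some include-filter value can fall inside
the block's [min, max] range. This uses plain java.util.BitSet and a hand-rolled unsigned
byte comparison in place of CarbonData's ByteUtil.UnsafeComparer; the class and method
names are hypothetical.

  import java.util.BitSet;

  public class MinMaxPruneSketch {
    // unsigned lexicographic comparison, standing in for ByteUtil.UnsafeComparer
    static int compareUnsigned(byte[] a, byte[] b) {
      int len = Math.min(a.length, b.length);
      for (int i = 0; i < len; i++) {
        int x = a[i] & 0xFF, y = b[i] & 0xFF;
        if (x != y) return x - y;
      }
      return a.length - b.length;
    }

    // the block must be scanned if any filter value lies within [blockMin, blockMax]
    static BitSet isScanRequired(byte[][] filterValues, byte[] blockMax, byte[] blockMin) {
      BitSet bitSet = new BitSet(1);
      for (byte[] filter : filterValues) {
        if (compareUnsigned(filter, blockMax) <= 0 && compareUnsigned(filter, blockMin) >= 0) {
          bitSet.set(0);
          break;
        }
      }
      return bitSet;
    }

    public static void main(String[] args) {
      byte[][] filters = { { 0x05 }, { 0x30 } };
      System.out.println(isScanRequired(filters, new byte[] { 0x20 }, new byte[] { 0x01 })); // {0}
      System.out.println(isScanRequired(filters, new byte[] { 0x04 }, new byte[] { 0x01 })); // {}
    }
  }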

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/OrFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/OrFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/OrFilterExecuterImpl.java
new file mode 100644
index 0000000..73b6c6a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/OrFilterExecuterImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.filter.executer;
+
+import java.io.IOException;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+
+public class OrFilterExecuterImpl implements FilterExecuter {
+
+  private FilterExecuter leftExecuter;
+  private FilterExecuter rightExecuter;
+
+  public OrFilterExecuterImpl(FilterExecuter leftExecuter, FilterExecuter rightExecuter) {
+    this.leftExecuter = leftExecuter;
+    this.rightExecuter = rightExecuter;
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+      throws FilterUnsupportedException, IOException {
+    BitSet leftFilters = leftExecuter.applyFilter(blockChunkHolder);
+    BitSet rightFilters = rightExecuter.applyFilter(blockChunkHolder);
+    leftFilters.or(rightFilters);
+
+    return leftFilters;
+  }
+
+  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+    BitSet leftFilters = leftExecuter.isScanRequired(blockMaxValue, blockMinValue);
+    BitSet rightFilters = rightExecuter.isScanRequired(blockMaxValue, blockMinValue);
+    leftFilters.or(rightFilters);
+    return leftFilters;
+  }
+
+}
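
OrFilterExecuterImpl simply delegates to its two children and unions their results, both
for the row-level match bitmap and for the block-pruning bitmap. A tiny standalone sketch
of that composition with java.util.BitSet (hypothetical names, not part of the patch):

  import java.util.BitSet;

  public class OrCombineSketch {
    public static void main(String[] args) {
      BitSet left = new BitSet(8);   // rows matched by the left child filter
      BitSet right = new BitSet(8);  // rows matched by the right child filter
      left.set(1); left.set(4);
      right.set(4); right.set(6);
      left.or(right);                // union in place, as applyFilter does
      System.out.println(left);      // {1, 4, 6}
    }
  }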

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureFilterExecuterImpl.java
new file mode 100644
index 0000000..cc16818
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureFilterExecuterImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.filter.executer;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+
+public class RestructureFilterExecuterImpl implements FilterExecuter {
+
+  DimColumnExecuterFilterInfo dimColumnExecuterInfo;
+
+  public RestructureFilterExecuterImpl(DimColumnResolvedFilterInfo dimColumnResolvedFilterInfo,
+      SegmentProperties segmentProperties) {
+    dimColumnExecuterInfo = new DimColumnExecuterFilterInfo();
+    FilterUtil
+        .prepareKeysFromSurrogates(dimColumnResolvedFilterInfo.getFilterValues(), segmentProperties,
+            dimColumnResolvedFilterInfo.getDimension(), dimColumnExecuterInfo);
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blocksChunkHolder) {
+    BitSet bitSet = new BitSet(blocksChunkHolder.getDataBlock().nodeSize());
+    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+    if (null != filterValues && filterValues.length > 0) {
+      bitSet.set(0, blocksChunkHolder.getDataBlock().nodeSize());
+    }
+    return bitSet;
+  }
+
+  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+    BitSet bitSet = new BitSet(1);
+    bitSet.set(0);
+    return bitSet;
+  }
+}
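
A restructure filter covers a column that does not physically exist in an older block, so
every row implicitly carries the column's default value; when the resolved filter keys are
non-empty the whole block matches, which is what applyFilter above expresses by setting the
full bit range. A hypothetical sketch of that behaviour:

  import java.util.BitSet;

  public class RestructureSketch {
    static BitSet applyFilter(byte[][] filterKeys, int rowCount) {
      BitSet bitSet = new BitSet(rowCount);
      if (filterKeys != null && filterKeys.length > 0) {
        bitSet.set(0, rowCount);  // every row holds the default value, so all rows match
      }
      return bitSet;
    }

    public static void main(String[] args) {
      System.out.println(applyFilter(new byte[][] { { 1 } }, 5).cardinality()); // 5
      System.out.println(applyFilter(new byte[0][], 5).cardinality());          // 0
    }
  }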

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
new file mode 100644
index 0000000..90b8cbf
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.filter.executer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.filter.intf.RowImpl;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+public class RowLevelFilterExecuterImpl implements FilterExecuter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RowLevelFilterExecuterImpl.class.getName());
+  protected List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList;
+  protected List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList;
+  protected Expression exp;
+  protected AbsoluteTableIdentifier tableIdentifier;
+  protected SegmentProperties segmentProperties;
+  /**
+   * holds, for each dimension, the block index at which it is stored in the file
+   */
+  private int[] blocksIndex;
+
+  private Map<Integer, GenericQueryType> complexDimensionInfoMap;
+
+  public RowLevelFilterExecuterImpl(List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
+      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
+      AbsoluteTableIdentifier tableIdentifier, SegmentProperties segmentProperties,
+      Map<Integer, GenericQueryType> complexDimensionInfoMap) {
+    this.dimColEvaluatorInfoList = dimColEvaluatorInfoList;
+    this.segmentProperties = segmentProperties;
+    this.blocksIndex = new int[dimColEvaluatorInfoList.size()];
+    for (int i = 0; i < dimColEvaluatorInfoList.size(); i++) {
+      this.blocksIndex[i] = segmentProperties.getDimensionOrdinalToBlockMapping()
+          .get(dimColEvaluatorInfoList.get(i).getColumnIndex());
+    }
+    if (null == msrColEvalutorInfoList) {
+      this.msrColEvalutorInfoList = new ArrayList<MeasureColumnResolvedFilterInfo>(20);
+    } else {
+      this.msrColEvalutorInfoList = msrColEvalutorInfoList;
+    }
+    this.exp = exp;
+    this.tableIdentifier = tableIdentifier;
+    this.complexDimensionInfoMap = complexDimensionInfoMap;
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+      throws FilterUnsupportedException, IOException {
+    for (int i = 0; i < dimColEvaluatorInfoList.size(); i++) {
+      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = dimColEvaluatorInfoList.get(i);
+      if (dimColumnEvaluatorInfo.getDimension().getDataType() != DataType.ARRAY
+          && dimColumnEvaluatorInfo.getDimension().getDataType() != DataType.STRUCT) {
+        if (null == blockChunkHolder.getDimensionDataChunk()[blocksIndex[i]]) {
+          blockChunkHolder.getDimensionDataChunk()[blocksIndex[i]] = blockChunkHolder.getDataBlock()
+              .getDimensionChunk(blockChunkHolder.getFileReader(), blocksIndex[i]);
+        }
+      } else {
+        GenericQueryType complexType = complexDimensionInfoMap.get(blocksIndex[i]);
+        complexType.fillRequiredBlockData(blockChunkHolder);
+      }
+    }
+
+    // CHECKSTYLE:OFF Approval No:Approval-V1R2C10_001
+    if (null != msrColEvalutorInfoList) {
+      for (MeasureColumnResolvedFilterInfo msrColumnEvalutorInfo : msrColEvalutorInfoList) {
+        if (null == blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo
+            .getColumnIndex()]) {
+          blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()] =
+              blockChunkHolder.getDataBlock().getMeasureChunk(blockChunkHolder.getFileReader(),
+                  msrColumnEvalutorInfo.getColumnIndex());
+        }
+      }
+    }
+    // CHECKSTYLE:ON
+
+    int numberOfRows = blockChunkHolder.getDataBlock().nodeSize();
+    BitSet set = new BitSet(numberOfRows);
+    RowIntf row = new RowImpl();
+    boolean invalidRowsPresent = false;
+    for (int index = 0; index < numberOfRows; index++) {
+      createRow(blockChunkHolder, row, index);
+      Boolean rslt = false;
+      try {
+        rslt = exp.evaluate(row).getBoolean();
+      }
+      // Any invalid member encountered during evaluation is ignored. Since the
+      // expression is evaluated for every row, the error is logged only once to
+      // avoid flooding the log.
+      catch (FilterIllegalMemberException e) {
+        FilterUtil.logError(e, invalidRowsPresent);
+      }
+      if (null != rslt && rslt) {
+        set.set(index);
+      }
+    }
+    return set;
+  }
+
+  /**
+   * Method will read the members of a particular dimension block and create
+   * a row instance for further processing of the filters
+   *
+   * @param blockChunkHolder
+   * @param row
+   * @param index
+   * @throws IOException
+   */
+  private void createRow(BlocksChunkHolder blockChunkHolder, RowIntf row, int index)
+      throws IOException {
+    Object[] record = new Object[dimColEvaluatorInfoList.size() + msrColEvalutorInfoList.size()];
+    String memberString;
+    for (int i = 0; i < dimColEvaluatorInfoList.size(); i++) {
+      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = dimColEvaluatorInfoList.get(i);
+      if (dimColumnEvaluatorInfo.getDimension().getDataType() != DataType.ARRAY
+          && dimColumnEvaluatorInfo.getDimension().getDataType() != DataType.STRUCT) {
+        if (!dimColumnEvaluatorInfo.isDimensionExistsInCurrentSilce()) {
+          record[dimColumnEvaluatorInfo.getRowIndex()] = dimColumnEvaluatorInfo.getDefaultValue();
+        }
+        if (!dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DICTIONARY)
+            && blockChunkHolder
+            .getDimensionDataChunk()[blocksIndex[i]] instanceof VariableLengthDimensionDataChunk) {
+
+          VariableLengthDimensionDataChunk dimensionColumnDataChunk =
+              (VariableLengthDimensionDataChunk) blockChunkHolder
+                  .getDimensionDataChunk()[blocksIndex[i]];
+          memberString = readMemberBasedOnNoDictionaryVal(dimensionColumnDataChunk, index);
+          if (null != memberString) {
+            if (memberString.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
+              memberString = null;
+            }
+            record[dimColumnEvaluatorInfo.getRowIndex()] = DataTypeUtil
+                .getDataBasedOnDataType(memberString,
+                    dimColumnEvaluatorInfo.getDimension().getDataType());
+          } else {
+            continue;
+          }
+        } else {
+          int dictionaryValue =
+              readSurrogatesFromColumnBlock(blockChunkHolder, index, dimColumnEvaluatorInfo,
+                  blocksIndex[i]);
+          if (dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DICTIONARY)
+              && !dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+            memberString =
+                getFilterActualValueFromDictionaryValue(dimColumnEvaluatorInfo, dictionaryValue);
+            record[dimColumnEvaluatorInfo.getRowIndex()] = DataTypeUtil
+                .getDataBasedOnDataType(memberString,
+                    dimColumnEvaluatorInfo.getDimension().getDataType());
+          } else if (dimColumnEvaluatorInfo.getDimension()
+              .hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+
+            Object member = getFilterActualValueFromDirectDictionaryValue(dimColumnEvaluatorInfo,
+                dictionaryValue);
+            record[dimColumnEvaluatorInfo.getRowIndex()] = member;
+          }
+        }
+      } else {
+        try {
+          GenericQueryType complexType = complexDimensionInfoMap.get(blocksIndex[i]);
+          ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+          DataOutputStream dataOutputStream = new DataOutputStream(byteStream);
+          complexType
+              .parseBlocksAndReturnComplexColumnByteArray(blockChunkHolder.getDimensionDataChunk(),
+                  index, dataOutputStream);
+          record[dimColumnEvaluatorInfo.getRowIndex()] = complexType
+              .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(byteStream.toByteArray()));
+          byteStream.close();
+        } catch (IOException e) {
+          LOGGER.info(e.getMessage());
+        }
+      }
+    }
+
+    DataType msrType;
+
+    for (MeasureColumnResolvedFilterInfo msrColumnEvalutorInfo : msrColEvalutorInfoList) {
+      switch (msrColumnEvalutorInfo.getType()) {
+        case INT:
+        case LONG:
+          msrType = DataType.LONG;
+          break;
+        case DECIMAL:
+          msrType = DataType.DECIMAL;
+          break;
+        default:
+          msrType = DataType.DOUBLE;
+      }
+      // if the measure does not exist then set the default value.
+      Object msrValue;
+      switch (msrType) {
+        case INT:
+        case LONG:
+          msrValue = blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
+              .getMeasureDataHolder().getReadableLongValueByIndex(index);
+          break;
+        case DECIMAL:
+          msrValue = blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
+              .getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
+          break;
+        default:
+          msrValue = blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
+              .getMeasureDataHolder().getReadableDoubleValueByIndex(index);
+      }
+      record[msrColumnEvalutorInfo.getRowIndex()] =
+          blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
+              .getNullValueIndexHolder().getBitSet().get(index) ? null : msrValue;
+    }
+    row.setValues(record);
+  }
+
+  /**
+   * method will read the actual data from the direct dictionary generator
+   * by passing direct dictionary value.
+   *
+   * @param dimColumnEvaluatorInfo
+   * @param dictionaryValue
+   * @return
+   */
+  private Object getFilterActualValueFromDirectDictionaryValue(
+      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo, int dictionaryValue) {
+    Object memberString = null;
+    DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+        .getDirectDictionaryGenerator(dimColumnEvaluatorInfo.getDimension().getDataType());
+    if (null != directDictionaryGenerator) {
+      memberString = directDictionaryGenerator.getValueFromSurrogate(dictionaryValue);
+    }
+    return memberString;
+  }
+
+  /**
+   * Read the actual filter member by passing the dictionary value from
+   * the forward dictionary cache which which holds column wise cache
+   *
+   * @param dimColumnEvaluatorInfo
+   * @param dictionaryValue
+   * @return
+   * @throws IOException
+   */
+  private String getFilterActualValueFromDictionaryValue(
+      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo, int dictionaryValue) throws IOException {
+    String memberString;
+    Dictionary forwardDictionary = FilterUtil
+        .getForwardDictionaryCache(tableIdentifier, dimColumnEvaluatorInfo.getDimension());
+
+    memberString = forwardDictionary.getDictionaryValueForKey(dictionaryValue);
+    if (null != memberString) {
+      if (memberString.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
+        memberString = null;
+      }
+    }
+    return memberString;
+  }
+
+  /**
+   * read the filter member dictionary data from the block corresponding to
+   * applied filter column
+   *
+   * @param blockChunkHolder
+   * @param index
+   * @param dimColumnEvaluatorInfo
+   * @return
+   */
+  private int readSurrogatesFromColumnBlock(BlocksChunkHolder blockChunkHolder, int index,
+      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo, int blockIndex) {
+    if (dimColumnEvaluatorInfo.getDimension().isColumnar()) {
+      byte[] rawData = blockChunkHolder.getDimensionDataChunk()[blockIndex].getChunkData(index);
+      ByteBuffer byteBuffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE);
+      int dictionaryValue = CarbonUtil.getSurrogateKey(rawData, byteBuffer);
+      return dictionaryValue;
+    } else {
+      return readSurrogatesFromColumnGroupBlock(blockChunkHolder, index, dimColumnEvaluatorInfo,
+          blockIndex);
+    }
+
+  }
+
+  /**
+   * @param blockChunkHolder
+   * @param index
+   * @param dimColumnEvaluatorInfo
+   * @return read surrogate of given row of given column group dimension
+   */
+  private int readSurrogatesFromColumnGroupBlock(BlocksChunkHolder blockChunkHolder, int index,
+      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo, int blockIndex) {
+    try {
+      KeyStructureInfo keyStructureInfo =
+          QueryUtil.getKeyStructureInfo(segmentProperties, dimColumnEvaluatorInfo);
+      byte[] colData = blockChunkHolder.getDimensionDataChunk()[blockIndex].getChunkData(index);
+      long[] result = keyStructureInfo.getKeyGenerator().getKeyArray(colData);
+      int colGroupId =
+          QueryUtil.getColumnGroupId(segmentProperties, dimColumnEvaluatorInfo.getColumnIndex());
+      int dictionaryValue = (int) result[segmentProperties
+          .getColumnGroupMdKeyOrdinal(colGroupId, dimColumnEvaluatorInfo.getColumnIndex())];
+      return dictionaryValue;
+    } catch (KeyGenException e) {
+      LOGGER.error(e);
+    }
+    return 0;
+  }
+
+  /**
+   * Read the block member for a no-dictionary column. In the no-dictionary case
+   * the filter data is read directly, so there is no need to scan the dictionary
+   * or look up the dictionary value.
+   *
+   * @param dimensionColumnDataChunk
+   * @param index
+   * @return
+   */
+  private String readMemberBasedOnNoDictionaryVal(
+      VariableLengthDimensionDataChunk dimensionColumnDataChunk, int index) {
+    return new String(dimensionColumnDataChunk.getChunkData(index),
+        Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+  }
+
+  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+    BitSet bitSet = new BitSet(1);
+    bitSet.set(0);
+    return bitSet;
+  }
+}
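
The essence of RowLevelFilterExecuterImpl.applyFilter is a row-at-a-time loop: materialise
the referenced column values into a row, evaluate the filter expression on it, and record
matches in a BitSet, skipping rows whose members cannot be evaluated. A simplified,
hypothetical sketch using a functional predicate in place of CarbonData's Expression tree:

  import java.util.BitSet;
  import java.util.function.Predicate;

  public class RowLevelScanSketch {
    static BitSet scan(Object[][] rows, Predicate<Object[]> expression) {
      BitSet matches = new BitSet(rows.length);
      for (int index = 0; index < rows.length; index++) {
        try {
          if (expression.test(rows[index])) {
            matches.set(index);
          }
        } catch (RuntimeException e) {
          // invalid member: skip the row (the real code logs such errors only once)
        }
      }
      return matches;
    }

    public static void main(String[] args) {
      Object[][] rows = { { "a", 10L }, { "b", 25L }, { null, 40L } };
      // e.g. name is non-empty AND measure > 20
      BitSet result = scan(rows, r -> ((String) r[0]).length() > 0 && (Long) r[1] > 20L);
      System.out.println(result); // {1}
    }
  }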

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
new file mode 100644
index 0000000..fb774ed
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.filter.executer;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecuterImpl {
+  private byte[][] filterRangeValues;
+
+  public RowLevelRangeGrtThanFiterExecuterImpl(
+      List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
+      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
+      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
+      SegmentProperties segmentProperties) {
+    super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
+        null);
+    this.filterRangeValues = filterRangeValues;
+  }
+
+  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+    BitSet bitSet = new BitSet(1);
+    byte[][] filterValues = this.filterRangeValues;
+    int columnIndex = this.dimColEvaluatorInfoList.get(0).getColumnIndex();
+    boolean isScanRequired = false;
+    for (int k = 0; k < filterValues.length; k++) {
+      // for a greater-than filter the block can only contain matches if the
+      // filter value is strictly less than the block max value,
+      // i.e. filter - max must be negative
+      int maxCompare =
+          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMaxValue[columnIndex]);
+      // if any filter value is in range, then this block needs to be scanned
+      if (maxCompare < 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+      throws FilterUnsupportedException, IOException {
+    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
+      return super.applyFilter(blockChunkHolder);
+    }
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
+    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
+        blockChunkHolder.getDataBlock().nodeSize());
+  }
+
+  private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    if (dimensionColumnDataChunk.isExplicitSorted()
+        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      return setFilterdIndexToBitSetWithColumnIndex(
+          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
+    }
+    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
+  }
+
+  /**
+   * Method will scan the block and find the range start index from which all members
+   * will be considered for applying range filters. This method will be called if the
+   * column is not sorted by default, so a column index mapping will be present for
+   * accessing the members from the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSetWithColumnIndex(
+      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    int start = 0;
+    int last = 0;
+    int startIndex = 0;
+    byte[][] filterValues = this.filterRangeValues;
+    for (int i = 0; i < filterValues.length; i++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[i], true);
+      if (start >= 0) {
+        start = CarbonUtil.nextGreaterValueToTarget(start, dimensionColumnDataChunk,
+            filterValues[i], numerOfRows);
+      }
+      // Handle the case where the range filter member is not present in the block.
+      // In this case the binary search returns the insertion point from which the
+      // bits will be set in order to apply the filter. Since this is a greater-than
+      // filter, the range starts from the next element that is greater than the
+      // filter member.
+      if (start < 0) {
+        start = -(start + 1);
+        if (start == numerOfRows) {
+          start = start - 1;
+        }
+        // Compare the tentative index value found by the binary search against the
+        // filter member; if the filter value is still greater, move the start of the
+        // range forward by one before setting bits.
+        if (ByteUtil.compare(filterValues[i],
+            dimensionColumnDataChunk.getChunkData(dimensionColumnDataChunk.getInvertedIndex(start)))
+            > 0) {
+          start = start + 1;
+        }
+      }
+
+      last = start;
+      for (int j = start; j < numerOfRows; j++) {
+        bitSet.set(dimensionColumnDataChunk.getInvertedIndex(j));
+        last++;
+      }
+      startIndex = last;
+      if (startIndex >= numerOfRows) {
+        break;
+      }
+    }
+
+    return bitSet;
+  }
+
+  /**
+   * Method will scan the block and find the range start index from which all
+   * members will be considered for applying range filters. This method will
+   * be called if the column is sorted by default, so the column index
+   * mapping will be present for accessing the members from the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      int start = 0;
+      int last = 0;
+      int startIndex = 0;
+      byte[][] filterValues = this.filterRangeValues;
+      for (int k = 0; k < filterValues.length; k++) {
+        start = CarbonUtil.getFirstIndexUsingBinarySearch(
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+            filterValues[k], true);
+        if (start >= 0) {
+          start = CarbonUtil.nextGreaterValueToTarget(start,
+              (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, filterValues[k],
+              numerOfRows);
+        }
+        if (start < 0) {
+          start = -(start + 1);
+          if (start == numerOfRows) {
+            start = start - 1;
+          }
+          // Compare the tentative index value found by the binary search against the
+          // filter member; if the filter value is still greater, move the start of the
+          // range forward by one before setting bits.
+          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start)) > 0) {
+            start = start + 1;
+          }
+        }
+        last = start;
+        for (int j = start; j < numerOfRows; j++) {
+          bitSet.set(j);
+          last++;
+        }
+        startIndex = last;
+        if (startIndex >= numerOfRows) {
+          break;
+        }
+      }
+    }
+    return bitSet;
+  }
+
+}
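
The range executers locate the start of the matching run with a binary search and decode a
negative return value as an insertion point, the same convention as
java.util.Arrays.binarySearch. A hypothetical sketch of the greater-than case on a sorted
int column (CarbonUtil.getFirstIndexUsingBinarySearch plays this role on byte[] chunks):

  import java.util.Arrays;
  import java.util.BitSet;

  public class GreaterThanRangeSketch {
    static BitSet greaterThan(int[] sortedColumn, int filterValue) {
      BitSet bitSet = new BitSet(sortedColumn.length);
      int start = Arrays.binarySearch(sortedColumn, filterValue);
      if (start >= 0) {
        // value found: skip duplicates so the run starts strictly after the filter value
        while (start < sortedColumn.length && sortedColumn[start] == filterValue) {
          start++;
        }
      } else {
        // not found: -(insertionPoint) - 1 was returned, so recover the insertion point
        start = -(start + 1);
      }
      for (int j = start; j < sortedColumn.length; j++) {
        bitSet.set(j);
      }
      return bitSet;
    }

    public static void main(String[] args) {
      int[] column = { 3, 7, 7, 9, 12 };
      System.out.println(greaterThan(column, 7));  // {3, 4}
      System.out.println(greaterThan(column, 8));  // {3, 4}
      System.out.println(greaterThan(column, 20)); // {}
    }
  }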

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
new file mode 100644
index 0000000..ca4dc57
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.filter.executer;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilterExecuterImpl {
+
+  protected byte[][] filterRangeValues;
+
+  public RowLevelRangeGrtrThanEquaToFilterExecuterImpl(
+      List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
+      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
+      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
+      SegmentProperties segmentProperties) {
+    super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
+        null);
+    this.filterRangeValues = filterRangeValues;
+  }
+
+  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+    BitSet bitSet = new BitSet(1);
+    byte[][] filterValues = this.filterRangeValues;
+    int columnIndex = this.dimColEvaluatorInfoList.get(0).getColumnIndex();
+    boolean isScanRequired = false;
+    for (int k = 0; k < filterValues.length; k++) {
+      // for a greater-than-or-equal filter the block can only contain matches if
+      // the filter value is less than or equal to the block max value,
+      // i.e. filter - max must not be positive
+      int maxCompare =
+          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMaxValue[columnIndex]);
+      // if any filter value is in range, then this block needs to be scanned
+      if (maxCompare <= 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+      throws FilterUnsupportedException, IOException {
+    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
+      return super.applyFilter(blockChunkHolder);
+    }
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
+    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
+        blockChunkHolder.getDataBlock().nodeSize());
+  }
+
+  private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    if (dimensionColumnDataChunk.isExplicitSorted()
+        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      return setFilterdIndexToBitSetWithColumnIndex(
+          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
+    }
+    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
+  }
+
+  /**
+   * Method will scan the block and find the range start index from which all members
+   * will be considered for applying range filters. This method will be called if the
+   * column is not sorted by default, so a column index mapping will be present for
+   * accessing the members from the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSetWithColumnIndex(
+      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    int start = 0;
+    int last = 0;
+    int startIndex = 0;
+    byte[][] filterValues = this.filterRangeValues;
+    for (int i = 0; i < filterValues.length; i++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[i], false);
+      if (start < 0) {
+        start = -(start + 1);
+        if (start == numerOfRows) {
+          start = start - 1;
+        }
+        // Compare the tentative index value found by the binary search against the
+        // filter member; if the filter value is still greater than or equal to it,
+        // move the start of the range forward by one before setting bits.
+        if (ByteUtil.compare(filterValues[i],
+            dimensionColumnDataChunk.getChunkData(dimensionColumnDataChunk.getInvertedIndex(start)))
+            >= 0) {
+          start = start + 1;
+        }
+      }
+      last = start;
+      for (int j = start; j < numerOfRows; j++) {
+        bitSet.set(dimensionColumnDataChunk.getInvertedIndex(j));
+        last++;
+      }
+      startIndex = last;
+      if (startIndex >= numerOfRows) {
+        break;
+      }
+    }
+    return bitSet;
+  }
+
+  /**
+   * Method will scan the block and find the range start index from which all
+   * members will be considered for applying range filters. This method will
+   * be called if the column is sorted by default, so the column index
+   * mapping will be present for accessing the members from the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      int start = 0;
+      int last = 0;
+      int startIndex = 0;
+      byte[][] filterValues = this.filterRangeValues;
+      for (int k = 0; k < filterValues.length; k++) {
+        start = CarbonUtil.getFirstIndexUsingBinarySearch(
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+            filterValues[k], false);
+        if (start < 0) {
+          start = -(start + 1);
+          if (start == numerOfRows) {
+            start = start - 1;
+          }
+          // Compare the tentative index value found by the binary search against the
+          // filter member; if the filter value is still greater than or equal to it,
+          // move the start of the range forward by one before setting bits.
+          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start))
+              >= 0) {
+            start = start + 1;
+          }
+        }
+
+        last = start;
+        for (int j = start; j < numerOfRows; j++) {
+          bitSet.set(j);
+          last++;
+        }
+        startIndex = last;
+        if (startIndex >= numerOfRows) {
+          break;
+        }
+      }
+    }
+    return bitSet;
+  }
+}
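
The greater-than-or-equal executer differs from the strict greater-than one only in where
the matching run starts: when the filter member is found, the run begins at its first
occurrence rather than after its last. A hypothetical sketch, again on a sorted int column:

  import java.util.Arrays;
  import java.util.BitSet;

  public class GreaterThanEqualRangeSketch {
    static BitSet greaterThanOrEqual(int[] sortedColumn, int filterValue) {
      BitSet bitSet = new BitSet(sortedColumn.length);
      int start = Arrays.binarySearch(sortedColumn, filterValue);
      if (start >= 0) {
        // value found: walk back to the first occurrence so equal members are included
        while (start > 0 && sortedColumn[start - 1] == filterValue) {
          start--;
        }
      } else {
        start = -(start + 1);  // index of the first element greater than the filter value
      }
      for (int j = start; j < sortedColumn.length; j++) {
        bitSet.set(j);
      }
      return bitSet;
    }

    public static void main(String[] args) {
      int[] column = { 3, 7, 7, 9, 12 };
      System.out.println(greaterThanOrEqual(column, 7)); // {1, 2, 3, 4}
      System.out.println(greaterThanOrEqual(column, 8)); // {3, 4}
    }
  }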

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
new file mode 100644
index 0000000..838aec0
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.filter.executer;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilterExecuterImpl {
+  protected byte[][] filterRangeValues;
+
+  public RowLevelRangeLessThanEqualFilterExecuterImpl(
+      List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
+      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
+      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
+      SegmentProperties segmentProperties) {
+    super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
+        null);
+    this.filterRangeValues = filterRangeValues;
+  }
+
+  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+    BitSet bitSet = new BitSet(1);
+    byte[][] filterValues = this.filterRangeValues;
+    int columnIndex = this.dimColEvaluatorInfoList.get(0).getColumnIndex();
+    boolean isScanRequired = false;
+    for (int k = 0; k < filterValues.length; k++) {
+      // filter - min should be positive
+      int minCompare =
+          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMinValue[columnIndex]);
+
+      // since this is a less-than-or-equal filter, the block needs to be scanned
+      // only when its min value is less than or equal to the applied filter member
+      if (minCompare >= 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+      throws FilterUnsupportedException, IOException {
+    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
+      return super.applyFilter(blockChunkHolder);
+    }
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
+    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
+        blockChunkHolder.getDataBlock().nodeSize());
+  }
+
+  private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    byte[] defaultValue = null;
+    if (dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+      DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+          .getDirectDictionaryGenerator(
+              dimColEvaluatorInfoList.get(0).getDimension().getDataType());
+      int key = directDictionaryGenerator.generateDirectSurrogateKey(null) + 1;
+      defaultValue = FilterUtil.getMaskKey(key, dimColEvaluatorInfoList.get(0).getDimension(),
+          this.segmentProperties.getDimensionKeyGenerator());
+    }
+    if (dimensionColumnDataChunk.isExplicitSorted()
+        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+
+      return setFilterdIndexToBitSetWithColumnIndex(
+          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows, defaultValue);
+
+    }
+    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows, defaultValue);
+  }
+
+  /**
+   * Method will scan the block and find the range start index from which all members
+   * will be considered for applying range filters. This method will be called if the
+   * column is not sorted by default, so a column index mapping will be present for
+   * accessing the members from the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSetWithColumnIndex(
+      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows,
+      byte[] defaultValue) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    int start = 0;
+    int last = 0;
+    int skip = 0;
+    int startIndex = 0;
+    byte[][] filterValues = this.filterRangeValues;
+    //find the number of default values to skip the null value in case of direct dictionary
+    if (null != defaultValue) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              defaultValue, true);
+      if (start < 0) {
+        skip = -(start + 1);
+        // end of block
+        if (skip == numerOfRows) {
+          return bitSet;
+        }
+      } else {
+        skip = start;
+      }
+      startIndex = skip;
+    }
+    for (int i = 0; i < filterValues.length; i++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[i], true);
+      if (start < 0) {
+        start = -(start + 1);
+        if (start == numerOfRows) {
+          start = start - 1;
+        }
+        // Compare the tentative index value obtained from the binary search against the
+        // filter member; if the member at that index is greater than or equal to the
+        // filter value, step back one index so that the bitset covers only members
+        // satisfying the less-than-equal filter.
+        if (ByteUtil.compare(filterValues[i],
+            dimensionColumnDataChunk.getChunkData(dimensionColumnDataChunk.getInvertedIndex(start)))
+            <= 0) {
+          start = start - 1;
+        }
+      }
+      last = start;
+      for (int j = start; j >= skip; j--) {
+        bitSet.set(dimensionColumnDataChunk.getInvertedIndex(j));
+        last--;
+      }
+      startIndex = last;
+      if (startIndex <= 0) {
+        break;
+      }
+    }
+    return bitSet;
+  }
+
+  /**
+   * Method will scan the block and find the range start index from which all
+   * members will be considered for applying range filters. This method will
+   * be called when the column data is in its default sorted order, so the members
+   * are accessed directly from the block without a column index mapping.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @param defaultValue
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows, byte[] defaultValue) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      int start = 0;
+      int last = 0;
+      int startIndex = 0;
+      byte[][] filterValues = this.filterRangeValues;
+      int skip = 0;
+      //find the number of default values to skip the null value in case of direct dictionary
+      if (null != defaultValue) {
+        start = CarbonUtil.getFirstIndexUsingBinarySearch(
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+            defaultValue, true);
+        if (start < 0) {
+          skip = -(start + 1);
+          // end of block
+          if (skip == numerOfRows) {
+            return bitSet;
+          }
+        } else {
+          skip = start;
+        }
+        startIndex = skip;
+      }
+      for (int k = 0; k < filterValues.length; k++) {
+        start = CarbonUtil.getFirstIndexUsingBinarySearch(
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+            filterValues[k], true);
+        if (start < 0) {
+          start = -(start + 1);
+          if (start == numerOfRows) {
+            start = start - 1;
+          }
+          // Compare the tentative index value obtained from the binary search against the
+          // filter member; if the member at that index is greater than or equal to the
+          // filter value, step back one index so that the bitset covers only members
+          // satisfying the less-than-equal filter.
+          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start))
+              <= 0) {
+            start = start - 1;
+          }
+        }
+        last = start;
+        for (int j = start; j >= skip; j--) {
+          bitSet.set(j);
+          last--;
+        }
+        startIndex = last;
+        if (startIndex <= 0) {
+          break;
+        }
+      }
+    }
+    return bitSet;
+  }
+
+}
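Both setFilterdIndexToBitSet variants above decode the negative return value of CarbonUtil.getFirstIndexUsingBinarySearch with -(start + 1), the usual binary-search insertion-point convention, and then set every bit from the resulting position down to the skip offset that hides leading null (default) values. A minimal standalone sketch of that decode-and-fill-down pattern, using java.util.Arrays.binarySearch on an int[] purely as an analogy (class name, values, and helper below are illustrative only, not CarbonData APIs):

import java.util.Arrays;
import java.util.BitSet;

public class LessThanEqualRangeSketch {

  // Sets bits for all rows whose (sorted) value is <= filterValue, skipping `skip` leading nulls.
  static BitSet rowsLessThanOrEqual(int[] sortedColumn, int filterValue, int skip) {
    BitSet bitSet = new BitSet(sortedColumn.length);
    int start = Arrays.binarySearch(sortedColumn, skip, sortedColumn.length, filterValue);
    if (start < 0) {
      // decode the insertion point, then step back to the last value smaller than the filter
      start = -(start + 1) - 1;
    } else {
      // include any duplicates equal to the filter value (a less-than-equal filter keeps them)
      while (start + 1 < sortedColumn.length && sortedColumn[start + 1] == filterValue) {
        start++;
      }
    }
    for (int j = start; j >= skip; j--) {
      bitSet.set(j);
    }
    return bitSet;
  }

  public static void main(String[] args) {
    int[] column = { 1, 3, 3, 5, 8, 13 };
    System.out.println(rowsLessThanOrEqual(column, 6, 0)); // prints {0, 1, 2, 3}
  }
}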

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
new file mode 100644
index 0000000..96e8fbe
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.filter.executer;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecuterImpl {
+  private byte[][] filterRangeValues;
+
+  public RowLevelRangeLessThanFiterExecuterImpl(
+      List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
+      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
+      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
+      SegmentProperties segmentProperties) {
+    super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
+        null);
+    this.filterRangeValues = filterRangeValues;
+  }
+
+  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+    BitSet bitSet = new BitSet(1);
+    byte[][] filterValues = this.filterRangeValues;
+    int columnIndex = this.dimColEvaluatorInfoList.get(0).getColumnIndex();
+    boolean isScanRequired = false;
+    for (int k = 0; k < filterValues.length; k++) {
+      // the comparison of the filter value with the block min value should be positive
+      int minCompare =
+          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMinValue[columnIndex]);
+
+      // if any applied filter value is outside the block's min/max range, then, since this
+      // is a less-than filter, validate whether the block's min value is strictly less than
+      // the applied filter member
+      if (minCompare > 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+      throws FilterUnsupportedException, IOException {
+    if 
(!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY))
 {
+      return super.applyFilter(blockChunkHolder);
+    }
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
+    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionDataChunk()[blockIndex] = 
blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+    return 
getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
+        blockChunkHolder.getDataBlock().nodeSize());
+  }
+
+  private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    byte[] defaultValue = null;
+    if (dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+      DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+          .getDirectDictionaryGenerator(
+              dimColEvaluatorInfoList.get(0).getDimension().getDataType());
+      int key = directDictionaryGenerator.generateDirectSurrogateKey(null) + 1;
+      defaultValue = FilterUtil.getMaskKey(key, dimColEvaluatorInfoList.get(0).getDimension(),
+          this.segmentProperties.getDimensionKeyGenerator());
+    }
+    if (dimensionColumnDataChunk.isExplicitSorted()
+        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      return setFilterdIndexToBitSetWithColumnIndex(
+          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows, defaultValue);
+    }
+    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows, defaultValue);
+  }
+
+  /**
+   * Method will scan the block and find the range start index from which all members
+   * will be considered for applying range filters. This method will be called when the
+   * column data is not sorted in its default order, so a column index (inverted index)
+   * mapping will be present for accessing the members from the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @param defaultValue
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSetWithColumnIndex(
+      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows,
+      byte[] defaultValue) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    int start = 0;
+    int last = 0;
+    int startIndex = 0;
+    int skip = 0;
+    byte[][] filterValues = this.filterRangeValues;
+
+    //find the number of default values to skip the null value in case of direct dictionary
+    if (null != defaultValue) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              defaultValue, false);
+      if (start < 0) {
+        skip = -(start + 1);
+        // end of block
+        if (skip == numerOfRows) {
+          return bitSet;
+        }
+      } else {
+        skip = start;
+      }
+      startIndex = skip;
+    }
+
+    for (int i = 0; i < filterValues.length; i++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[i], false);
+      // Logic handles the case where the range filter member is not present in the block:
+      // in this case the binary search returns the index from which the bits will be set
+      // in order to apply the filter. Since this is a less-than filter, the range is taken
+      // from the previous element, which is lesser than the filter member.
+      start = CarbonUtil.nextLesserValueToTarget(start, dimensionColumnDataChunk, filterValues[i]);
+      if (start < 0) {
+        start = -(start + 1);
+        if (start == numerOfRows) {
+          start = start - 1;
+        }
+        // Compare the tentative index value obtained from the binary search against the
+        // filter member; if the member at that index is greater than the filter value,
+        // step back one index so that only members lesser than the filter value are set.
+        if (ByteUtil.compare(filterValues[i],
+            dimensionColumnDataChunk.getChunkData(dimensionColumnDataChunk.getInvertedIndex(start)))
+            < 0) {
+          start = start - 1;
+        }
+      }
+      last = start;
+      for (int j = start; j >= skip; j--) {
+        bitSet.set(dimensionColumnDataChunk.getInvertedIndex(j));
+        last--;
+      }
+      startIndex = last;
+      if (startIndex <= 0) {
+        break;
+      }
+    }
+    return bitSet;
+  }
+
+  /**
+   * Method will scan the block and find the range start index from which all
+   * members will be considered for applying range filters. This method will
+   * be called when the column data is in its default sorted order, so the members
+   * are accessed directly from the block without a column index mapping.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @param defaultValue
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows, byte[] defaultValue) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      int start = 0;
+      int last = 0;
+      int startIndex = 0;
+      int skip = 0;
+      byte[][] filterValues = this.filterRangeValues;
+      //find the number of default values to skip the null value in case of direct dictionary
+      if (null != defaultValue) {
+        start = CarbonUtil.getFirstIndexUsingBinarySearch(
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+            defaultValue, false);
+        if (start < 0) {
+          skip = -(start + 1);
+          // end of block
+          if (skip == numerOfRows) {
+            return bitSet;
+          }
+        } else {
+          skip = start;
+        }
+        startIndex = skip;
+      }
+      for (int k = 0; k < filterValues.length; k++) {
+        start = CarbonUtil.getFirstIndexUsingBinarySearch(
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+            filterValues[k], false);
+        if (start >= 0) {
+          start = CarbonUtil.nextLesserValueToTarget(start,
+              (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, filterValues[k]);
+        }
+        if (start < 0) {
+          start = -(start + 1);
+
+          if (start >= numerOfRows) {
+            start = numerOfRows - 1;
+          }
+          // Compare the tentative index value obtained from the binary search against the
+          // filter member; if the member at that index is greater than the filter value,
+          // step back one index so that only members lesser than the filter value are set.
+          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start)) < 0) {
+            start = start - 1;
+          }
+        }
+        last = start;
+        for (int j = start; j >= skip; j--) {
+          bitSet.set(j);
+          last--;
+        }
+        startIndex = last;
+        if (startIndex <= 0) {
+          break;
+        }
+      }
+    }
+    return bitSet;
+  }
+}
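The isScanRequired implementations in the two executers above prune whole blocks purely from block-level min values: a less-than filter needs the block scanned only when some filter value compares strictly greater than the block min, while the less-than-equal variant also accepts equality. A minimal sketch of that pruning rule, assuming unsigned lexicographic byte comparison in the spirit of ByteUtil.UnsafeComparer (the compare helper and class name below are written out for illustration rather than using the CarbonData classes):

import java.util.BitSet;

public class RangeBlockPruningSketch {

  // Unsigned lexicographic comparison, analogous in intent to ByteUtil.UnsafeComparer.
  static int compareUnsigned(byte[] a, byte[] b) {
    int len = Math.min(a.length, b.length);
    for (int i = 0; i < len; i++) {
      int cmp = (a[i] & 0xff) - (b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return a.length - b.length;
  }

  // Returns a one-bit BitSet: bit 0 is set when the block may contain matching rows.
  static BitSet isScanRequired(byte[][] filterValues, byte[] blockMin, boolean inclusive) {
    BitSet bitSet = new BitSet(1);
    for (byte[] filterValue : filterValues) {
      int minCompare = compareUnsigned(filterValue, blockMin);
      // less-than: need filter > blockMin; less-than-equal: filter >= blockMin is enough
      if (inclusive ? minCompare >= 0 : minCompare > 0) {
        bitSet.set(0);
        break;
      }
    }
    return bitSet;
  }

  public static void main(String[] args) {
    byte[] blockMin = { 5 };
    byte[][] filters = { { 5 } };
    System.out.println(isScanRequired(filters, blockMin, false)); // prints {}  -> "x < 5" cannot match
    System.out.println(isScanRequired(filters, blockMin, true));  // prints {0} -> "x <= 5" might match
  }
}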

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeTypeExecuterFacory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeTypeExecuterFacory.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeTypeExecuterFacory.java
new file mode 100644
index 0000000..d5e58c9
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeTypeExecuterFacory.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.filter.executer;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.scan.filter.intf.FilterExecuterType;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.filter.resolver.RowLevelRangeFilterResolverImpl;
+
+public class RowLevelRangeTypeExecuterFacory {
+
+  private RowLevelRangeTypeExecuterFacory() {
+
+  }
+
+  /**
+   * The method returns the row level range filter executer instance based on
+   * the filter tree resolver type.
+   *
+   * @param filterExecuterType
+   * @param filterExpressionResolverTree
+   * @param segmentProperties
+   * @return the executer instance
+   */
+  public static RowLevelFilterExecuterImpl getRowLevelRangeTypeExecuter(
+      FilterExecuterType filterExecuterType, FilterResolverIntf filterExpressionResolverTree,
+      SegmentProperties segmentProperties) {
+    switch (filterExecuterType) {
+
+      case ROWLEVEL_LESSTHAN:
+        return new RowLevelRangeLessThanFiterExecuterImpl(
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getDimColEvaluatorInfoList(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getMsrColEvalutorInfoList(),
+            filterExpressionResolverTree.getFilterExpression(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getFilterRangeValues(segmentProperties), segmentProperties);
+      case ROWLEVEL_LESSTHAN_EQUALTO:
+        return new RowLevelRangeLessThanEqualFilterExecuterImpl(
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getDimColEvaluatorInfoList(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getMsrColEvalutorInfoList(),
+            filterExpressionResolverTree.getFilterExpression(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getFilterRangeValues(segmentProperties), segmentProperties);
+      case ROWLEVEL_GREATERTHAN_EQUALTO:
+        return new RowLevelRangeGrtrThanEquaToFilterExecuterImpl(
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getDimColEvaluatorInfoList(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getMsrColEvalutorInfoList(),
+            filterExpressionResolverTree.getFilterExpression(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getFilterRangeValues(segmentProperties), segmentProperties);
+      case ROWLEVEL_GREATERTHAN:
+        return new RowLevelRangeGrtThanFiterExecuterImpl(
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getDimColEvaluatorInfoList(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getMsrColEvalutorInfoList(),
+            filterExpressionResolverTree.getFilterExpression(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getFilterRangeValues(segmentProperties), segmentProperties);
+      default:
+        // this scenario is not expected to occur; control should not reach here
+        return null;
+
+    }
+  }
+
+}
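The default branch of the switch above notes that it should never be reached, yet it returns null, which would only surface later as a NullPointerException at the call site. A defensive alternative (a sketch only, not part of this change) would fail fast with an unchecked exception while keeping the method signature unchanged:

      default:
        // not expected for row level range filters; fail fast instead of returning null
        throw new IllegalArgumentException(
            "Unsupported row level range filter executer type: " + filterExecuterType);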

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
new file mode 100644
index 0000000..2ca481f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.scan.filter.intf;
+
+public enum ExpressionType {
+
+  AND,
+  OR,
+  NOT,
+  EQUALS,
+  NOT_EQUALS,
+  LESSTHAN,
+  LESSTHAN_EQUALTO,
+  GREATERTHAN,
+  GREATERTHAN_EQUALTO,
+  ADD,
+  SUBSTRACT,
+  DIVIDE,
+  MULTIPLY,
+  IN,
+  LIST,
+  NOT_IN,
+  UNKNOWN,
+  LITERAL,
+  RANGE,
+  FALSE
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/FilterExecuterType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/FilterExecuterType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/FilterExecuterType.java
new file mode 100644
index 0000000..eba0d8a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/FilterExecuterType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.scan.filter.intf;
+
+import java.io.Serializable;
+
+public enum FilterExecuterType implements Serializable {
+
+  INCLUDE, EXCLUDE, OR, AND, RESTRUCTURE, ROWLEVEL, RANGE, ROWLEVEL_GREATERTHAN,
+  ROWLEVEL_GREATERTHAN_EQUALTO, ROWLEVEL_LESSTHAN_EQUALTO, ROWLEVEL_LESSTHAN
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/RowImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/RowImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/RowImpl.java
new file mode 100644
index 0000000..a13b140
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/RowImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.scan.filter.intf;
+
+public class RowImpl implements RowIntf {
+  private Object[] row;
+
+  public RowImpl() {
+    row = new Object[0];
+  }
+
+  @Override public Object getVal(int index) {
+    return row[index];
+  }
+
+  @Override public Object[] getValues() {
+    return row;
+  }
+
+  @Override public void setValues(final Object[] row) {
+    this.row = row;
+  }
+
+  @Override public int size() {
+    return this.row.length;
+  }
+}
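RowImpl, as added above, is a thin mutable wrapper over an Object[] exposed through the RowIntf interface that follows. A minimal usage sketch (class name and values are illustrative only):

import org.apache.carbondata.core.scan.filter.intf.RowImpl;
import org.apache.carbondata.core.scan.filter.intf.RowIntf;

public class RowImplUsageSketch {
  public static void main(String[] args) {
    RowIntf row = new RowImpl();          // starts empty: size() == 0
    row.setValues(new Object[] { 10, "carbon", 20.5d });
    System.out.println(row.getVal(1));    // prints: carbon
    System.out.println(row.size());       // prints: 3
  }
}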

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ce09aaaf/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/RowIntf.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/RowIntf.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/RowIntf.java
new file mode 100644
index 0000000..7640553
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/RowIntf.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.scan.filter.intf;
+
+public interface RowIntf {
+  Object getVal(int index);
+
+  Object[] getValues();
+
+  void setValues(Object[] setValues);
+
+  int size();
+
+}
