Improve Include and Exclude filters for no-dictionary columns
Fixed data mismatch issue
Fixed ALTER issue when the decimal type is given in caps


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/357ab636
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/357ab636
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/357ab636

Branch: refs/heads/branch-1.1
Commit: 357ab636f7596d7e26bfd92657d708a659b9e718
Parents: bbcc487
Author: kumarvishal <kumarvishal.1...@gmail.com>
Authored: Wed May 31 15:49:54 2017 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Jun 15 13:21:02 2017 +0530

----------------------------------------------------------------------
 .../schema/table/column/CarbonDimension.java    |   7 +
 .../carbondata/core/scan/filter/FilterUtil.java |  18 ++
 .../executer/ExcludeFilterExecuterImpl.java     | 120 +++++------
 .../executer/IncludeFilterExecuterImpl.java     |  89 ++++----
 .../executer/RangeValueFilterExecuterImpl.java  | 214 +++++++++++++------
 .../executer/RowLevelFilterExecuterImpl.java    |   5 +
 .../RowLevelRangeGrtThanFiterExecuterImpl.java  |  82 ++++---
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java |  75 ++++---
 ...velRangeLessThanEqualFilterExecuterImpl.java | 106 +++++----
 .../RowLevelRangeLessThanFiterExecuterImpl.java | 113 ++++++----
 10 files changed, 503 insertions(+), 326 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/357ab636/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
index bdc7a4c..8d02512 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
@@ -122,6 +122,13 @@ public class CarbonDimension extends CarbonColumn {
   }
 
   /**
+   * @return is column participated in sorting or not
+   */
+  public boolean isSortColumn() {
+    return this.columnSchema.isSortColumn();
+  }
+
+  /**
    * to generate the hash code for this class
    */
   @Override public int hashCode() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/357ab636/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 7799b6a..73387db 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -49,6 +49,7 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -1456,4 +1457,21 @@ public final class FilterUtil {
     return bitSetGroup;
   }
 
+  /**
+   * This method will compare the selected data against null values and
+   * flip the bitSet if any null value is found
+   *
+   * @param dimensionColumnDataChunk
+   * @param bitSet
+   */
+  public static void removeNullValues(DimensionColumnDataChunk 
dimensionColumnDataChunk,
+      BitSet bitSet, byte[] defaultValue) {
+    if (!bitSet.isEmpty()) {
+      for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) 
{
+        if (dimensionColumnDataChunk.compareTo(i, defaultValue) == 0) {
+          bitSet.flip(i);
+        }
+      }
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/357ab636/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
index 7449781..23209ed 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -22,8 +22,6 @@ import java.util.BitSet;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
-import 
org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
-import 
org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import 
org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -35,7 +33,10 @@ public class ExcludeFilterExecuterImpl implements 
FilterExecuter {
   protected DimColumnResolvedFilterInfo dimColEvaluatorInfo;
   protected DimColumnExecuterFilterInfo dimColumnExecuterInfo;
   protected SegmentProperties segmentProperties;
-
+  /**
+   * is dimension column data is natural sorted
+   */
+  private boolean isNaturalSorted;
   public ExcludeFilterExecuterImpl(DimColumnResolvedFilterInfo 
dimColEvaluatorInfo,
       SegmentProperties segmentProperties) {
     this.dimColEvaluatorInfo = dimColEvaluatorInfo;
@@ -43,6 +44,8 @@ public class ExcludeFilterExecuterImpl implements 
FilterExecuter {
     this.segmentProperties = segmentProperties;
     
FilterUtil.prepareKeysFromSurrogates(dimColEvaluatorInfo.getFilterValues(), 
segmentProperties,
         dimColEvaluatorInfo.getDimension(), dimColumnExecuterInfo);
+    isNaturalSorted = dimColEvaluatorInfo.getDimension().isUseInvertedIndex() 
&& dimColEvaluatorInfo
+        .getDimension().isSortColumn();
   }
 
   @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder) 
throws IOException {
@@ -69,96 +72,71 @@ public class ExcludeFilterExecuterImpl implements 
FilterExecuter {
 
   protected BitSet getFilteredIndexes(DimensionColumnDataChunk 
dimColumnDataChunk,
       int numerOfRows) {
-    // For high cardinality dimensions.
-    if (dimColumnDataChunk.isNoDicitionaryColumn()
-        && dimColumnDataChunk instanceof VariableLengthDimensionDataChunk) {
-      return 
setDirectKeyFilterIndexToBitSet((VariableLengthDimensionDataChunk) 
dimColumnDataChunk,
-          numerOfRows);
-    }
-    if (dimColumnDataChunk.isExplicitSorted()
-        && dimColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
-      return setFilterdIndexToBitSetWithColumnIndex(
-          (FixedLengthDimensionDataChunk) dimColumnDataChunk, numerOfRows);
+    if (dimColumnDataChunk.isExplicitSorted()) {
+      return setFilterdIndexToBitSetWithColumnIndex(dimColumnDataChunk, 
numerOfRows);
     }
-    return setFilterdIndexToBitSet((FixedLengthDimensionDataChunk) 
dimColumnDataChunk, numerOfRows);
-  }
-
-  private BitSet setDirectKeyFilterIndexToBitSet(
-      VariableLengthDimensionDataChunk dimColumnDataChunk, int numerOfRows) {
-    BitSet bitSet = new BitSet(numerOfRows);
-    bitSet.flip(0, numerOfRows);
-    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
-    for (int i = 0; i < filterValues.length; i++) {
-      byte[] filterVal = filterValues[i];
-      if (dimColumnDataChunk.isExplicitSorted()) {
-        for (int index = 0; index < numerOfRows; index++) {
-          if (dimColumnDataChunk.compareTo(index, filterVal) == 0) {
-            bitSet.flip(dimColumnDataChunk.getInvertedIndex(index));
-          }
-        }
-      } else {
-        for (int index = 0; index < numerOfRows; index++) {
-          if (dimColumnDataChunk.compareTo(index, filterVal) == 0) {
-            bitSet.flip(index);
-          }
-        }
-      }
-    }
-    return bitSet;
-
+    return setFilterdIndexToBitSet(dimColumnDataChunk, numerOfRows);
   }
 
   private BitSet setFilterdIndexToBitSetWithColumnIndex(
-      FixedLengthDimensionDataChunk dimColumnDataChunk, int numerOfRows) {
-    int startKey = 0;
-    int last = 0;
-    int startIndex = 0;
+      DimensionColumnDataChunk dimensionColumnDataChunk, int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
     bitSet.flip(0, numerOfRows);
+    int startIndex = 0;
     byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
     for (int i = 0; i < filterValues.length; i++) {
-      startKey = CarbonUtil
-          .getFirstIndexUsingBinarySearch(dimColumnDataChunk, startIndex, 
numerOfRows - 1,
-              filterValues[i], false);
-      if (startKey < 0) {
-        continue;
-      }
-      bitSet.flip(dimColumnDataChunk.getInvertedIndex(startKey));
-      last = startKey;
-      for (int j = startKey + 1; j < numerOfRows; j++) {
-        if (dimColumnDataChunk.compareTo(j, filterValues[i]) == 0) {
-          bitSet.flip(dimColumnDataChunk.getInvertedIndex(j));
-          last++;
-        } else {
-          break;
-        }
-      }
-      startIndex = last;
       if (startIndex >= numerOfRows) {
         break;
       }
+      int[] rangeIndex = CarbonUtil
+          .getRangeIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex, numerOfRows - 1,
+              filterValues[i]);
+      for (int j = rangeIndex[0]; j <= rangeIndex[1]; j++) {
+        bitSet.flip(dimensionColumnDataChunk.getInvertedIndex(j));
+      }
+      if (rangeIndex[1] >= 0) {
+        startIndex = rangeIndex[1] + 1;
+      }
     }
     return bitSet;
   }
 
-  // use binary search to replace for clause
-  private BitSet setFilterdIndexToBitSet(FixedLengthDimensionDataChunk 
dimColumnDataChunk,
+  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk 
dimensionColumnDataChunk,
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
     bitSet.flip(0, numerOfRows);
     byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
-    if (filterValues.length > 1) {
-      for (int j = 0; j < numerOfRows; j++) {
-        int index = CarbonUtil.binarySearch(filterValues, 0, 
filterValues.length - 1,
-            dimColumnDataChunk.getChunkData(j));
-        if (index >= 0) {
+    // binary search can only be applied if column is sorted
+    if (isNaturalSorted) {
+      int startIndex = 0;
+      for (int i = 0; i < filterValues.length; i++) {
+        if (startIndex >= numerOfRows) {
+          break;
+        }
+        int[] rangeIndex = CarbonUtil
+            .getRangeIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex, numerOfRows - 1,
+                filterValues[i]);
+        for (int j = rangeIndex[0]; j <= rangeIndex[1]; j++) {
           bitSet.flip(j);
         }
+        if (rangeIndex[1] >= 0) {
+          startIndex = rangeIndex[1] + 1;
+        }
       }
-    } else if (filterValues.length == 1) {
-      for (int j = 0; j < numerOfRows; j++) {
-        if (dimColumnDataChunk.compareTo(j, filterValues[0]) == 0) {
-          bitSet.flip(j);
+    } else {
+      if (filterValues.length > 1) {
+        for (int i = 0; i < numerOfRows; i++) {
+          int index = CarbonUtil.binarySearch(filterValues, 0, 
filterValues.length - 1,
+              dimensionColumnDataChunk.getChunkData(i));
+          if (index >= 0) {
+            bitSet.flip(i);
+          }
+        }
+      } else {
+        for (int j = 0; j < numerOfRows; j++) {
+          if (dimensionColumnDataChunk.compareTo(j, filterValues[0]) == 0) {
+            bitSet.flip(j);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/357ab636/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index 7b8f084..8704496 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -22,8 +22,6 @@ import java.util.BitSet;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
-import 
org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
-import 
org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import 
org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -36,6 +34,10 @@ public class IncludeFilterExecuterImpl implements 
FilterExecuter {
   protected DimColumnResolvedFilterInfo dimColumnEvaluatorInfo;
   protected DimColumnExecuterFilterInfo dimColumnExecuterInfo;
   protected SegmentProperties segmentProperties;
+  /**
+   * is dimension column data is natural sorted
+   */
+  private boolean isNaturalSorted;
 
   public IncludeFilterExecuterImpl(DimColumnResolvedFilterInfo 
dimColumnEvaluatorInfo,
       SegmentProperties segmentProperties) {
@@ -44,7 +46,9 @@ public class IncludeFilterExecuterImpl implements 
FilterExecuter {
     dimColumnExecuterInfo = new DimColumnExecuterFilterInfo();
     
FilterUtil.prepareKeysFromSurrogates(dimColumnEvaluatorInfo.getFilterValues(),
         segmentProperties, dimColumnEvaluatorInfo.getDimension(), 
dimColumnExecuterInfo);
-
+    isNaturalSorted =
+        dimColumnEvaluatorInfo.getDimension().isUseInvertedIndex() && 
dimColumnEvaluatorInfo
+            .getDimension().isSortColumn();
   }
 
   @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder) 
throws IOException {
@@ -76,58 +80,29 @@ public class IncludeFilterExecuterImpl implements 
FilterExecuter {
 
   protected BitSet getFilteredIndexes(DimensionColumnDataChunk 
dimensionColumnDataChunk,
       int numerOfRows) {
-    if (dimensionColumnDataChunk.isNoDicitionaryColumn()
-        && dimensionColumnDataChunk instanceof 
VariableLengthDimensionDataChunk) {
-      return setDirectKeyFilterIndexToBitSet(
-          (VariableLengthDimensionDataChunk) dimensionColumnDataChunk, 
numerOfRows);
-    } else if (dimensionColumnDataChunk.isExplicitSorted()
-        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
-      return setFilterdIndexToBitSetWithColumnIndex(
-          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, 
numerOfRows);
+    if (dimensionColumnDataChunk.isExplicitSorted()) {
+      return setFilterdIndexToBitSetWithColumnIndex(dimensionColumnDataChunk, 
numerOfRows);
     }
-
     return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
   }
 
-  private BitSet setDirectKeyFilterIndexToBitSet(
-      VariableLengthDimensionDataChunk dimensionColumnDataChunk, int 
numerOfRows) {
-    BitSet bitSet = new BitSet(numerOfRows);
-    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
-    for (int i = 0; i < filterValues.length; i++) {
-      byte[] filterVal = filterValues[i];
-      if (dimensionColumnDataChunk.isExplicitSorted()) {
-        for (int index = 0; index < numerOfRows; index++) {
-          if (dimensionColumnDataChunk.compareTo(index, filterVal) == 0) {
-            bitSet.set(dimensionColumnDataChunk.getInvertedIndex(index));
-          }
-        }
-      } else {
-        for (int index = 0; index < numerOfRows; index++) {
-          if (dimensionColumnDataChunk.compareTo(index, filterVal) == 0) {
-            bitSet.set(index);
-          }
-        }
-      }
-    }
-    return bitSet;
-
-  }
-
   private BitSet setFilterdIndexToBitSetWithColumnIndex(
-      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) 
{
+      DimensionColumnDataChunk dimensionColumnDataChunk, int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
     int startIndex = 0;
     byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
     for (int i = 0; i < filterValues.length; i++) {
-      int[] rangeIndex = 
CarbonUtil.getRangeIndexUsingBinarySearch(dimensionColumnDataChunk,
-          startIndex, numerOfRows - 1, filterValues[i]);
+      if (startIndex >= numerOfRows) {
+        break;
+      }
+      int[] rangeIndex = CarbonUtil
+          .getRangeIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex, numerOfRows - 1,
+              filterValues[i]);
       for (int j = rangeIndex[0]; j <= rangeIndex[1]; j++) {
-
         bitSet.set(dimensionColumnDataChunk.getInvertedIndex(j));
       }
-
       if (rangeIndex[1] >= 0) {
-        startIndex = rangeIndex[1];
+        startIndex = rangeIndex[1] + 1;
       }
     }
     return bitSet;
@@ -136,8 +111,26 @@ public class IncludeFilterExecuterImpl implements 
FilterExecuter {
   private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk 
dimensionColumnDataChunk,
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
-    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
-      byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+    // binary search can only be applied if column is sorted and
+    // inverted index exists for that column
+    if (isNaturalSorted) {
+      int startIndex = 0;
+      for (int i = 0; i < filterValues.length; i++) {
+        if (startIndex >= numerOfRows) {
+          break;
+        }
+        int[] rangeIndex = CarbonUtil
+            .getRangeIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex, numerOfRows - 1,
+                filterValues[i]);
+        for (int j = rangeIndex[0]; j <= rangeIndex[1]; j++) {
+          bitSet.set(j);
+        }
+        if (rangeIndex[1] >= 0) {
+          startIndex = rangeIndex[1] + 1;
+        }
+      }
+    } else {
       if (filterValues.length > 1) {
         for (int i = 0; i < numerOfRows; i++) {
           int index = CarbonUtil.binarySearch(filterValues, 0, 
filterValues.length - 1,
@@ -146,10 +139,10 @@ public class IncludeFilterExecuterImpl implements 
FilterExecuter {
             bitSet.set(i);
           }
         }
-      } else if (filterValues.length == 1) {
-        for (int i = 0; i < numerOfRows; i++) {
-          if (dimensionColumnDataChunk.compareTo(i, filterValues[0]) == 0) {
-            bitSet.set(i);
+      } else {
+        for (int j = 0; j < numerOfRows; j++) {
+          if (dimensionColumnDataChunk.compareTo(j, filterValues[0]) == 0) {
+            bitSet.set(j);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/357ab636/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
index a20f414..6823531 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
@@ -24,7 +24,10 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.scan.expression.Expression;
 import 
org.apache.carbondata.core.scan.expression.conditional.GreaterThanEqualToExpression;
@@ -48,8 +51,6 @@ import org.apache.carbondata.core.util.CarbonUtil;
 public class RangeValueFilterExecuterImpl extends ValueBasedFilterExecuterImpl 
{
 
   private DimColumnResolvedFilterInfo dimColEvaluatorInfo;
-  private MeasureColumnResolvedFilterInfo msrColEvalutorInfo;
-  private AbsoluteTableIdentifier tableIdentifier;
   private Expression exp;
   private byte[][] filterRangesValues;
   private SegmentProperties segmentProperties;
@@ -78,10 +79,8 @@ public class RangeValueFilterExecuterImpl extends 
ValueBasedFilterExecuterImpl {
       SegmentProperties segmentProperties) {
 
     this.dimColEvaluatorInfo = dimColEvaluatorInfo;
-    this.msrColEvalutorInfo = msrColEvaluatorInfo;
     this.exp = exp;
     this.segmentProperties = segmentProperties;
-    this.tableIdentifier = tableIdentifier;
     this.filterRangesValues = filterRangeValues;
     this.lessThanExp = isLessThan();
     this.lessThanEqualExp = isLessThanEqualTo();
@@ -242,7 +241,7 @@ public class RangeValueFilterExecuterImpl extends 
ValueBasedFilterExecuterImpl {
     //                       Block Min <-----------------------> Block Max
     //         Filter Min <-----------------------------------------------> 
Filter Max
 
-    if (isDimensionPresentInCurrentBlock == true) {
+    if (isDimensionPresentInCurrentBlock) {
       if (((lessThanExp == true) && (
           ByteUtil.UnsafeComparer.INSTANCE.compareTo(blockMinValue, 
filterValues[1]) >= 0)) || (
           (lessThanEqualExp == true) && (
@@ -474,80 +473,175 @@ public class RangeValueFilterExecuterImpl extends 
ValueBasedFilterExecuterImpl {
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
     // if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
-    int start = 0;
-    int startMin = 0;
-    int endMax = 0;
-    int startIndex = 0;
     byte[][] filterValues = this.filterRangesValues;
-    // For Range expression we expect two values. The First is the Min Value 
and Second is the
-    // Max value.
-    if (startBlockMinIsDefaultStart == false) {
-
-      start = CarbonUtil
-          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk,
-              startIndex, numerOfRows - 1, filterValues[0], greaterThanExp);
+    if (dimensionColumnDataChunk.isExplicitSorted()) {
+      int start = 0;
+      int startMin = 0;
+      int endMax = 0;
+      int startIndex = 0;
+      // For Range expression we expect two values. The First is the Min Value 
and Second is the
+      // Max value.
+      if (startBlockMinIsDefaultStart == false) {
 
-      if (greaterThanExp == true && start >= 0) {
         start = CarbonUtil
-            .nextGreaterValueToTarget(start, dimensionColumnDataChunk, 
filterValues[0],
-                numerOfRows);
-      }
+            .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex, numerOfRows - 1,
+                filterValues[0], greaterThanExp);
 
-      if (start < 0) {
-        start = -(start + 1);
-        if (start == numerOfRows) {
-          start = start - 1;
+        if (greaterThanExp == true && start >= 0) {
+          start = CarbonUtil
+              .nextGreaterValueToTarget(start, dimensionColumnDataChunk, 
filterValues[0],
+                  numerOfRows);
         }
-        // Method will compare the tentative index value after binary search, 
this tentative
-        // index needs to be compared by the filter member if its >= filter 
then from that
-        // index the bitset will be considered for filtering process.
-        if ((ByteUtil.compare(filterValues[0], 
dimensionColumnDataChunk.getChunkData(start)))
-            > 0) {
-          start = start + 1;
+
+        if (start < 0) {
+          start = -(start + 1);
+          if (start == numerOfRows) {
+            start = start - 1;
+          }
+          // Method will compare the tentative index value after binary 
search, this tentative
+          // index needs to be compared by the filter member if its >= filter 
then from that
+          // index the bitset will be considered for filtering process.
+          if ((ByteUtil.compare(filterValues[0], 
dimensionColumnDataChunk.getChunkData(start)))
+              > 0) {
+            start = start + 1;
+          }
         }
+        startMin = start;
+      } else {
+        startMin = startIndex;
       }
-      startMin = start;
-    } else {
-      startMin = startIndex;
-    }
-
-    if (endBlockMaxisDefaultEnd == false) {
-      start = CarbonUtil
-          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex, numerOfRows - 1,
-              filterValues[1], lessThanEqualExp);
 
-      if (lessThanExp == true && start >= 0) {
-        start =
-            CarbonUtil.nextLesserValueToTarget(start, 
dimensionColumnDataChunk, filterValues[1]);
-      }
+      if (endBlockMaxisDefaultEnd == false) {
+        start = CarbonUtil
+            .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex, numerOfRows - 1,
+                filterValues[1], lessThanEqualExp);
 
-      if (start < 0) {
-        start = -(start + 1);
-        if (start == numerOfRows) {
-          start = start - 1;
+        if (lessThanExp == true && start >= 0) {
+          start =
+              CarbonUtil.nextLesserValueToTarget(start, 
dimensionColumnDataChunk, filterValues[1]);
         }
-        // In case the start is less than 0, then positive value of start is 
pointing to the next
-        // value of the searched key. So move to the previous one.
-        if ((ByteUtil.compare(filterValues[1], 
dimensionColumnDataChunk.getChunkData(start))
-            < 0)) {
-          start = start - 1;
+
+        if (start < 0) {
+          start = -(start + 1);
+          if (start == numerOfRows) {
+            start = start - 1;
+          }
+          // In case the start is less than 0, then positive value of start is 
pointing to the next
+          // value of the searched key. So move to the previous one.
+          if ((ByteUtil.compare(filterValues[1], 
dimensionColumnDataChunk.getChunkData(start))
+              < 0)) {
+            start = start - 1;
+          }
         }
+        endMax = start;
+      } else {
+        endMax = numerOfRows - 1;
+      }
+
+      for (int j = startMin; j <= endMax; j++) {
+        bitSet.set(j);
+      }
+
+      // Binary Search cannot be done on '@NU#LL$!", so need to check and 
compare for null on
+      // matching row.
+      if (dimensionColumnDataChunk.isNoDicitionaryColumn()) {
+        updateForNoDictionaryColumn(startMin, endMax, 
dimensionColumnDataChunk, bitSet);
       }
-      endMax = start;
     } else {
-      endMax = numerOfRows - 1;
+      byte[] defaultValue = null;
+      if 
(dimColEvaluatorInfo.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        DirectDictionaryGenerator directDictionaryGenerator = 
DirectDictionaryKeyGeneratorFactory
+            
.getDirectDictionaryGenerator(dimColEvaluatorInfo.getDimension().getDataType());
+        int key = directDictionaryGenerator.generateDirectSurrogateKey(null) + 
1;
+        CarbonDimension currentBlockDimension =
+            segmentProperties.getDimensions().get(dimensionBlocksIndex);
+        defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension,
+            this.segmentProperties.getSortColumnsGenerator());
+      } else {
+        defaultValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+      }
+      // evaluate result for lower range value first and then perform and 
operation in the
+      // upper range value in order to compute the final result
+      bitSet = 
evaluateGreaterThanFilterForUnsortedColumn(dimensionColumnDataChunk, 
filterValues[0],
+          numerOfRows);
+      BitSet upperRangeBitSet =
+          evaluateLessThanFilterForUnsortedColumn(dimensionColumnDataChunk, 
filterValues[1],
+              numerOfRows);
+      bitSet.and(upperRangeBitSet);
+      FilterUtil.removeNullValues(dimensionColumnDataChunk, bitSet, 
defaultValue);
     }
+    return bitSet;
+  }
 
-    for (int j = startMin; j <= endMax; j++) {
-      bitSet.set(j);
+  /**
+   * This method will compare the selected data against null values and
+   * flip the bitSet if any null value is found
+   *
+   * @param dimensionColumnDataChunk
+   * @param bitSet
+   */
+  private void removeNullValues(DimensionColumnDataChunk 
dimensionColumnDataChunk, BitSet bitSet) {
+    if (!bitSet.isEmpty()) {
+      for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) 
{
+        if (dimensionColumnDataChunk.compareTo(i, 
CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY)
+            == 0) {
+          bitSet.flip(i);
+        }
+      }
     }
+  }
 
-    // Binary Search cannot be done on '@NU#LL$!", so need to check and 
compare for null on
-    // matching row.
-    if (dimensionColumnDataChunk.isNoDicitionaryColumn()) {
-      updateForNoDictionaryColumn(startMin, endMax, dimensionColumnDataChunk, 
bitSet);
+  /**
+   * This method will evaluate the result for filter column based on the lower 
range value
+   *
+   * @param dimensionColumnDataChunk
+   * @param filterValue
+   * @param numberOfRows
+   * @return
+   */
+  private BitSet evaluateGreaterThanFilterForUnsortedColumn(
+      DimensionColumnDataChunk dimensionColumnDataChunk, byte[] filterValue, 
int numberOfRows) {
+    BitSet bitSet = new BitSet(numberOfRows);
+    if (greaterThanExp) {
+      for (int i = 0; i < numberOfRows; i++) {
+        if ((ByteUtil.compare(dimensionColumnDataChunk.getChunkData(i), 
filterValue) > 0)) {
+          bitSet.set(i);
+        }
+      }
+    } else if (greaterThanEqualExp) {
+      for (int i = 0; i < numberOfRows; i++) {
+        if ((ByteUtil.compare(dimensionColumnDataChunk.getChunkData(i), 
filterValue) >= 0)) {
+          bitSet.set(i);
+        }
+      }
     }
+    return bitSet;
+  }
 
+  /**
+   * This method will evaluate the result for filter column based on the upper 
range value
+   *
+   * @param dimensionColumnDataChunk
+   * @param filterValue
+   * @param numberOfRows
+   * @return
+   */
+  private BitSet evaluateLessThanFilterForUnsortedColumn(
+      DimensionColumnDataChunk dimensionColumnDataChunk, byte[] filterValue, 
int numberOfRows) {
+    BitSet bitSet = new BitSet(numberOfRows);
+    if (lessThanExp) {
+      for (int i = 0; i < numberOfRows; i++) {
+        if ((ByteUtil.compare(dimensionColumnDataChunk.getChunkData(i), 
filterValue) < 0)) {
+          bitSet.set(i);
+        }
+      }
+    } else if (lessThanEqualExp) {
+      for (int i = 0; i < numberOfRows; i++) {
+        if ((ByteUtil.compare(dimensionColumnDataChunk.getChunkData(i), 
filterValue) <= 0)) {
+          bitSet.set(i);
+        }
+      }
+    }
     return bitSet;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/357ab636/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index 470de89..a72d526 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -95,6 +95,11 @@ public class RowLevelFilterExecuterImpl implements 
FilterExecuter {
    */
   protected boolean[] isMeasurePresentInCurrentBlock;
 
+  /**
+   * whether the dimension column data is naturally sorted
+   */
+  protected boolean isNaturalSorted;
+
   public RowLevelFilterExecuterImpl(List<DimColumnResolvedFilterInfo> 
dimColEvaluatorInfoList,
       List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression 
exp,
       AbsoluteTableIdentifier tableIdentifier, SegmentProperties 
segmentProperties,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/357ab636/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
index 6f8651a..be82be7 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.BitSet;
 import java.util.List;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
@@ -43,7 +44,6 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends 
RowLevelFilterExecute
    * flag to check whether default values is present in the filter value list
    */
   private boolean isDefaultValuePresentInFilter;
-
   public RowLevelRangeGrtThanFiterExecuterImpl(
       List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
       List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression 
exp,
@@ -52,6 +52,8 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends 
RowLevelFilterExecute
     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, 
tableIdentifier, segmentProperties,
         null);
     this.filterRangeValues = filterRangeValues;
+    isNaturalSorted = 
dimColEvaluatorInfoList.get(0).getDimension().isUseInvertedIndex()
+        && dimColEvaluatorInfoList.get(0).getDimension().isSortColumn();
     ifDefaultValueMatchesFilter();
   }
 
@@ -150,10 +152,17 @@ public class RowLevelRangeGrtThanFiterExecuterImpl 
extends RowLevelFilterExecute
 
   private BitSet getFilteredIndexes(DimensionColumnDataChunk 
dimensionColumnDataChunk,
       int numerOfRows) {
+    BitSet bitSet = null;
     if (dimensionColumnDataChunk.isExplicitSorted()) {
-      return setFilterdIndexToBitSetWithColumnIndex(dimensionColumnDataChunk, 
numerOfRows);
+      bitSet = 
setFilterdIndexToBitSetWithColumnIndex(dimensionColumnDataChunk, numerOfRows);
+    } else {
+      bitSet = setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
+    }
+    if (dimensionColumnDataChunk.isNoDicitionaryColumn()) {
+      FilterUtil.removeNullValues(dimensionColumnDataChunk, bitSet,
+          CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
     }
-    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
+    return bitSet;
   }
 
   /**
@@ -228,39 +237,50 @@ public class RowLevelRangeGrtThanFiterExecuterImpl 
extends RowLevelFilterExecute
   private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk 
dimensionColumnDataChunk,
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
-    int start = 0;
-    int last = 0;
-    int startIndex = 0;
     byte[][] filterValues = this.filterRangeValues;
-    for (int k = 0; k < filterValues.length; k++) {
-      start = CarbonUtil
-          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex, numerOfRows - 1,
-              filterValues[k], true);
-      if (start >= 0) {
+    // binary search can only be applied if column is sorted
+    if (isNaturalSorted) {
+      int start = 0;
+      int last = 0;
+      int startIndex = 0;
+      for (int k = 0; k < filterValues.length; k++) {
         start = CarbonUtil
-            .nextGreaterValueToTarget(start, dimensionColumnDataChunk, 
filterValues[k],
-                numerOfRows);
-      }
-      if (start < 0) {
-        start = -(start + 1);
-        if (start == numerOfRows) {
-          start = start - 1;
+            .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex,
+                numerOfRows - 1, filterValues[k], true);
+        if (start >= 0) {
+          start = CarbonUtil
+              .nextGreaterValueToTarget(start, dimensionColumnDataChunk, 
filterValues[k],
+                  numerOfRows);
         }
-        // Method will compare the tentative index value after binary search, 
this tentative
-        // index needs to be compared by the filter member if its > filter 
then from that
-        // index the bitset will be considered for filtering process.
-        if (ByteUtil.compare(filterValues[k], 
dimensionColumnDataChunk.getChunkData(start)) > 0) {
-          start = start + 1;
+        if (start < 0) {
+          start = -(start + 1);
+          if (start == numerOfRows) {
+            start = start - 1;
+          }
+          // Method will compare the tentative index value after binary 
search, this tentative
+          // index needs to be compared by the filter member if its > filter 
then from that
+          // index the bitset will be considered for filtering process.
+          if (ByteUtil.compare(filterValues[k], 
dimensionColumnDataChunk.getChunkData(start)) > 0) {
+            start = start + 1;
+          }
+        }
+        last = start;
+        for (int j = start; j < numerOfRows; j++) {
+          bitSet.set(j);
+          last++;
+        }
+        startIndex = last;
+        if (startIndex >= numerOfRows) {
+          break;
         }
       }
-      last = start;
-      for (int j = start; j < numerOfRows; j++) {
-        bitSet.set(j);
-        last++;
-      }
-      startIndex = last;
-      if (startIndex >= numerOfRows) {
-        break;
+    } else {
+      for (int k = 0; k < filterValues.length; k++) {
+        for (int i = 0; i < numerOfRows; i++) {
+          if (ByteUtil.compare(dimensionColumnDataChunk.getChunkData(i), 
filterValues[k]) > 0) {
+            bitSet.set(i);
+          }
+        }
       }
     }
     return bitSet;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/357ab636/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index fbc9b30..53da6c5 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.BitSet;
 import java.util.List;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
@@ -53,6 +54,8 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl 
extends RowLevelFilte
     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, 
tableIdentifier, segmentProperties,
         null);
     this.filterRangeValues = filterRangeValues;
+    isNaturalSorted = 
dimColEvaluatorInfoList.get(0).getDimension().isUseInvertedIndex()
+        && dimColEvaluatorInfoList.get(0).getDimension().isSortColumn();
     ifDefaultValueMatchesFilter();
   }
 
@@ -151,10 +154,17 @@ public class 
RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
 
   private BitSet getFilteredIndexes(DimensionColumnDataChunk 
dimensionColumnDataChunk,
       int numerOfRows) {
+    BitSet bitSet = null;
     if (dimensionColumnDataChunk.isExplicitSorted()) {
-      return setFilterdIndexToBitSetWithColumnIndex(dimensionColumnDataChunk, 
numerOfRows);
+      bitSet = 
setFilterdIndexToBitSetWithColumnIndex(dimensionColumnDataChunk, numerOfRows);
+    } else {
+      bitSet = setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
+    }
+    if (dimensionColumnDataChunk.isNoDicitionaryColumn()) {
+      FilterUtil.removeNullValues(dimensionColumnDataChunk, bitSet,
+          CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
     }
-    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
+    return bitSet;
   }
 
   /**
@@ -218,35 +228,46 @@ public class 
RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
   private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk 
dimensionColumnDataChunk,
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
-    int start = 0;
-    int last = 0;
-    int startIndex = 0;
     byte[][] filterValues = this.filterRangeValues;
-    for (int k = 0; k < filterValues.length; k++) {
-      start = CarbonUtil
-          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex, numerOfRows - 1,
-              filterValues[k], false);
-      if (start < 0) {
-        start = -(start + 1);
-        if (start == numerOfRows) {
-          start = start - 1;
-        }
-        // Method will compare the tentative index value after binary search, 
this tentative
-        // index needs to be compared by the filter member if its >= filter 
then from that
-        // index the bitset will be considered for filtering process.
-        if (ByteUtil.compare(filterValues[k], 
dimensionColumnDataChunk.getChunkData(start)) > 0) {
-          start = start + 1;
+    // binary search can only be applied if column is sorted
+    if (isNaturalSorted) {
+      int start = 0;
+      int last = 0;
+      int startIndex = 0;
+      for (int k = 0; k < filterValues.length; k++) {
+        start = CarbonUtil
+            .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex,
+                numerOfRows - 1, filterValues[k], false);
+        if (start < 0) {
+          start = -(start + 1);
+          if (start == numerOfRows) {
+            start = start - 1;
+          }
+          // Method will compare the tentative index value after binary 
search, this tentative
+          // index needs to be compared by the filter member if its >= filter 
then from that
+          // index the bitset will be considered for filtering process.
+          if (ByteUtil.compare(filterValues[k], 
dimensionColumnDataChunk.getChunkData(start)) > 0) {
+            start = start + 1;
+          }
         }
-      }
 
-      last = start;
-      for (int j = start; j < numerOfRows; j++) {
-        bitSet.set(j);
-        last++;
+        last = start;
+        for (int j = start; j < numerOfRows; j++) {
+          bitSet.set(j);
+          last++;
+        }
+        startIndex = last;
+        if (startIndex >= numerOfRows) {
+          break;
+        }
       }
-      startIndex = last;
-      if (startIndex >= numerOfRows) {
-        break;
+    } else {
+      for (int k = 0; k < filterValues.length; k++) {
+        for (int i = 0; i < numerOfRows; i++) {
+          if (ByteUtil.compare(dimensionColumnDataChunk.getChunkData(i), 
filterValues[k]) >= 0) {
+            bitSet.set(i);
+          }
+        }
       }
     }
     return bitSet;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/357ab636/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index 99f5700..d694960 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.BitSet;
 import java.util.List;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
@@ -45,7 +46,6 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl 
extends RowLevelFilter
    * flag to check whether default values is present in the filter value list
    */
   private boolean isDefaultValuePresentInFilter;
-
   public RowLevelRangeLessThanEqualFilterExecuterImpl(
       List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
       List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression 
exp,
@@ -55,6 +55,8 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl 
extends RowLevelFilter
         null);
     this.filterRangeValues = filterRangeValues;
     ifDefaultValueMatchesFilter();
+    isNaturalSorted = 
dimColEvaluatorInfoList.get(0).getDimension().isUseInvertedIndex()
+        && dimColEvaluatorInfoList.get(0).getDimension().isSortColumn();
   }
 
   /**
@@ -153,13 +155,20 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl 
extends RowLevelFilter
       CarbonDimension currentBlockDimension =
           segmentProperties.getDimensions().get(dimensionBlocksIndex[0]);
       defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension,
-          this.segmentProperties.getDimensionKeyGenerator());
+          this.segmentProperties.getSortColumnsGenerator());
     }
+    BitSet bitSet = null;
     if (dimensionColumnDataChunk.isExplicitSorted()) {
-      return setFilterdIndexToBitSetWithColumnIndex(dimensionColumnDataChunk, 
numerOfRows,
+      bitSet = 
setFilterdIndexToBitSetWithColumnIndex(dimensionColumnDataChunk, numerOfRows,
           defaultValue);
+    } else {
+      bitSet = setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows, 
defaultValue);
     }
-    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows, 
defaultValue);
+    if (dimensionColumnDataChunk.isNoDicitionaryColumn()) {
+      FilterUtil.removeNullValues(dimensionColumnDataChunk, bitSet,
+          CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+    }
+    return bitSet;
   }
 
   /**
@@ -242,51 +251,62 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl 
extends RowLevelFilter
   private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk 
dimensionColumnDataChunk,
       int numerOfRows, byte[] defaultValue) {
     BitSet bitSet = new BitSet(numerOfRows);
-    int start = 0;
-    int last = 0;
-    int startIndex = 0;
     byte[][] filterValues = this.filterRangeValues;
-    int skip = 0;
-    //find the number of default values to skip the null value in case of 
direct dictionary
-    if (null != defaultValue) {
-      start = CarbonUtil
-          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex, numerOfRows - 1,
-              defaultValue, true);
-      if (start < 0) {
-        skip = -(start + 1);
-        // end of block
-        if (skip == numerOfRows) {
-          return bitSet;
+    // binary search can only be applied if column is sorted
+    if (isNaturalSorted) {
+      int start = 0;
+      int last = 0;
+      int startIndex = 0;
+      int skip = 0;
+      //find the number of default values to skip the null value in case of 
direct dictionary
+      if (null != defaultValue) {
+        start = CarbonUtil
+            .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex,
+                numerOfRows - 1, defaultValue, true);
+        if (start < 0) {
+          skip = -(start + 1);
+          // end of block
+          if (skip == numerOfRows) {
+            return bitSet;
+          }
+        } else {
+          skip = start;
         }
-      } else {
-        skip = start;
+        startIndex = skip;
       }
-      startIndex = skip;
-    }
-    for (int k = 0; k < filterValues.length; k++) {
-      start = CarbonUtil
-          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex, numerOfRows - 1,
-              filterValues[k], true);
-      if (start < 0) {
-        start = -(start + 1);
-        if (start >= numerOfRows) {
-          start = start - 1;
+      for (int k = 0; k < filterValues.length; k++) {
+        start = CarbonUtil
+            .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex,
+                numerOfRows - 1, filterValues[k], true);
+        if (start < 0) {
+          start = -(start + 1);
+          if (start >= numerOfRows) {
+            start = start - 1;
+          }
+          // When negative value of start is returned from 
getFirstIndexUsingBinarySearch the Start
+          // will be pointing to the next consecutive position. So compare it 
again and point to the
+          // previous value returned from getFirstIndexUsingBinarySearch.
+          if (ByteUtil.compare(filterValues[k], 
dimensionColumnDataChunk.getChunkData(start)) < 0) {
+            start = start - 1;
+          }
         }
-        // When negative value of start is returned from 
getFirstIndexUsingBinarySearch the Start
-        // will be pointing to the next consecutive position. So compare it 
again and point to the
-        // previous value returned from getFirstIndexUsingBinarySearch.
-        if (ByteUtil.compare(filterValues[k], 
dimensionColumnDataChunk.getChunkData(start)) < 0) {
-          start = start - 1;
+        last = start;
+        for (int j = start; j >= skip; j--) {
+          bitSet.set(j);
+          last--;
+        }
+        startIndex = last;
+        if (startIndex <= 0) {
+          break;
         }
       }
-      last = start;
-      for (int j = start; j >= skip; j--) {
-        bitSet.set(j);
-        last--;
-      }
-      startIndex = last;
-      if (startIndex <= 0) {
-        break;
+    } else {
+      for (int k = 0; k < filterValues.length; k++) {
+        for (int i = 0; i < numerOfRows; i++) {
+          if (ByteUtil.compare(dimensionColumnDataChunk.getChunkData(i), 
filterValues[k]) <= 0) {
+            bitSet.set(i);
+          }
+        }
       }
     }
     return bitSet;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/357ab636/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index 5bdf315..b3dd921 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.BitSet;
 import java.util.List;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
@@ -55,6 +56,8 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends 
RowLevelFilterExecut
         null);
     this.filterRangeValues = filterRangeValues;
     ifDefaultValueMatchesFilter();
+    isNaturalSorted = 
dimColEvaluatorInfoList.get(0).getDimension().isUseInvertedIndex()
+        && dimColEvaluatorInfoList.get(0).getDimension().isSortColumn();
   }
 
   /**
@@ -153,13 +156,20 @@ public class RowLevelRangeLessThanFiterExecuterImpl 
extends RowLevelFilterExecut
       CarbonDimension currentBlockDimension =
           segmentProperties.getDimensions().get(dimensionBlocksIndex[0]);
       defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension,
-          this.segmentProperties.getDimensionKeyGenerator());
+          this.segmentProperties.getSortColumnsGenerator());
     }
+    BitSet bitSet = null;
     if (dimensionColumnDataChunk.isExplicitSorted()) {
-      return setFilterdIndexToBitSetWithColumnIndex(dimensionColumnDataChunk, 
numerOfRows,
+      bitSet = 
setFilterdIndexToBitSetWithColumnIndex(dimensionColumnDataChunk, numerOfRows,
           defaultValue);
+    } else {
+      bitSet = setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows, 
defaultValue);
+    }
+    if (dimensionColumnDataChunk.isNoDicitionaryColumn()) {
+      FilterUtil.removeNullValues(dimensionColumnDataChunk, bitSet,
+          CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
     }
-    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows, 
defaultValue);
+    return bitSet;
   }
 
   /**
@@ -251,56 +261,67 @@ public class RowLevelRangeLessThanFiterExecuterImpl 
extends RowLevelFilterExecut
   private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk 
dimensionColumnDataChunk,
       int numerOfRows, byte[] defaultValue) {
     BitSet bitSet = new BitSet(numerOfRows);
-    int start = 0;
-    int last = 0;
-    int startIndex = 0;
-    int skip = 0;
     byte[][] filterValues = this.filterRangeValues;
-    //find the number of default values to skip the null value in case of 
direct dictionary
-    if (null != defaultValue) {
-      start = CarbonUtil
-          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex, numerOfRows - 1,
-              defaultValue, false);
-      if (start < 0) {
-        skip = -(start + 1);
-        // end of block
-        if (skip == numerOfRows) {
-          return bitSet;
+    // binary search can only be applied if column is sorted
+    if (isNaturalSorted) {
+      int start = 0;
+      int last = 0;
+      int startIndex = 0;
+      int skip = 0;
+      //find the number of default values to skip the null value in case of 
direct dictionary
+      if (null != defaultValue) {
+        start = CarbonUtil
+            .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex,
+                numerOfRows - 1, defaultValue, false);
+        if (start < 0) {
+          skip = -(start + 1);
+          // end of block
+          if (skip == numerOfRows) {
+            return bitSet;
+          }
+        } else {
+          skip = start;
         }
-      } else {
-        skip = start;
+        startIndex = skip;
       }
-      startIndex = skip;
-    }
-    for (int k = 0; k < filterValues.length; k++) {
-      start = CarbonUtil
-          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex, numerOfRows - 1,
-              filterValues[k], false);
-      if (start >= 0) {
-        start =
-            CarbonUtil.nextLesserValueToTarget(start, 
dimensionColumnDataChunk, filterValues[k]);
-      }
-      if (start < 0) {
-        start = -(start + 1);
+      for (int k = 0; k < filterValues.length; k++) {
+        start = CarbonUtil
+            .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, 
startIndex,
+                numerOfRows - 1, filterValues[k], false);
+        if (start >= 0) {
+          start =
+              CarbonUtil.nextLesserValueToTarget(start, 
dimensionColumnDataChunk, filterValues[k]);
+        }
+        if (start < 0) {
+          start = -(start + 1);
 
-        if (start >= numerOfRows) {
-          start = numerOfRows - 1;
+          if (start >= numerOfRows) {
+            start = numerOfRows - 1;
+          }
+          // When negative value of start is returned from 
getFirstIndexUsingBinarySearch the Start
+          // will be pointing to the next consecutive position. So compare it 
again and point to the
+          // previous value returned from getFirstIndexUsingBinarySearch.
+          if (ByteUtil.compare(filterValues[k], 
dimensionColumnDataChunk.getChunkData(start)) < 0) {
+            start = start - 1;
+          }
         }
-        // When negative value of start is returned from 
getFirstIndexUsingBinarySearch the Start
-        // will be pointing to the next consecutive position. So compare it 
again and point to the
-        // previous value returned from getFirstIndexUsingBinarySearch.
-        if (ByteUtil.compare(filterValues[k], 
dimensionColumnDataChunk.getChunkData(start)) < 0) {
-          start = start - 1;
+        last = start;
+        for (int j = start; j >= skip; j--) {
+          bitSet.set(j);
+          last--;
+        }
+        startIndex = last;
+        if (startIndex <= 0) {
+          break;
         }
       }
-      last = start;
-      for (int j = start; j >= skip; j--) {
-        bitSet.set(j);
-        last--;
-      }
-      startIndex = last;
-      if (startIndex <= 0) {
-        break;
+    } else {
+      for (int k = 0; k < filterValues.length; k++) {
+        for (int i = 0; i < numerOfRows; i++) {
+          if (ByteUtil.compare(dimensionColumnDataChunk.getChunkData(i), 
filterValues[k]) < 0) {
+            bitSet.set(i);
+          }
+        }
       }
     }
     return bitSet;

Reply via email to