HBASE-15333 [hbase-spark] Enhance the dataframe filters to handle naively 
encoded short, integer, long, float and double (Zhan Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b353e388
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b353e388
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b353e388

Branch: refs/heads/HBASE-14850
Commit: b353e388bb6bf315818fcde81f11131d6d539c70
Parents: acca95f
Author: Jonathan M Hsieh <jmhs...@apache.org>
Authored: Fri May 13 06:58:17 2016 -0700
Committer: Jonathan M Hsieh <jmhs...@apache.org>
Committed: Fri May 13 07:00:41 2016 -0700

----------------------------------------------------------------------
 .../hbase/spark/SparkSQLPushDownFilter.java     |  19 +-
 .../spark/protobuf/generated/FilterProtos.java  | 176 ++++++-
 hbase-spark/src/main/protobuf/Filter.proto      |   1 +
 .../hadoop/hbase/spark/DefaultSource.scala      | 144 +++--
 .../hbase/spark/DynamicLogicExpression.scala    | 189 ++++---
 .../spark/datasources/HBaseSparkConf.scala      |   2 +
 .../spark/datasources/JavaBytesEncoder.scala    | 105 ++++
 .../hbase/spark/datasources/NaiveEncoder.scala  | 259 +++++++++
 .../hbase/spark/datasources/package.scala       |   2 +
 .../datasources/hbase/HBaseTableCatalog.scala   |   2 +-
 .../hadoop/hbase/spark/DefaultSourceSuite.scala |  15 +-
 .../spark/DynamicLogicExpressionSuite.scala     | 213 ++++++--
 .../hbase/spark/PartitionFilterSuite.scala      | 523 +++++++++++++++++++
 13 files changed, 1449 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b353e388/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
 
b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
index 057853f..071c1ca 100644
--- 
a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
+++ 
b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
@@ -24,6 +24,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.spark.datasources.BytesEncoder;
+import org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder;
 import org.apache.hadoop.hbase.spark.protobuf.generated.FilterProtos;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -56,21 +58,25 @@ public class SparkSQLPushDownFilter extends FilterBase{
   static final byte[] rowKeyFamily = new byte[0];
   static final byte[] rowKeyQualifier = Bytes.toBytes("key");
 
+  String encoderClassName;
+
   public SparkSQLPushDownFilter(DynamicLogicExpression dynamicLogicExpression,
                                 byte[][] valueFromQueryArray,
                                 HashMap<ByteArrayComparable,
                                         HashMap<ByteArrayComparable, String>>
-                                        currentCellToColumnIndexMap) {
+                                        currentCellToColumnIndexMap, String 
encoderClassName) {
     this.dynamicLogicExpression = dynamicLogicExpression;
     this.valueFromQueryArray = valueFromQueryArray;
     this.currentCellToColumnIndexMap = currentCellToColumnIndexMap;
+    this.encoderClassName = encoderClassName;
   }
 
   public SparkSQLPushDownFilter(DynamicLogicExpression dynamicLogicExpression,
                                 byte[][] valueFromQueryArray,
-                                MutableList<Field> fields) {
+                                MutableList<Field> fields, String 
encoderClassName) {
     this.dynamicLogicExpression = dynamicLogicExpression;
     this.valueFromQueryArray = valueFromQueryArray;
+    this.encoderClassName = encoderClassName;
 
     //generate family qualifier to index mapping
     this.currentCellToColumnIndexMap =
@@ -184,9 +190,12 @@ public class SparkSQLPushDownFilter extends FilterBase{
       throw new DeserializationException(e);
     }
 
+    String encoder = proto.getEncoderClassName();
+    BytesEncoder enc = JavaBytesEncoder.create(encoder);
+
     //Load DynamicLogicExpression
     DynamicLogicExpression dynamicLogicExpression =
-            
DynamicLogicExpressionBuilder.build(proto.getDynamicLogicExpression());
+            
DynamicLogicExpressionBuilder.build(proto.getDynamicLogicExpression(), enc);
 
     //Load valuesFromQuery
     final List<ByteString> valueFromQueryArrayList = 
proto.getValueFromQueryArrayList();
@@ -225,7 +234,7 @@ public class SparkSQLPushDownFilter extends FilterBase{
     }
 
     return new SparkSQLPushDownFilter(dynamicLogicExpression,
-            valueFromQueryArray, currentCellToColumnIndexMap);
+            valueFromQueryArray, currentCellToColumnIndexMap, encoder);
   }
 
   /**
@@ -256,6 +265,8 @@ public class SparkSQLPushDownFilter extends FilterBase{
         builder.addCellToColumnMapping(columnMappingBuilder.build());
       }
     }
+    builder.setEncoderClassName(encoderClassName);
+
 
     return builder.build().toByteArray();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b353e388/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/protobuf/generated/FilterProtos.java
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/protobuf/generated/FilterProtos.java
 
b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/protobuf/generated/FilterProtos.java
index 1968d32..cbef134 100644
--- 
a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/protobuf/generated/FilterProtos.java
+++ 
b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/protobuf/generated/FilterProtos.java
@@ -783,6 +783,21 @@ public final class FilterProtos {
      */
     
org.apache.hadoop.hbase.spark.protobuf.generated.FilterProtos.SQLPredicatePushDownCellToColumnMappingOrBuilder
 getCellToColumnMappingOrBuilder(
         int index);
+
+    // optional string encoderClassName = 4;
+    /**
+     * <code>optional string encoderClassName = 4;</code>
+     */
+    boolean hasEncoderClassName();
+    /**
+     * <code>optional string encoderClassName = 4;</code>
+     */
+    java.lang.String getEncoderClassName();
+    /**
+     * <code>optional string encoderClassName = 4;</code>
+     */
+    com.google.protobuf.ByteString
+        getEncoderClassNameBytes();
   }
   /**
    * Protobuf type {@code hbase.pb.SQLPredicatePushDownFilter}
@@ -856,6 +871,11 @@ public final class FilterProtos {
               
cellToColumnMapping_.add(input.readMessage(org.apache.hadoop.hbase.spark.protobuf.generated.FilterProtos.SQLPredicatePushDownCellToColumnMapping.PARSER,
 extensionRegistry));
               break;
             }
+            case 34: {
+              bitField0_ |= 0x00000002;
+              encoderClassName_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -1004,10 +1024,54 @@ public final class FilterProtos {
       return cellToColumnMapping_.get(index);
     }
 
+    // optional string encoderClassName = 4;
+    public static final int ENCODERCLASSNAME_FIELD_NUMBER = 4;
+    private java.lang.Object encoderClassName_;
+    /**
+     * <code>optional string encoderClassName = 4;</code>
+     */
+    public boolean hasEncoderClassName() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>optional string encoderClassName = 4;</code>
+     */
+    public java.lang.String getEncoderClassName() {
+      java.lang.Object ref = encoderClassName_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          encoderClassName_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string encoderClassName = 4;</code>
+     */
+    public com.google.protobuf.ByteString
+        getEncoderClassNameBytes() {
+      java.lang.Object ref = encoderClassName_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        encoderClassName_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
       dynamicLogicExpression_ = "";
       valueFromQueryArray_ = java.util.Collections.emptyList();
       cellToColumnMapping_ = java.util.Collections.emptyList();
+      encoderClassName_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1040,6 +1104,9 @@ public final class FilterProtos {
       for (int i = 0; i < cellToColumnMapping_.size(); i++) {
         output.writeMessage(3, cellToColumnMapping_.get(i));
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(4, getEncoderClassNameBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1066,6 +1133,10 @@ public final class FilterProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(3, cellToColumnMapping_.get(i));
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(4, getEncoderClassNameBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1098,6 +1169,11 @@ public final class FilterProtos {
           .equals(other.getValueFromQueryArrayList());
       result = result && getCellToColumnMappingList()
           .equals(other.getCellToColumnMappingList());
+      result = result && (hasEncoderClassName() == 
other.hasEncoderClassName());
+      if (hasEncoderClassName()) {
+        result = result && getEncoderClassName()
+            .equals(other.getEncoderClassName());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -1123,6 +1199,10 @@ public final class FilterProtos {
         hash = (37 * hash) + CELL_TO_COLUMN_MAPPING_FIELD_NUMBER;
         hash = (53 * hash) + getCellToColumnMappingList().hashCode();
       }
+      if (hasEncoderClassName()) {
+        hash = (37 * hash) + ENCODERCLASSNAME_FIELD_NUMBER;
+        hash = (53 * hash) + getEncoderClassName().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1243,6 +1323,8 @@ public final class FilterProtos {
         } else {
           cellToColumnMappingBuilder_.clear();
         }
+        encoderClassName_ = "";
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
 
@@ -1289,6 +1371,10 @@ public final class FilterProtos {
         } else {
           result.cellToColumnMapping_ = cellToColumnMappingBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.encoderClassName_ = encoderClassName_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1346,6 +1432,11 @@ public final class FilterProtos {
             }
           }
         }
+        if (other.hasEncoderClassName()) {
+          bitField0_ |= 0x00000008;
+          encoderClassName_ = other.encoderClassName_;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1769,6 +1860,80 @@ public final class FilterProtos {
         return cellToColumnMappingBuilder_;
       }
 
+      // optional string encoderClassName = 4;
+      private java.lang.Object encoderClassName_ = "";
+      /**
+       * <code>optional string encoderClassName = 4;</code>
+       */
+      public boolean hasEncoderClassName() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional string encoderClassName = 4;</code>
+       */
+      public java.lang.String getEncoderClassName() {
+        java.lang.Object ref = encoderClassName_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          encoderClassName_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string encoderClassName = 4;</code>
+       */
+      public com.google.protobuf.ByteString
+          getEncoderClassNameBytes() {
+        java.lang.Object ref = encoderClassName_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          encoderClassName_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string encoderClassName = 4;</code>
+       */
+      public Builder setEncoderClassName(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000008;
+        encoderClassName_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string encoderClassName = 4;</code>
+       */
+      public Builder clearEncoderClassName() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        encoderClassName_ = getDefaultInstance().getEncoderClassName();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string encoderClassName = 4;</code>
+       */
+      public Builder setEncoderClassNameBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000008;
+        encoderClassName_ = value;
+        onChanged();
+        return this;
+      }
+
       // 
@@protoc_insertion_point(builder_scope:hbase.pb.SQLPredicatePushDownFilter)
     }
 
@@ -1802,13 +1967,14 @@ public final class FilterProtos {
       "\n\014Filter.proto\022\010hbase.pb\"h\n\'SQLPredicate" +
       "PushDownCellToColumnMapping\022\025\n\rcolumn_fa" +
       "mily\030\001 \002(\014\022\021\n\tqualifier\030\002 
\002(\014\022\023\n\013column_" +
-      "name\030\003 \002(\t\"\261\001\n\032SQLPredicatePushDownFilte" +
+      "name\030\003 \002(\t\"\313\001\n\032SQLPredicatePushDownFilte" +
       "r\022 \n\030dynamic_logic_expression\030\001 \002(\t\022\036\n\026v" +
       "alue_from_query_array\030\002 \003(\014\022Q\n\026cell_to_c" +
       "olumn_mapping\030\003 \003(\01321.hbase.pb.SQLPredic" +
-      "atePushDownCellToColumnMappingBH\n0org.ap" +
-      "ache.hadoop.hbase.spark.protobuf.generat" +
-      "edB\014FilterProtosH\001\210\001\001\240\001\001"
+      "atePushDownCellToColumnMapping\022\030\n\020encode" +
+      "rClassName\030\004 \001(\tBH\n0org.apache.hadoop.hb" +
+      "ase.spark.protobuf.generatedB\014FilterProt",
+      "osH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1826,7 +1992,7 @@ public final class FilterProtos {
           
internal_static_hbase_pb_SQLPredicatePushDownFilter_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_SQLPredicatePushDownFilter_descriptor,
-              new java.lang.String[] { "DynamicLogicExpression", 
"ValueFromQueryArray", "CellToColumnMapping", });
+              new java.lang.String[] { "DynamicLogicExpression", 
"ValueFromQueryArray", "CellToColumnMapping", "EncoderClassName", });
           return null;
         }
       };

http://git-wip-us.apache.org/repos/asf/hbase/blob/b353e388/hbase-spark/src/main/protobuf/Filter.proto
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/protobuf/Filter.proto 
b/hbase-spark/src/main/protobuf/Filter.proto
index e076ce8..d17b48c 100644
--- a/hbase-spark/src/main/protobuf/Filter.proto
+++ b/hbase-spark/src/main/protobuf/Filter.proto
@@ -35,4 +35,5 @@ message SQLPredicatePushDownFilter {
   required string dynamic_logic_expression = 1;
   repeated bytes value_from_query_array = 2;
   repeated SQLPredicatePushDownCellToColumnMapping cell_to_column_mapping = 3;
+  optional string encoderClassName = 4;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/b353e388/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index 1697036..0c29f50 100644
--- 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -23,9 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
 import org.apache.hadoop.hbase.client._
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.mapred.TableOutputFormat
-import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf
-import org.apache.hadoop.hbase.spark.datasources.HBaseTableScanRDD
-import org.apache.hadoop.hbase.spark.datasources.SerializableConfiguration
+import org.apache.hadoop.hbase.spark.datasources._
 import org.apache.hadoop.hbase.types._
 import org.apache.hadoop.hbase.util.{Bytes, PositionedByteRange, 
SimplePositionedMutableByteRange}
 import org.apache.hadoop.hbase._
@@ -92,6 +90,9 @@ case class HBaseRelation (
   val minTimestamp = parameters.get(HBaseSparkConf.MIN_TIMESTAMP).map(_.toLong)
   val maxTimestamp = parameters.get(HBaseSparkConf.MAX_TIMESTAMP).map(_.toLong)
   val maxVersions = parameters.get(HBaseSparkConf.MAX_VERSIONS).map(_.toInt)
+  val encoderClsName = 
parameters.get(HBaseSparkConf.ENCODER).getOrElse(HBaseSparkConf.defaultEncoder)
+
+  @transient val encoder = JavaBytesEncoder.create(encoderClsName)
 
   val catalog = HBaseTableCatalog(parameters)
   def tableName = catalog.name
@@ -335,7 +336,7 @@ case class HBaseRelation (
 
     val pushDownFilterJava = if (usePushDownColumnFilter && 
pushDownDynamicLogicExpression != null) {
         Some(new SparkSQLPushDownFilter(pushDownDynamicLogicExpression,
-          valueArray, requiredQualifierDefinitionList))
+          valueArray, requiredQualifierDefinitionList, encoderClsName))
     } else {
       None
     }
@@ -402,11 +403,22 @@ case class HBaseRelation (
     (superRowKeyFilter, superDynamicLogicExpression, queryValueArray)
   }
 
+  /**
+    * For some codec, the order may be inconsistent between java primitive
+    * type and its byte array. We may have to  split the predicates on some
+    * of the java primitive type into multiple predicates. The encoder will 
take
+    * care of it and returning the concrete ranges.
+    *
+    * For example in naive codec,  some of the java primitive types have to be 
split into multiple
+    * predicates, and union these predicates together to make the predicates 
be performed correctly.
+    * For example, if we have "COLUMN < 2", we will transform it into
+    * "0 <= COLUMN < 2 OR Integer.MIN_VALUE <= COLUMN <= -1"
+    */
+
   def transverseFilterTree(parentRowKeyFilter:RowKeyFilter,
                                   valueArray:mutable.MutableList[Array[Byte]],
                                   filter:Filter): DynamicLogicExpression = {
     filter match {
-
       case EqualTo(attr, value) =>
         val field = catalog.getField(attr)
         if (field != null) {
@@ -420,18 +432,38 @@ case class HBaseRelation (
           valueArray += byteValue
         }
         new EqualLogicExpression(attr, valueArray.length - 1, false)
+
+      /**
+        * encoder may split the predicates into multiple byte array boundaries.
+        * Each boundaries is mapped into the RowKeyFilter and then is unioned 
by the reduce
+        * operation. If the data type is not supported, b will be None, and 
there is
+        * no operation happens on the parentRowKeyFilter.
+        *
+        * Note that because LessThan is not inclusive, thus the first bound 
should be exclusive,
+        * which is controlled by inc.
+        *
+        * The other predicates, i.e., 
GreaterThan/LessThanOrEqual/GreaterThanOrEqual follows
+        * the similar logic.
+        */
       case LessThan(attr, value) =>
         val field = catalog.getField(attr)
         if (field != null) {
           if (field.isRowKey) {
-            parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
-              new ScanRange(DefaultSourceStaticUtils.getByteValue(field,
-                value.toString), false,
-                new Array[Byte](0), true)))
+            val b = encoder.ranges(value)
+            var inc = false
+            b.map(_.less.map { x =>
+              val r = new RowKeyFilter(null,
+                new ScanRange(x.upper, inc, x.low, true)
+              )
+              inc = true
+              r
+            }).map { x =>
+              x.reduce { (i, j) =>
+                i.mergeUnion(j)
+              }
+            }.map(parentRowKeyFilter.mergeIntersect(_))
           }
-          val byteValue =
-            DefaultSourceStaticUtils.getByteValue(catalog.getField(attr),
-              value.toString)
+          val byteValue = encoder.encode(field.dt, value)
           valueArray += byteValue
         }
         new LessThanLogicExpression(attr, valueArray.length - 1)
@@ -439,13 +471,20 @@ case class HBaseRelation (
         val field = catalog.getField(attr)
         if (field != null) {
           if (field.isRowKey) {
-            parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
-              new ScanRange(null, true, 
DefaultSourceStaticUtils.getByteValue(field,
-                value.toString), false)))
+            val b = encoder.ranges(value)
+            var inc = false
+            b.map(_.greater.map{x =>
+              val r = new RowKeyFilter(null,
+                new ScanRange(x.upper, true, x.low, inc))
+              inc = true
+              r
+            }).map { x =>
+              x.reduce { (i, j) =>
+                i.mergeUnion(j)
+              }
+            }.map(parentRowKeyFilter.mergeIntersect(_))
           }
-          val byteValue =
-            DefaultSourceStaticUtils.getByteValue(field,
-              value.toString)
+          val byteValue = encoder.encode(field.dt, value)
           valueArray += byteValue
         }
         new GreaterThanLogicExpression(attr, valueArray.length - 1)
@@ -453,14 +492,17 @@ case class HBaseRelation (
         val field = catalog.getField(attr)
         if (field != null) {
           if (field.isRowKey) {
-            parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
-              new ScanRange(DefaultSourceStaticUtils.getByteValue(field,
-                value.toString), true,
-                new Array[Byte](0), true)))
+            val b = encoder.ranges(value)
+            b.map(_.less.map(x =>
+              new RowKeyFilter(null,
+                new ScanRange(x.upper, true, x.low, true))))
+              .map { x =>
+                x.reduce{ (i, j) =>
+                  i.mergeUnion(j)
+                }
+              }.map(parentRowKeyFilter.mergeIntersect(_))
           }
-          val byteValue =
-            DefaultSourceStaticUtils.getByteValue(catalog.getField(attr),
-              value.toString)
+          val byteValue = encoder.encode(field.dt, value)
           valueArray += byteValue
         }
         new LessThanOrEqualLogicExpression(attr, valueArray.length - 1)
@@ -468,15 +510,18 @@ case class HBaseRelation (
         val field = catalog.getField(attr)
         if (field != null) {
           if (field.isRowKey) {
-            parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null,
-              new ScanRange(null, true, 
DefaultSourceStaticUtils.getByteValue(field,
-                value.toString), true)))
+            val b = encoder.ranges(value)
+            b.map(_.greater.map(x =>
+              new RowKeyFilter(null,
+                new ScanRange(x.upper, true, x.low, true))))
+              .map { x =>
+                x.reduce { (i, j) =>
+                  i.mergeUnion(j)
+                }
+              }.map(parentRowKeyFilter.mergeIntersect(_))
           }
-          val byteValue =
-            DefaultSourceStaticUtils.getByteValue(catalog.getField(attr),
-              value.toString)
+          val byteValue = encoder.encode(field.dt, value)
           valueArray += byteValue
-
         }
         new GreaterThanOrEqualLogicExpression(attr, valueArray.length - 1)
       case Or(left, right) =>
@@ -587,7 +632,7 @@ class ScanRange(var upperBound:Array[Byte], var 
isUpperBoundEqualTo:Boolean,
     var leftRange:ScanRange = null
     var rightRange:ScanRange = null
 
-    //First identify the Left range
+    // First identify the Left range
     // Also lower bound can't be null
     if (compareRange(lowerBound, other.lowerBound) < 0 ||
       compareRange(upperBound, other.upperBound) < 0) {
@@ -598,17 +643,34 @@ class ScanRange(var upperBound:Array[Byte], var 
isUpperBoundEqualTo:Boolean,
       rightRange = this
     }
 
-    //Then see if leftRange goes to null or if leftRange.upperBound
-    // upper is greater or equals to rightRange.lowerBound
-    if (leftRange.upperBound == null ||
-      Bytes.compareTo(leftRange.upperBound, rightRange.lowerBound) >= 0) {
-      new ScanRange(leftRange.upperBound, leftRange.isUpperBoundEqualTo, 
rightRange.lowerBound, rightRange.isLowerBoundEqualTo)
+    if (hasOverlap(leftRange, rightRange)) {
+      // Find the upper bound and lower bound
+      if (compareRange(leftRange.upperBound, rightRange.upperBound) >= 0) {
+        new ScanRange(rightRange.upperBound, rightRange.isUpperBoundEqualTo,
+          rightRange.lowerBound, rightRange.isLowerBoundEqualTo)
+      } else {
+        new ScanRange(leftRange.upperBound, leftRange.isUpperBoundEqualTo,
+          rightRange.lowerBound, rightRange.isLowerBoundEqualTo)
+      }
     } else {
       null
     }
   }
 
   /**
+    * The leftRange.upperBound has to be larger than the rightRange's 
lowerBound.
+    * Otherwise, there is no overlap.
+    *
+    * @param left: The range with the smaller lowBound
+    * @param right: The range with the larger lowBound
+    * @return Whether two ranges have overlap.
+    */
+
+  def hasOverlap(left: ScanRange, right: ScanRange): Boolean = {
+    compareRange(left.upperBound, right.lowerBound) >= 0
+  }
+
+  /**
    * Special compare logic because we can have null values
    * for left or right bound
    *
@@ -1046,7 +1108,7 @@ class RowKeyFilter (currentPoint:Array[Byte] = null,
    *
    * @param other Filter to merge
    */
-  def mergeUnion(other:RowKeyFilter): Unit = {
+  def mergeUnion(other:RowKeyFilter): RowKeyFilter = {
     other.points.foreach( p => points += p)
 
     other.ranges.foreach( otherR => {
@@ -1058,6 +1120,7 @@ class RowKeyFilter (currentPoint:Array[Byte] = null,
         }}
       if (!doesOverLap) ranges.+=(otherR)
     })
+    this
   }
 
   /**
@@ -1066,7 +1129,7 @@ class RowKeyFilter (currentPoint:Array[Byte] = null,
    *
    * @param other Filter to merge
    */
-  def mergeIntersect(other:RowKeyFilter): Unit = {
+  def mergeIntersect(other:RowKeyFilter): RowKeyFilter = {
     val survivingPoints = new mutable.MutableList[Array[Byte]]()
     val didntSurviveFirstPassPoints = new mutable.MutableList[Array[Byte]]()
     if (points == null || points.length == 0) {
@@ -1112,6 +1175,7 @@ class RowKeyFilter (currentPoint:Array[Byte] = null,
     }
     points = survivingPoints
     ranges = survivingRanges
+    this
   }
 
   override def toString:String = {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b353e388/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
index fa61860..1a1d478 100644
--- 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
+++ 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala
@@ -19,8 +19,11 @@ package org.apache.hadoop.hbase.spark
 
 import java.util
 
+import org.apache.hadoop.hbase.spark.datasources.{BytesEncoder, 
JavaBytesEncoder}
+import 
org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder.JavaBytesEncoder
 import org.apache.hadoop.hbase.util.Bytes
-
+import org.apache.spark.sql.datasources.hbase.{Field, Utils}
+import org.apache.spark.sql.types._
 /**
  * Dynamic logic for SQL push down logic there is an instance for most
  * common operations and a pass through for other operations not covered here
@@ -38,7 +41,31 @@ trait DynamicLogicExpression {
     appendToExpression(strBuilder)
     strBuilder.toString()
   }
+  def filterOps: JavaBytesEncoder = JavaBytesEncoder.Unknown
+
   def appendToExpression(strBuilder:StringBuilder)
+
+  var encoder: BytesEncoder = _
+
+  def setEncoder(enc: BytesEncoder): DynamicLogicExpression = {
+    encoder = enc
+    this
+  }
+}
+
+trait CompareTrait {
+  self: DynamicLogicExpression =>
+  def columnName: String
+  def valueFromQueryIndex: Int
+  def execute(columnToCurrentRowValueMap:
+              util.HashMap[String, ByteArrayComparable],
+              valueFromQueryValueArray:Array[Array[Byte]]): Boolean = {
+    val currentRowValue = columnToCurrentRowValueMap.get(columnName)
+    val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
+    currentRowValue != null &&
+      encoder.filter(currentRowValue.bytes, currentRowValue.offset, 
currentRowValue.length,
+        valueFromQuery, 0, valueFromQuery.length, filterOps)
+  }
 }
 
 class AndLogicExpression (val leftExpression:DynamicLogicExpression,
@@ -113,59 +140,28 @@ class IsNullLogicExpression (val columnName:String,
   }
 }
 
-class GreaterThanLogicExpression (val columnName:String,
-                                  val valueFromQueryIndex:Int)
-  extends DynamicLogicExpression{
-  override def execute(columnToCurrentRowValueMap:
-                       util.HashMap[String, ByteArrayComparable],
-                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = 
{
-    val currentRowValue = columnToCurrentRowValueMap.get(columnName)
-    val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
-
-    currentRowValue != null &&
-      Bytes.compareTo(currentRowValue.bytes,
-        currentRowValue.offset, currentRowValue.length, valueFromQuery,
-        0, valueFromQuery.length) > 0
-  }
+class GreaterThanLogicExpression (override val columnName:String,
+                                  override val valueFromQueryIndex:Int)
+  extends DynamicLogicExpression with CompareTrait{
+  override val filterOps = JavaBytesEncoder.Greater
   override def appendToExpression(strBuilder: StringBuilder): Unit = {
     strBuilder.append(columnName + " > " + valueFromQueryIndex)
   }
 }
 
-class GreaterThanOrEqualLogicExpression (val columnName:String,
-                                         val valueFromQueryIndex:Int)
-  extends DynamicLogicExpression{
-  override def execute(columnToCurrentRowValueMap:
-                       util.HashMap[String, ByteArrayComparable],
-                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = 
{
-    val currentRowValue = columnToCurrentRowValueMap.get(columnName)
-    val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
-
-    currentRowValue != null &&
-      Bytes.compareTo(currentRowValue.bytes,
-        currentRowValue.offset, currentRowValue.length, valueFromQuery,
-        0, valueFromQuery.length) >= 0
-  }
+class GreaterThanOrEqualLogicExpression (override val columnName:String,
+                                         override val valueFromQueryIndex:Int)
+  extends DynamicLogicExpression with CompareTrait{
+  override val filterOps = JavaBytesEncoder.GreaterEqual
   override def appendToExpression(strBuilder: StringBuilder): Unit = {
     strBuilder.append(columnName + " >= " + valueFromQueryIndex)
   }
 }
 
-class LessThanLogicExpression (val columnName:String,
-                               val valueFromQueryIndex:Int)
-  extends DynamicLogicExpression{
-  override def execute(columnToCurrentRowValueMap:
-                       util.HashMap[String, ByteArrayComparable],
-                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = 
{
-    val currentRowValue = columnToCurrentRowValueMap.get(columnName)
-    val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
-
-    currentRowValue != null &&
-      Bytes.compareTo(currentRowValue.bytes,
-        currentRowValue.offset, currentRowValue.length, valueFromQuery,
-        0, valueFromQuery.length) < 0
-  }
-
+class LessThanLogicExpression (override val columnName:String,
+                               override val valueFromQueryIndex:Int)
+  extends DynamicLogicExpression with CompareTrait {
+  override val filterOps = JavaBytesEncoder.Less
   override def appendToExpression(strBuilder: StringBuilder): Unit = {
     strBuilder.append(columnName + " < " + valueFromQueryIndex)
   }
@@ -173,19 +169,8 @@ class LessThanLogicExpression (val columnName:String,
 
 class LessThanOrEqualLogicExpression (val columnName:String,
                                       val valueFromQueryIndex:Int)
-  extends DynamicLogicExpression{
-  override def execute(columnToCurrentRowValueMap:
-                       util.HashMap[String, ByteArrayComparable],
-                       valueFromQueryValueArray:Array[Array[Byte]]): Boolean = 
{
-    val currentRowValue = columnToCurrentRowValueMap.get(columnName)
-    val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex)
-
-    currentRowValue != null &&
-      Bytes.compareTo(currentRowValue.bytes,
-        currentRowValue.offset, currentRowValue.length, valueFromQuery,
-        0, valueFromQuery.length) <= 0
-  }
-
+  extends DynamicLogicExpression with CompareTrait{
+  override val filterOps = JavaBytesEncoder.LessEqual
   override def appendToExpression(strBuilder: StringBuilder): Unit = {
     strBuilder.append(columnName + " <= " + valueFromQueryIndex)
   }
@@ -197,58 +182,66 @@ class PassThroughLogicExpression() extends 
DynamicLogicExpression {
                        valueFromQueryValueArray: Array[Array[Byte]]): Boolean 
= true
 
   override def appendToExpression(strBuilder: StringBuilder): Unit = {
-    strBuilder.append("Pass")
+    // Fix the offset bug by add dummy to avoid crash the region server.
+    // because in the DynamicLogicExpressionBuilder.build function, the 
command is always retrieved from offset + 1 as below
+    // val command = expressionArray(offSet + 1)
+    // we have to padding it so that `Pass` is on the right offset.
+    strBuilder.append("dummy Pass -1")
   }
 }
 
 object DynamicLogicExpressionBuilder {
-  def build(expressionString:String): DynamicLogicExpression = {
+  def build(expressionString: String, encoder: BytesEncoder): 
DynamicLogicExpression = {
 
-    val expressionAndOffset = build(expressionString.split(' '), 0)
+    val expressionAndOffset = build(expressionString.split(' '), 0, encoder)
     expressionAndOffset._1
   }
 
   private def build(expressionArray:Array[String],
-                    offSet:Int): (DynamicLogicExpression, Int) = {
-    if (expressionArray(offSet).equals("(")) {
-      val left = build(expressionArray, offSet + 1)
-      val right = build(expressionArray, left._2 + 1)
-      if (expressionArray(left._2).equals("AND")) {
-        (new AndLogicExpression(left._1, right._1), right._2 + 1)
-      } else if (expressionArray(left._2).equals("OR")) {
-        (new OrLogicExpression(left._1, right._1), right._2 + 1)
-      } else {
-        throw new Throwable("Unknown gate:" + expressionArray(left._2))
-      }
-    } else {
-      val command = expressionArray(offSet + 1)
-      if (command.equals("<")) {
-        (new LessThanLogicExpression(expressionArray(offSet),
-          expressionArray(offSet + 2).toInt), offSet + 3)
-      } else if (command.equals("<=")) {
-        (new LessThanOrEqualLogicExpression(expressionArray(offSet),
-          expressionArray(offSet + 2).toInt), offSet + 3)
-      } else if (command.equals(">")) {
-        (new GreaterThanLogicExpression(expressionArray(offSet),
-          expressionArray(offSet + 2).toInt), offSet + 3)
-      } else if (command.equals(">=")) {
-        (new GreaterThanOrEqualLogicExpression(expressionArray(offSet),
-          expressionArray(offSet + 2).toInt), offSet + 3)
-      } else if (command.equals("==")) {
-        (new EqualLogicExpression(expressionArray(offSet),
-          expressionArray(offSet + 2).toInt, false), offSet + 3)
-      } else if (command.equals("!=")) {
-        (new EqualLogicExpression(expressionArray(offSet),
-          expressionArray(offSet + 2).toInt, true), offSet + 3)
-      } else if (command.equals("isNull")) {
-        (new IsNullLogicExpression(expressionArray(offSet), false), offSet + 2)
-      } else if (command.equals("isNotNull")) {
-        (new IsNullLogicExpression(expressionArray(offSet), true), offSet + 2)
-      } else if (command.equals("Pass")) {
-        (new PassThroughLogicExpression, offSet + 2)
+                    offSet:Int, encoder: BytesEncoder): 
(DynamicLogicExpression, Int) = {
+    val expr = {
+      if (expressionArray(offSet).equals("(")) {
+        val left = build(expressionArray, offSet + 1, encoder)
+        val right = build(expressionArray, left._2 + 1, encoder)
+        if (expressionArray(left._2).equals("AND")) {
+          (new AndLogicExpression(left._1, right._1), right._2 + 1)
+        } else if (expressionArray(left._2).equals("OR")) {
+          (new OrLogicExpression(left._1, right._1), right._2 + 1)
+        } else {
+          throw new Throwable("Unknown gate:" + expressionArray(left._2))
+        }
       } else {
-        throw new Throwable("Unknown logic command:" + command)
+        val command = expressionArray(offSet + 1)
+        if (command.equals("<")) {
+          (new LessThanLogicExpression(expressionArray(offSet),
+            expressionArray(offSet + 2).toInt), offSet + 3)
+        } else if (command.equals("<=")) {
+          (new LessThanOrEqualLogicExpression(expressionArray(offSet),
+            expressionArray(offSet + 2).toInt), offSet + 3)
+        } else if (command.equals(">")) {
+          (new GreaterThanLogicExpression(expressionArray(offSet),
+            expressionArray(offSet + 2).toInt), offSet + 3)
+        } else if (command.equals(">=")) {
+          (new GreaterThanOrEqualLogicExpression(expressionArray(offSet),
+            expressionArray(offSet + 2).toInt), offSet + 3)
+        } else if (command.equals("==")) {
+          (new EqualLogicExpression(expressionArray(offSet),
+            expressionArray(offSet + 2).toInt, false), offSet + 3)
+        } else if (command.equals("!=")) {
+          (new EqualLogicExpression(expressionArray(offSet),
+            expressionArray(offSet + 2).toInt, true), offSet + 3)
+        } else if (command.equals("isNull")) {
+          (new IsNullLogicExpression(expressionArray(offSet), false), offSet + 
2)
+        } else if (command.equals("isNotNull")) {
+          (new IsNullLogicExpression(expressionArray(offSet), true), offSet + 
2)
+        } else if (command.equals("Pass")) {
+          (new PassThroughLogicExpression, offSet + 3)
+        } else {
+          throw new Throwable("Unknown logic command:" + command)
+        }
       }
     }
+    expr._1.setEncoder(encoder)
+    expr
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/b353e388/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
index be2af30..d9ee494 100644
--- 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
+++ 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
@@ -41,4 +41,6 @@ object HBaseSparkConf{
   val MIN_TIMESTAMP = "hbase.spark.query.minTimestamp"
   val MAX_TIMESTAMP = "hbase.spark.query.maxTimestamp"
   val MAX_VERSIONS = "hbase.spark.query.maxVersions"
+  val ENCODER = "hbase.spark.query.encoder"
+  val defaultEncoder = classOf[NaiveEncoder].getCanonicalName
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b353e388/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala
 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala
new file mode 100644
index 0000000..851fb66
--- /dev/null
+++ 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/JavaBytesEncoder.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark.datasources
+
+import 
org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder.JavaBytesEncoder
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.Logging
+import org.apache.spark.sql.types._
+
+/**
+  * The ranges for the data type whose size is known. Whether the bound is 
inclusive
+  * or exclusive is undefind, and upper to the caller to decide.
+  *
+  * @param low: the lower bound of the range.
+  * @param upper: the upper bound of the range.
+  */
+case class BoundRange(low: Array[Byte],upper: Array[Byte])
+
+/**
+  * The class identifies the ranges for a java primitive type. The caller needs
+  * to decide the bound is either inclusive or exclusive on its own.
+  * information
+  *
+  * @param less: the set of ranges for LessThan/LessOrEqualThan
+  * @param greater: the set of ranges for GreaterThan/GreaterThanOrEqualTo
+  * @param value: the byte array of the original value
+  */
+case class BoundRanges(less: Array[BoundRange], greater: Array[BoundRange], 
value: Array[Byte])
+
+/**
+  * The trait to support plugin architecture for different encoder/decoder.
+  * encode is used for serializing the data type to byte array and the filter 
is
+  * used to filter out the unnecessary records.
+  */
+trait BytesEncoder {
+  def encode(dt: DataType, value: Any): Array[Byte]
+
+  /**
+    * The function performing real filtering operations. The format of 
filterBytes depends on the
+    * implementation of the BytesEncoder.
+    *
+    * @param input: the current input byte array that needs to be filtered out
+    * @param offset1: the starting offset of the input byte array.
+    * @param length1: the length of the input byte array.
+    * @param filterBytes: the byte array provided by query condition.
+    * @param offset2: the starting offset in the filterBytes.
+    * @param length2: the length of the bytes in the filterBytes
+    * @param ops: The operation of the filter operator.
+    * @return true: the record satisfies the predicates
+    *         false: the record does not satisfy the predicates.
+    */
+  def filter(input: Array[Byte], offset1: Int, length1: Int,
+             filterBytes: Array[Byte], offset2: Int, length2: Int,
+             ops: JavaBytesEncoder): Boolean
+
+  /**
+    * Currently, it is used for partition pruning.
+    * As for some codec, the order may be inconsistent between java primitive
+    * type and its byte array. We may have to  split the predicates on some
+    * of the java primitive type into multiple predicates.
+    *
+    * For example in naive codec,  some of the java primitive types have to be
+    * split into multiple predicates, and union these predicates together to
+    * make the predicates be performed correctly.
+    * For example, if we have "COLUMN < 2", we will transform it into
+    * "0 <= COLUMN < 2 OR Integer.MIN_VALUE <= COLUMN <= -1"
+    */
+  def ranges(in: Any): Option[BoundRanges]
+}
+
+object JavaBytesEncoder extends Enumeration with Logging{
+  type JavaBytesEncoder = Value
+  val Greater, GreaterEqual, Less, LessEqual, Equal, Unknown = Value
+
+  /**
+    * create the encoder/decoder
+    *
+    * @param clsName: the class name of the encoder/decoder class
+    * @return the instance of the encoder plugin.
+    */
+  def create(clsName: String): BytesEncoder = {
+    try {
+      Class.forName(clsName).newInstance.asInstanceOf[BytesEncoder]
+    } catch {
+      case _: Throwable =>
+        logWarning(s"$clsName cannot be initiated, falling back to naive 
encoder")
+        new NaiveEncoder()
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/b353e388/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala
 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala
new file mode 100644
index 0000000..3137717
--- /dev/null
+++ 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/NaiveEncoder.scala
@@ -0,0 +1,259 @@
+package org.apache.hadoop.hbase.spark.datasources
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import 
org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder.JavaBytesEncoder
+import org.apache.hadoop.hbase.spark.hbase._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.Logging
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+  * This is the naive non-order preserving encoder/decoder.
+  * Due to the inconsistency of the order between java primitive types
+  * and their bytearray. The data type has to be passed in so that the filter
+  * can work correctly, which is done by wrapping the type into the first byte
+  * of the serialized array.
+  */
+class NaiveEncoder extends BytesEncoder with Logging{
+  var code = 0
+  def nextCode: Byte = {
+    code += 1
+    (code - 1).asInstanceOf[Byte]
+  }
+  val BooleanEnc = nextCode
+  val ShortEnc = nextCode
+  val IntEnc = nextCode
+  val LongEnc = nextCode
+  val FloatEnc = nextCode
+  val DoubleEnc = nextCode
+  val StringEnc = nextCode
+  val BinaryEnc = nextCode
+  val TimestampEnc = nextCode
+  val UnknownEnc = nextCode
+
+
+  /**
+    * Evaluate the java primitive type and return the BoundRanges. For one 
value, it may have
+    * multiple output ranges because of the inconsistency of order between 
java primitive type
+    * and its byte array order.
+    *
+    * For short, integer, and long, the order of number is consistent with 
byte array order
+    * if two number has the same sign bit. But the negative number is larger 
than positive
+    * number in byte array.
+    *
+    * For double and float, the order of positive number is consistent with 
its byte array order.
+    * But the order of negative number is the reverse order of byte array. 
Please refer to IEEE-754
+    * and https://en.wikipedia.org/wiki/Single-precision_floating-point_format
+    */
+  def ranges(in: Any): Option[BoundRanges] = in match {
+    case a: Integer =>
+      val b =  Bytes.toBytes(a)
+      if (a >= 0) {
+        logDebug(s"range is 0 to $a and ${Integer.MIN_VALUE} to -1")
+        Some(BoundRanges(
+          Array(BoundRange(Bytes.toBytes(0: Int), b),
+            BoundRange(Bytes.toBytes(Integer.MIN_VALUE),  Bytes.toBytes(-1: 
Int))),
+          Array(BoundRange(b,  Bytes.toBytes(Integer.MAX_VALUE))), b))
+      } else {
+        Some(BoundRanges(
+          Array(BoundRange(Bytes.toBytes(Integer.MIN_VALUE), b)),
+          Array(BoundRange(b, Bytes.toBytes(-1: Integer)),
+            BoundRange(Bytes.toBytes(0: Int), 
Bytes.toBytes(Integer.MAX_VALUE))), b))
+      }
+    case a: Long =>
+      val b =  Bytes.toBytes(a)
+      if (a >= 0) {
+        Some(BoundRanges(
+          Array(BoundRange(Bytes.toBytes(0: Long), b),
+            BoundRange(Bytes.toBytes(Long.MinValue),  Bytes.toBytes(-1: 
Long))),
+          Array(BoundRange(b,  Bytes.toBytes(Long.MaxValue))), b))
+      } else {
+        Some(BoundRanges(
+          Array(BoundRange(Bytes.toBytes(Long.MinValue), b)),
+          Array(BoundRange(b, Bytes.toBytes(-1: Long)),
+            BoundRange(Bytes.toBytes(0: Long), Bytes.toBytes(Long.MaxValue))), 
b))
+      }
+    case a: Short =>
+      val b =  Bytes.toBytes(a)
+      if (a >= 0) {
+        Some(BoundRanges(
+          Array(BoundRange(Bytes.toBytes(0: Short), b),
+            BoundRange(Bytes.toBytes(Short.MinValue),  Bytes.toBytes(-1: 
Short))),
+          Array(BoundRange(b,  Bytes.toBytes(Short.MaxValue))), b))
+      } else {
+        Some(BoundRanges(
+          Array(BoundRange(Bytes.toBytes(Short.MinValue), b)),
+          Array(BoundRange(b, Bytes.toBytes(-1: Short)),
+            BoundRange(Bytes.toBytes(0: Short), 
Bytes.toBytes(Short.MaxValue))), b))
+      }
+    case a: Double =>
+      val b =  Bytes.toBytes(a)
+      if (a >= 0.0f) {
+        Some(BoundRanges(
+          Array(BoundRange(Bytes.toBytes(0.0d), b),
+            BoundRange(Bytes.toBytes(-0.0d),  Bytes.toBytes(Double.MinValue))),
+          Array(BoundRange(b,  Bytes.toBytes(Double.MaxValue))), b))
+      } else {
+        Some(BoundRanges(
+          Array(BoundRange(b, Bytes.toBytes(Double.MinValue))),
+          Array(BoundRange(Bytes.toBytes(-0.0d), b),
+            BoundRange(Bytes.toBytes(0.0d), Bytes.toBytes(Double.MaxValue))), 
b))
+      }
+    case a: Float =>
+      val b =  Bytes.toBytes(a)
+      if (a >= 0.0f) {
+        Some(BoundRanges(
+          Array(BoundRange(Bytes.toBytes(0.0f), b),
+            BoundRange(Bytes.toBytes(-0.0f),  Bytes.toBytes(Float.MinValue))),
+          Array(BoundRange(b,  Bytes.toBytes(Float.MaxValue))), b))
+      } else {
+        Some(BoundRanges(
+          Array(BoundRange(b, Bytes.toBytes(Float.MinValue))),
+          Array(BoundRange(Bytes.toBytes(-0.0f), b),
+            BoundRange(Bytes.toBytes(0.0f), Bytes.toBytes(Float.MaxValue))), 
b))
+      }
+    case a: Array[Byte] =>
+      Some(BoundRanges(
+        Array(BoundRange(bytesMin, a)),
+        Array(BoundRange(a, bytesMax)), a))
+    case a: Byte =>
+      val b =  Array(a)
+      Some(BoundRanges(
+        Array(BoundRange(bytesMin, b)),
+        Array(BoundRange(b, bytesMax)), b))
+    case a: String =>
+      val b =  Bytes.toBytes(a)
+      Some(BoundRanges(
+        Array(BoundRange(bytesMin, b)),
+        Array(BoundRange(b, bytesMax)), b))
+    case a: UTF8String =>
+      val b = a.getBytes
+      Some(BoundRanges(
+        Array(BoundRange(bytesMin, b)),
+        Array(BoundRange(b, bytesMax)), b))
+    case _ => None
+  }
+
+  def compare(c: Int, ops: JavaBytesEncoder): Boolean = {
+    ops match {
+      case JavaBytesEncoder.Greater =>  c > 0
+      case JavaBytesEncoder.GreaterEqual =>  c >= 0
+      case JavaBytesEncoder.Less =>  c < 0
+      case JavaBytesEncoder.LessEqual =>  c <= 0
+    }
+  }
+
+  /**
+    * encode the data type into byte array. Note that it is a naive 
implementation with the
+    * data type byte appending to the head of the serialized byte array.
+    *
+    * @param dt: The data type of the input
+    * @param value: the value of the input
+    * @return the byte array with the first byte indicating the data type.
+    */
+  override def encode(dt: DataType,
+                      value: Any): Array[Byte] = {
+    dt match {
+      case BooleanType =>
+        val result = new Array[Byte](Bytes.SIZEOF_BOOLEAN + 1)
+        result(0) = BooleanEnc
+        value.asInstanceOf[Boolean] match {
+          case true => result(1) = -1: Byte
+          case false => result(1) = 0: Byte
+        }
+        result
+      case ShortType =>
+        val result = new Array[Byte](Bytes.SIZEOF_SHORT + 1)
+        result(0) = ShortEnc
+        Bytes.putShort(result, 1, value.asInstanceOf[Short])
+        result
+      case IntegerType =>
+        val result = new Array[Byte](Bytes.SIZEOF_INT + 1)
+        result(0) = IntEnc
+        Bytes.putInt(result, 1, value.asInstanceOf[Int])
+        result
+      case LongType|TimestampType =>
+        val result = new Array[Byte](Bytes.SIZEOF_LONG + 1)
+        result(0) = LongEnc
+        Bytes.putLong(result, 1, value.asInstanceOf[Long])
+        result
+      case FloatType =>
+        val result = new Array[Byte](Bytes.SIZEOF_FLOAT + 1)
+        result(0) = FloatEnc
+        Bytes.putFloat(result, 1, value.asInstanceOf[Float])
+        result
+      case DoubleType =>
+        val result = new Array[Byte](Bytes.SIZEOF_DOUBLE + 1)
+        result(0) = DoubleEnc
+        Bytes.putDouble(result, 1, value.asInstanceOf[Double])
+        result
+      case BinaryType =>
+        val v = value.asInstanceOf[Array[Bytes]]
+        val result = new Array[Byte](v.length + 1)
+        result(0) = BinaryEnc
+        System.arraycopy(v, 0, result, 1, v.length)
+        result
+      case StringType =>
+        val bytes = Bytes.toBytes(value.asInstanceOf[String])
+        val result = new Array[Byte](bytes.length + 1)
+        result(0) = StringEnc
+        System.arraycopy(bytes, 0, result, 1, bytes.length)
+        result
+      case _ =>
+        val bytes = Bytes.toBytes(value.toString)
+        val result = new Array[Byte](bytes.length + 1)
+        result(0) = UnknownEnc
+        System.arraycopy(bytes, 0, result, 1, bytes.length)
+        result
+    }
+  }
+
+  override def filter(input: Array[Byte], offset1: Int, length1: Int,
+                      filterBytes: Array[Byte], offset2: Int, length2: Int,
+                      ops: JavaBytesEncoder): Boolean = {
+    filterBytes(offset2) match {
+      case ShortEnc =>
+        val in = Bytes.toShort(input, offset1)
+        val value = Bytes.toShort(filterBytes, offset2 + 1)
+        compare(in.compareTo(value), ops)
+      case IntEnc =>
+        val in = Bytes.toInt(input, offset1)
+        val value = Bytes.toInt(filterBytes, offset2 + 1)
+        compare(in.compareTo(value), ops)
+      case LongEnc | TimestampEnc =>
+        val in = Bytes.toInt(input, offset1)
+        val value = Bytes.toInt(filterBytes, offset2 + 1)
+        compare(in.compareTo(value), ops)
+      case FloatEnc =>
+        val in = Bytes.toFloat(input, offset1)
+        val value = Bytes.toFloat(filterBytes, offset2 + 1)
+        compare(in.compareTo(value), ops)
+      case DoubleEnc =>
+        val in = Bytes.toDouble(input, offset1)
+        val value = Bytes.toDouble(filterBytes, offset2 + 1)
+        compare(in.compareTo(value), ops)
+      case _ =>
+        // for String, Byte, Binary, Boolean and other types
+        // we can use the order of byte array directly.
+        compare(
+          Bytes.compareTo(input, offset1, length1, filterBytes, offset2 + 1, 
length2 - 1), ops)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b353e388/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala
 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala
index 4ff0413..ce7b55a 100644
--- 
a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala
+++ 
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala
@@ -23,6 +23,8 @@ import scala.math.Ordering
 
 package object hbase {
   type HBaseType = Array[Byte]
+  def bytesMin = new Array[Byte](0)
+  def bytesMax = null
   val ByteMax = -1.asInstanceOf[Byte]
   val ByteMin = 0.asInstanceOf[Byte]
   val ord: Ordering[HBaseType] = new Ordering[HBaseType] {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b353e388/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala
 
b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala
index 831c7de..c2d611f 100644
--- 
a/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala
+++ 
b/hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala
@@ -156,7 +156,7 @@ case class HBaseTableCatalog(
   def get(key: String) = params.get(key)
 
   // Setup the start and length for each dimension of row key at runtime.
-  def dynSetupRowKey(rowKey: HBaseType) {
+  def dynSetupRowKey(rowKey: Array[Byte]) {
     logDebug(s"length: ${rowKey.length}")
     if(row.varLength) {
       var start = 0

http://git-wip-us.apache.org/repos/asf/hbase/blob/b353e388/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
 
b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index 4312b38..0f8baed 100644
--- 
a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ 
b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -329,14 +329,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
       equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
 
     assert(executionRules.rowKeyFilter.points.size == 0)
-    assert(executionRules.rowKeyFilter.ranges.size == 1)
-
-    val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
-    assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
-    assert(scanRange1.upperBound == null)
-    assert(scanRange1.isLowerBoundEqualTo)
-    assert(scanRange1.isUpperBoundEqualTo)
-
+    assert(executionRules.rowKeyFilter.ranges.size == 2)
     assert(results.length == 5)
   }
 
@@ -358,18 +351,14 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
     assert(executionRules.rowKeyFilter.points.size == 0)
 
-    assert(executionRules.rowKeyFilter.ranges.size == 2)
+    assert(executionRules.rowKeyFilter.ranges.size == 3)
 
     val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
-    assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
     assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes(2)))
     assert(scanRange1.isLowerBoundEqualTo)
     assert(!scanRange1.isUpperBoundEqualTo)
 
     val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
-    assert(Bytes.equals(scanRange2.lowerBound, Bytes.toBytes(4)))
-    assert(scanRange2.upperBound == null)
-    assert(!scanRange2.isLowerBoundEqualTo)
     assert(scanRange2.isUpperBoundEqualTo)
 
     assert(results.length == 2)

http://git-wip-us.apache.org/repos/asf/hbase/blob/b353e388/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
----------------------------------------------------------------------
diff --git 
a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
 
b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
index 3140ebd..ff4201c 100644
--- 
a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
+++ 
b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
@@ -19,13 +19,17 @@ package org.apache.hadoop.hbase.spark
 
 import java.util
 
+import org.apache.hadoop.hbase.spark.datasources.{HBaseSparkConf, 
JavaBytesEncoder}
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.spark.Logging
+import org.apache.spark.sql.types._
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 
 class DynamicLogicExpressionSuite  extends FunSuite with
 BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
+  val encoder = JavaBytesEncoder.create(HBaseSparkConf.defaultEncoder)
+
   test("Basic And Test") {
     val leftLogic = new LessThanLogicExpression("Col1", 0)
     val rightLogic = new GreaterThanLogicExpression("Col1", 1)
@@ -35,33 +39,33 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
     columnToCurrentRowValueMap.put("Col1", new 
ByteArrayComparable(Bytes.toBytes(10)))
     val valueFromQueryValueArray = new Array[Array[Byte]](2)
-    valueFromQueryValueArray(0) = Bytes.toBytes(15)
-    valueFromQueryValueArray(1) = Bytes.toBytes(5)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 15)
+    valueFromQueryValueArray(1) = encoder.encode(IntegerType, 5)
     assert(andLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
-    valueFromQueryValueArray(0) = Bytes.toBytes(10)
-    valueFromQueryValueArray(1) = Bytes.toBytes(5)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
+    valueFromQueryValueArray(1) = encoder.encode(IntegerType, 5)
     assert(!andLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
-    valueFromQueryValueArray(0) = Bytes.toBytes(15)
-    valueFromQueryValueArray(1) = Bytes.toBytes(10)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 15)
+    valueFromQueryValueArray(1) = encoder.encode(IntegerType, 10)
     assert(!andLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
     val expressionString = andLogic.toExpressionString
 
     assert(expressionString.equals("( Col1 < 0 AND Col1 > 1 )"))
 
-    val builtExpression = DynamicLogicExpressionBuilder.build(expressionString)
-    valueFromQueryValueArray(0) = Bytes.toBytes(15)
-    valueFromQueryValueArray(1) = Bytes.toBytes(5)
+    val builtExpression = 
DynamicLogicExpressionBuilder.build(expressionString, encoder)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 15)
+    valueFromQueryValueArray(1) = encoder.encode(IntegerType, 5)
     assert(builtExpression.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
-    valueFromQueryValueArray(0) = Bytes.toBytes(10)
-    valueFromQueryValueArray(1) = Bytes.toBytes(5)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
+    valueFromQueryValueArray(1) = encoder.encode(IntegerType, 5)
     assert(!builtExpression.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
-    valueFromQueryValueArray(0) = Bytes.toBytes(15)
-    valueFromQueryValueArray(1) = Bytes.toBytes(10)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 15)
+    valueFromQueryValueArray(1) = encoder.encode(IntegerType, 10)
     assert(!builtExpression.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
   }
@@ -75,41 +79,41 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
     columnToCurrentRowValueMap.put("Col1", new 
ByteArrayComparable(Bytes.toBytes(10)))
     val valueFromQueryValueArray = new Array[Array[Byte]](2)
-    valueFromQueryValueArray(0) = Bytes.toBytes(15)
-    valueFromQueryValueArray(1) = Bytes.toBytes(5)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 15)
+    valueFromQueryValueArray(1) = encoder.encode(IntegerType, 5)
     assert(OrLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
-    valueFromQueryValueArray(0) = Bytes.toBytes(10)
-    valueFromQueryValueArray(1) = Bytes.toBytes(5)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
+    valueFromQueryValueArray(1) = encoder.encode(IntegerType, 5)
     assert(OrLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
-    valueFromQueryValueArray(0) = Bytes.toBytes(15)
-    valueFromQueryValueArray(1) = Bytes.toBytes(10)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 15)
+    valueFromQueryValueArray(1) = encoder.encode(IntegerType, 10)
     assert(OrLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
-    valueFromQueryValueArray(0) = Bytes.toBytes(10)
-    valueFromQueryValueArray(1) = Bytes.toBytes(10)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
+    valueFromQueryValueArray(1) = encoder.encode(IntegerType, 10)
     assert(!OrLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
     val expressionString = OrLogic.toExpressionString
 
     assert(expressionString.equals("( Col1 < 0 OR Col1 > 1 )"))
 
-    val builtExpression = DynamicLogicExpressionBuilder.build(expressionString)
-    valueFromQueryValueArray(0) = Bytes.toBytes(15)
-    valueFromQueryValueArray(1) = Bytes.toBytes(5)
+    val builtExpression = 
DynamicLogicExpressionBuilder.build(expressionString, encoder)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 15)
+    valueFromQueryValueArray(1) = encoder.encode(IntegerType, 5)
     assert(builtExpression.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
-    valueFromQueryValueArray(0) = Bytes.toBytes(10)
-    valueFromQueryValueArray(1) = Bytes.toBytes(5)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
+    valueFromQueryValueArray(1) = encoder.encode(IntegerType, 5)
     assert(builtExpression.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
-    valueFromQueryValueArray(0) = Bytes.toBytes(15)
-    valueFromQueryValueArray(1) = Bytes.toBytes(10)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 15)
+    valueFromQueryValueArray(1) = encoder.encode(IntegerType, 10)
     assert(builtExpression.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
-    valueFromQueryValueArray(0) = Bytes.toBytes(10)
-    valueFromQueryValueArray(1) = Bytes.toBytes(10)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
+    valueFromQueryValueArray(1) = encoder.encode(IntegerType, 10)
     assert(!builtExpression.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
   }
 
@@ -127,40 +131,40 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     val valueFromQueryValueArray = new Array[Array[Byte]](1)
 
     //great than
-    valueFromQueryValueArray(0) = Bytes.toBytes(10)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
     assert(!greaterLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
-    valueFromQueryValueArray(0) = Bytes.toBytes(20)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 20)
     assert(!greaterLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
     //great than and equal
-    valueFromQueryValueArray(0) = Bytes.toBytes(5)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 5)
     assert(greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
       valueFromQueryValueArray))
 
-    valueFromQueryValueArray(0) = Bytes.toBytes(10)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
     assert(greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
       valueFromQueryValueArray))
 
-    valueFromQueryValueArray(0) = Bytes.toBytes(20)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 20)
     assert(!greaterAndEqualLogic.execute(columnToCurrentRowValueMap,
       valueFromQueryValueArray))
 
     //less than
-    valueFromQueryValueArray(0) = Bytes.toBytes(10)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
     assert(!lessLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
-    valueFromQueryValueArray(0) = Bytes.toBytes(5)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 5)
     assert(!lessLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
     //less than and equal
-    valueFromQueryValueArray(0) = Bytes.toBytes(20)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 20)
     assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
-    valueFromQueryValueArray(0) = Bytes.toBytes(20)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 20)
     assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
-    valueFromQueryValueArray(0) = Bytes.toBytes(10)
+    valueFromQueryValueArray(0) = encoder.encode(IntegerType, 10)
     assert(lessAndEqualLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
 
     //equal too
@@ -183,8 +187,137 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
     valueFromQueryValueArray(0) = Bytes.toBytes(5)
     assert(passThrough.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+  }
+
+
+  test("Double Type") {
+    val leftLogic = new LessThanLogicExpression("Col1", 0)
+    val rightLogic = new GreaterThanLogicExpression("Col1", 1)
+    val andLogic = new AndLogicExpression(leftLogic, rightLogic)
+
+    val columnToCurrentRowValueMap = new util.HashMap[String, 
ByteArrayComparable]()
+
+    columnToCurrentRowValueMap.put("Col1", new 
ByteArrayComparable(Bytes.toBytes(-4.0d)))
+    val valueFromQueryValueArray = new Array[Array[Byte]](2)
+    valueFromQueryValueArray(0) = encoder.encode(DoubleType, 15.0d)
+    valueFromQueryValueArray(1) = encoder.encode(DoubleType, -5.0d)
+    assert(andLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = encoder.encode(DoubleType, 10.0d)
+    valueFromQueryValueArray(1) = encoder.encode(DoubleType, -1.0d)
+    assert(!andLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = encoder.encode(DoubleType, -10.0d)
+    valueFromQueryValueArray(1) = encoder.encode(DoubleType, -20.0d)
+    assert(!andLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+
+    val expressionString = andLogic.toExpressionString
+    // Note that here 0 and 1 is index, instead of value.
+    assert(expressionString.equals("( Col1 < 0 AND Col1 > 1 )"))
+
+    val builtExpression = 
DynamicLogicExpressionBuilder.build(expressionString, encoder)
+    valueFromQueryValueArray(0) = encoder.encode(DoubleType, 15.0d)
+    valueFromQueryValueArray(1) = encoder.encode(DoubleType, -5.0d)
+    assert(builtExpression.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = encoder.encode(DoubleType, 10.0d)
+    valueFromQueryValueArray(1) = encoder.encode(DoubleType, -1.0d)
+    assert(!builtExpression.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = encoder.encode(DoubleType, -10.0d)
+    valueFromQueryValueArray(1) = encoder.encode(DoubleType, -20.0d)
+    assert(!builtExpression.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+  }
+
+  test("Float Type") {
+    val leftLogic = new LessThanLogicExpression("Col1", 0)
+    val rightLogic = new GreaterThanLogicExpression("Col1", 1)
+    val andLogic = new AndLogicExpression(leftLogic, rightLogic)
+
+    val columnToCurrentRowValueMap = new util.HashMap[String, 
ByteArrayComparable]()
 
+    columnToCurrentRowValueMap.put("Col1", new 
ByteArrayComparable(Bytes.toBytes(-4.0f)))
+    val valueFromQueryValueArray = new Array[Array[Byte]](2)
+    valueFromQueryValueArray(0) = encoder.encode(FloatType, 15.0f)
+    valueFromQueryValueArray(1) = encoder.encode(FloatType, -5.0f)
+    assert(andLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = encoder.encode(FloatType, 10.0f)
+    valueFromQueryValueArray(1) = encoder.encode(FloatType, -1.0f)
+    assert(!andLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = encoder.encode(FloatType, -10.0f)
+    valueFromQueryValueArray(1) = encoder.encode(FloatType, -20.0f)
+    assert(!andLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+
+    val expressionString = andLogic.toExpressionString
+    // Note that here 0 and 1 is index, instead of value.
+    assert(expressionString.equals("( Col1 < 0 AND Col1 > 1 )"))
+
+    val builtExpression = 
DynamicLogicExpressionBuilder.build(expressionString, encoder)
+    valueFromQueryValueArray(0) = encoder.encode(FloatType, 15.0f)
+    valueFromQueryValueArray(1) = encoder.encode(FloatType, -5.0f)
+    assert(builtExpression.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = encoder.encode(FloatType, 10.0f)
+    valueFromQueryValueArray(1) = encoder.encode(FloatType, -1.0f)
+    assert(!builtExpression.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = encoder.encode(FloatType, -10.0f)
+    valueFromQueryValueArray(1) = encoder.encode(FloatType, -20.0f)
+    assert(!builtExpression.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+  }
+
+  test("String Type") {
+    val leftLogic = new LessThanLogicExpression("Col1", 0)
+    val rightLogic = new GreaterThanLogicExpression("Col1", 1)
+    val andLogic = new AndLogicExpression(leftLogic, rightLogic)
+
+    val columnToCurrentRowValueMap = new util.HashMap[String, 
ByteArrayComparable]()
+
+    columnToCurrentRowValueMap.put("Col1", new 
ByteArrayComparable(Bytes.toBytes("row005")))
+    val valueFromQueryValueArray = new Array[Array[Byte]](2)
+    valueFromQueryValueArray(0) = encoder.encode(StringType, "row015")
+    valueFromQueryValueArray(1) = encoder.encode(StringType, "row000")
+    assert(andLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = encoder.encode(StringType, "row004")
+    valueFromQueryValueArray(1) = encoder.encode(StringType, "row000")
+    assert(!andLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = encoder.encode(StringType, "row020")
+    valueFromQueryValueArray(1) = encoder.encode(StringType, "row010")
+    assert(!andLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+
+    val expressionString = andLogic.toExpressionString
+    // Note that here 0 and 1 is index, instead of value.
+    assert(expressionString.equals("( Col1 < 0 AND Col1 > 1 )"))
+
+    val builtExpression = 
DynamicLogicExpressionBuilder.build(expressionString, encoder)
+    valueFromQueryValueArray(0) = encoder.encode(StringType, "row015")
+    valueFromQueryValueArray(1) = encoder.encode(StringType, "row000")
+    assert(builtExpression.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = encoder.encode(StringType, "row004")
+    valueFromQueryValueArray(1) = encoder.encode(StringType, "row000")
+    assert(!builtExpression.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+
+    valueFromQueryValueArray(0) = encoder.encode(StringType, "row020")
+    valueFromQueryValueArray(1) = encoder.encode(StringType, "row010")
+    assert(!builtExpression.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
   }
 
+  test("Boolean Type") {
+    val leftLogic = new LessThanLogicExpression("Col1", 0)
+    val rightLogic = new GreaterThanLogicExpression("Col1", 1)
+
+    val columnToCurrentRowValueMap = new util.HashMap[String, 
ByteArrayComparable]()
 
+    columnToCurrentRowValueMap.put("Col1", new 
ByteArrayComparable(Bytes.toBytes(false)))
+    val valueFromQueryValueArray = new Array[Array[Byte]](2)
+    valueFromQueryValueArray(0) = encoder.encode(BooleanType, true)
+    valueFromQueryValueArray(1) = encoder.encode(BooleanType, false)
+    assert(leftLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+    assert(!rightLogic.execute(columnToCurrentRowValueMap, 
valueFromQueryValueArray))
+  }
 }

Reply via email to