http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumIntDistinct.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumIntDistinct.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumIntDistinct.java
index e1f2176..4a7515e 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumIntDistinct.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumIntDistinct.java
@@ -26,7 +26,6 @@ import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.Int8Datum;
-import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.engine.function.AggFunction;
 import org.apache.tajo.engine.function.FunctionContext;
 import org.apache.tajo.engine.function.annotation.Description;
@@ -66,7 +65,7 @@ public class SumIntDistinct extends AggFunction<Datum> {
   public void merge(FunctionContext context, Tuple params) {
     SumContext distinctContext = (SumContext) context;
     Datum value = params.get(0);
-    if ((distinctContext.latest == null || 
(!distinctContext.latest.equals(value)) && !(value instanceof NullDatum))) {
+    if ((distinctContext.latest == null || 
(!distinctContext.latest.equals(value)) && !(value.isNull()))) {
       distinctContext.latest = value;
       distinctContext.sum += value.asInt4();
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumLongDistinct.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumLongDistinct.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumLongDistinct.java
index d899c37..7672e29 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumLongDistinct.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/SumLongDistinct.java
@@ -26,7 +26,6 @@ import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.Int8Datum;
-import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.engine.function.AggFunction;
 import org.apache.tajo.engine.function.FunctionContext;
 import org.apache.tajo.engine.function.annotation.Description;
@@ -66,7 +65,7 @@ public class SumLongDistinct extends AggFunction<Datum> {
   public void merge(FunctionContext context, Tuple params) {
     SumContext distinctContext = (SumContext) context;
     Datum value = params.get(0);
-    if ((distinctContext.latest == null || 
(!distinctContext.latest.equals(value)) && !(value instanceof NullDatum))) {
+    if ((distinctContext.latest == null || 
(!distinctContext.latest.equals(value)) && !(value.isNull()))) {
       distinctContext.latest = value;
       distinctContext.sum += value.asInt8();
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DatePartFromDate.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DatePartFromDate.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DatePartFromDate.java
index 288fbe1..9811bb6 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DatePartFromDate.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DatePartFromDate.java
@@ -50,12 +50,12 @@ public class DatePartFromDate extends GeneralFunction {
   public Datum eval(Tuple params) {
     Datum target = params.get(0);
 
-    if(target instanceof NullDatum || params.get(1) instanceof NullDatum) {
+    if(target.isNull() || params.get(1).isNull()) {
       return NullDatum.get();
     }
 
     DateDatum date;
-    if(params.get(1) instanceof DateDatum) {
+    if(params.get(1).type() == DATE) {
       date = (DateDatum)(params.get(1));
     } else {
       return NullDatum.get();

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DatePartFromTime.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DatePartFromTime.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DatePartFromTime.java
index b3184e3..f0f9652 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DatePartFromTime.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DatePartFromTime.java
@@ -34,6 +34,7 @@ import org.apache.tajo.util.datetime.TimeMeta;
 
 import static org.apache.tajo.common.TajoDataTypes.Type.FLOAT8;
 import static org.apache.tajo.common.TajoDataTypes.Type.TEXT;
+import static org.apache.tajo.common.TajoDataTypes.Type.TIME;
 
 @Description(
     functionName = "date_part",
@@ -58,11 +59,11 @@ public class DatePartFromTime extends GeneralFunction {
     Datum target = params.get(0);
     TimeDatum time = null;
 
-    if(target instanceof NullDatum || params.get(1) instanceof NullDatum) {
+    if(target.isNull()|| params.get(1).isNull()) {
       return NullDatum.get();
     }
 
-    if(params.get(1) instanceof TimeDatum) {
+    if(params.get(1).type() == TIME) {
       time = (TimeDatum)(params.get(1));
     } else {
       return NullDatum.get();

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DatePartFromTimestamp.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DatePartFromTimestamp.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DatePartFromTimestamp.java
index 98900ef..1b539aa 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DatePartFromTimestamp.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DatePartFromTimestamp.java
@@ -54,11 +54,11 @@ public class DatePartFromTimestamp extends GeneralFunction {
     Datum target = params.get(0);
     TimestampDatum timestamp;
 
-    if(target instanceof NullDatum || params.get(1) instanceof NullDatum) {
+    if(target.isNull() || params.get(1).isNull()) {
       return NullDatum.get();
     }
 
-    if(params.get(1) instanceof TimestampDatum) {
+    if(params.get(1).type() == TIMESTAMP) {
       timestamp = (TimestampDatum)(params.get(1));
     } else {
       return NullDatum.get();

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DateTimePartFromUnixTimestamp.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DateTimePartFromUnixTimestamp.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DateTimePartFromUnixTimestamp.java
index 8705b06..0b01afa 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DateTimePartFromUnixTimestamp.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/DateTimePartFromUnixTimestamp.java
@@ -61,11 +61,11 @@ public class DateTimePartFromUnixTimestamp extends 
GeneralFunction {
     DateTime dateTime;
     Int4Datum dayOfWeek = null;
 
-    if (target instanceof NullDatum || params.get(1) instanceof NullDatum) {
+    if (target.isNull() || params.get(1).isNull()) {
       return NullDatum.get();
     }
 
-    if (params.get(1) instanceof Int8Datum) {
+    if (params.get(1).type() == INT8) {
       dateTime = DateTimeUtil.getUTCDateTime((Int8Datum) (params.get(1)));
     } else {
       return NullDatum.get();
@@ -85,7 +85,7 @@ public class DateTimePartFromUnixTimestamp extends 
GeneralFunction {
       } else if (extractType.equals("year")) {
         extractor = new YearExtractorFromTime();
       } else if (extractType.equals("week")) {
-        if (params.get(2) instanceof NullDatum) {
+        if (params.get(2).isNull()) {
           return NullDatum.get();
         }
         dayOfWeek = (Int4Datum) params.get(2);

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java
index d14cfd6..4ec494c 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/datetime/ToTimestampInt.java
@@ -47,7 +47,7 @@ public class ToTimestampInt extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum value = params.get(0);
-    if (value instanceof NullDatum) {
+    if (value.isNull()) {
       return NullDatum.get();
     }
     return DatumFactory.createTimestmpDatumWithUnixTime(value.asInt4());

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPCountryInet4.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPCountryInet4.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPCountryInet4.java
index d922e76..1014946 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPCountryInet4.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPCountryInet4.java
@@ -47,7 +47,7 @@ public class GeoIPCountryInet4 extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum valueDatum = params.get(0);
-    if (valueDatum instanceof NullDatum) {
+    if (valueDatum.isNull()) {
       return NullDatum.get();
     }
     return new TextDatum(GeoIPUtil.getCountryCode(params.get(0).asChars()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPCountryText.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPCountryText.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPCountryText.java
index 17a43be..5fe92d8 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPCountryText.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPCountryText.java
@@ -48,7 +48,7 @@ public class GeoIPCountryText extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum valueDatum = params.get(0);
-    if (valueDatum instanceof NullDatum) {
+    if (valueDatum.isNull()) {
       return NullDatum.get();
     }
     return new TextDatum(GeoIPUtil.getCountryCode(params.get(0).asChars()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPInCountryInet4.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPInCountryInet4.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPInCountryInet4.java
index 19ce8ba..f071740 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPInCountryInet4.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPInCountryInet4.java
@@ -48,7 +48,7 @@ public class GeoIPInCountryInet4 extends GeneralFunction {
 
   @Override
   public Datum eval(Tuple params) {
-    if (params.get(0) instanceof NullDatum || params.get(1) instanceof 
NullDatum) {
+    if (params.get(0).isNull() || params.get(1).isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPInCountryText.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPInCountryText.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPInCountryText.java
index 168f86c..7ca4857 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPInCountryText.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/geoip/GeoIPInCountryText.java
@@ -47,7 +47,7 @@ public class GeoIPInCountryText extends GeneralFunction {
 
   @Override
   public Datum eval(Tuple params) {
-    if (params.get(0) instanceof NullDatum || params.get(1) instanceof 
NullDatum) {
+    if (params.get(0).isNull() || params.get(1).isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Ceil.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Ceil.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Ceil.java
index cca7f60..7ad9584 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Ceil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Ceil.java
@@ -53,7 +53,7 @@ public class Ceil extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum valueDatum = params.get(0);
-    if(valueDatum instanceof NullDatum) {
+    if(valueDatum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Cos.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Cos.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Cos.java
index 997784a..c91830d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Cos.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Cos.java
@@ -51,7 +51,7 @@ public class Cos extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum valueDatum = params.get(0);
-    if(valueDatum instanceof NullDatum) {
+    if(valueDatum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Degrees.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Degrees.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Degrees.java
index cd5fd2f..05c179a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Degrees.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Degrees.java
@@ -52,7 +52,7 @@ public class Degrees extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum valueDatum = params.get(0);
-    if(valueDatum instanceof NullDatum) {
+    if(valueDatum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Div.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Div.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Div.java
index d904a2c..48b4a9a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Div.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Div.java
@@ -56,7 +56,7 @@ public class Div extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum value1Datum = params.get(0);
-    if(value1Datum instanceof NullDatum) {
+    if(value1Datum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Exp.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Exp.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Exp.java
index 3c3b07c..a7ad467 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Exp.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Exp.java
@@ -52,7 +52,7 @@ public class Exp extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum valueDatum = params.get(0);
-    if(valueDatum instanceof NullDatum) {
+    if(valueDatum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Floor.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Floor.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Floor.java
index 7e4cee0..62727fe 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Floor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Floor.java
@@ -52,7 +52,7 @@ public class Floor extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum valueDatum = params.get(0);
-    if(valueDatum instanceof NullDatum) {
+    if(valueDatum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Mod.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Mod.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Mod.java
index c748a2d..eb6ead9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Mod.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Mod.java
@@ -56,7 +56,7 @@ public class Mod extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum value1Datum = params.get(0);
-    if(value1Datum instanceof NullDatum) {
+    if(value1Datum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Pow.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Pow.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Pow.java
index 101e508..006b5b6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Pow.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Pow.java
@@ -69,7 +69,7 @@ public class Pow extends GeneralFunction {
   public Datum eval(Tuple params) {
     Datum value1Datum = params.get(0);
     Datum value2Datum = params.get(1);
-    if(value1Datum instanceof NullDatum || value2Datum instanceof NullDatum) {
+    if(value1Datum.isNull() || value2Datum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Radians.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Radians.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Radians.java
index a8bf363..fed44f8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Radians.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Radians.java
@@ -53,7 +53,7 @@ public class Radians extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum valueDatum = params.get(0);
-    if(valueDatum instanceof NullDatum) {
+    if(valueDatum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java
index cdcb70a..1a24295 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java
@@ -55,7 +55,7 @@ public class Round extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum valueDatum = params.get(0);
-    if(valueDatum instanceof NullDatum) {
+    if(valueDatum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/math/RoundFloat8.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/RoundFloat8.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/RoundFloat8.java
index d104431..e844371 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/RoundFloat8.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/RoundFloat8.java
@@ -59,7 +59,7 @@ public class RoundFloat8 extends GeneralFunction {
     Datum valueDatum = params.get(0);
     Datum roundDatum = params.get(1);
 
-    if(valueDatum instanceof NullDatum || roundDatum instanceof  NullDatum) {
+    if(valueDatum.isNull() || roundDatum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Sign.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Sign.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Sign.java
index f9c49bf..e9fa4a1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Sign.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Sign.java
@@ -55,7 +55,7 @@ public class Sign extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum valueDatum = params.get(0);
-    if(valueDatum instanceof NullDatum) {
+    if(valueDatum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Sin.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Sin.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Sin.java
index 31d56b4..c90183c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Sin.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Sin.java
@@ -51,7 +51,7 @@ public class Sin extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum valueDatum = params.get(0);
-    if(valueDatum instanceof NullDatum) {
+    if(valueDatum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Sqrt.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Sqrt.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Sqrt.java
index aba33f4..9b72b06 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Sqrt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Sqrt.java
@@ -53,7 +53,7 @@ public class Sqrt extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum valueDatum = params.get(0);
-    if(valueDatum instanceof NullDatum) {
+    if(valueDatum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Tan.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Tan.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Tan.java
index d551d8b..7625378 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Tan.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Tan.java
@@ -51,7 +51,7 @@ public class Tan extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum valueDatum = params.get(0);
-    if(valueDatum instanceof NullDatum) {
+    if(valueDatum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/FindInSet.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/FindInSet.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/FindInSet.java
index 0c3e221..abd5d8d 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/FindInSet.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/FindInSet.java
@@ -62,7 +62,7 @@ public class FindInSet extends GeneralFunction {
     Datum finding = params.get(0);
     Datum textArray = params.get(1);
 
-    if (finding instanceof NullDatum || textArray instanceof NullDatum) {
+    if (finding.isNull() || textArray.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/InitCap.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/InitCap.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/InitCap.java
index 4347dbb..df730f6 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/InitCap.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/InitCap.java
@@ -53,7 +53,9 @@ public class InitCap extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum datum = params.get(0);
-    if(datum instanceof NullDatum) return NullDatum.get();
+    if(datum.isNull()) {
+      return NullDatum.get();
+    }
 
     return DatumFactory.createText(WordUtils.capitalizeFully(datum.asChars()));
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/LTrim.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/LTrim.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/LTrim.java
index db6b714..26ed8cf 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/LTrim.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/LTrim.java
@@ -66,7 +66,9 @@ public class LTrim extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum datum = params.get(0);
-    if(datum instanceof NullDatum) return NullDatum.get();
+    if(datum.isNull()) {
+      return NullDatum.get();
+    }
 
     if (!hasTrimCharacters) {
       return DatumFactory.createText(StringUtils.stripStart(datum.asChars(), 
null));

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Left.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Left.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Left.java
index 31469f5..b52bae2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Left.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Left.java
@@ -65,10 +65,14 @@ public class Left extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum datum = params.get(0);
-    if(datum instanceof NullDatum) return NullDatum.get();
+    if(datum.isNull()) {
+      return NullDatum.get();
+    }
 
     Datum sizeDatum = params.get(1);
-    if(sizeDatum instanceof NullDatum) return NullDatum.get();
+    if(sizeDatum.isNull()) {
+      return NullDatum.get();
+    }
 
     String data = datum.asChars();
     int length = data.length();

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Length.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Length.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Length.java
index 389f358..fc6f9e1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Length.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Length.java
@@ -52,7 +52,7 @@ public class Length  extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum datum = params.get(0);
-    if(datum instanceof NullDatum) {
+    if(datum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Locate.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Locate.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Locate.java
index 67ee389..b647ca3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Locate.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Locate.java
@@ -79,11 +79,11 @@ public class Locate extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum strDatum = params.get(0);
-    if(strDatum instanceof NullDatum) {
+    if(strDatum.isNull()) {
       return NullDatum.get();
     }
     Datum substrDatum = params.get(1);
-    if (substrDatum instanceof NullDatum) {
+    if (substrDatum.isNull()) {
       return NullDatum.get();
     }
     

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Lower.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Lower.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Lower.java
index 80c8192..8738a02 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Lower.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Lower.java
@@ -51,7 +51,9 @@ public class Lower extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum datum = params.get(0);
-    if(datum instanceof NullDatum) return NullDatum.get();
+    if(datum.isNull()) {
+      return NullDatum.get();
+    }
 
     return DatumFactory.createText(datum.asChars().toLowerCase());
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Lpad.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Lpad.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Lpad.java
index bc0031e..75bdfb6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Lpad.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Lpad.java
@@ -68,8 +68,12 @@ public class Lpad extends GeneralFunction {
     Datum datum = params.get(0);
     Datum lengthDatum = params.get(1);
 
-    if (datum instanceof NullDatum) return NullDatum.get();
-    if (lengthDatum instanceof NullDatum) return NullDatum.get();
+    if (datum.isNull()) {
+      return NullDatum.get();
+    }
+    if (lengthDatum.isNull()) {
+      return NullDatum.get();
+    }
 
     Datum fillText = NullDatum.get();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Md5.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Md5.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Md5.java
index fb5f73c..a2004cb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Md5.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Md5.java
@@ -53,7 +53,9 @@ public class Md5 extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum datum = params.get(0);
-    if(datum instanceof NullDatum) return NullDatum.get();
+    if(datum.isNull()) {
+      return NullDatum.get();
+    }
 
     try {
         MessageDigest md = MessageDigest.getInstance("MD5");

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/OctetLength.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/OctetLength.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/OctetLength.java
index dc71907..c8c38ad 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/OctetLength.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/OctetLength.java
@@ -52,7 +52,7 @@ public class OctetLength  extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum datum = params.get(0);
-    if(datum instanceof NullDatum) {
+    if(datum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/QuoteIdent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/QuoteIdent.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/QuoteIdent.java
index d5237d7..e975bd5 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/QuoteIdent.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/QuoteIdent.java
@@ -58,7 +58,7 @@ public class QuoteIdent extends GeneralFunction {
   public Datum eval(Tuple params) {
     Datum datum = params.get(0);
 
-    if(datum instanceof NullDatum) {
+    if(datum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RTrim.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RTrim.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RTrim.java
index 9e3ff8d..d36118c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RTrim.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RTrim.java
@@ -65,7 +65,9 @@ public class RTrim extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum datum = params.get(0);
-    if(datum instanceof NullDatum) return NullDatum.get();
+    if(datum.isNull()) {
+      return NullDatum.get();
+    }
 
     if (!hasTrimCharacters) {
       return DatumFactory.createText(StringUtils.stripEnd(datum.asChars(), 
null));

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RegexpReplace.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RegexpReplace.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RegexpReplace.java
index 2b59d34..e7eba01 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RegexpReplace.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/RegexpReplace.java
@@ -81,9 +81,9 @@ public class RegexpReplace extends GeneralFunction {
     Datum thisPattern = params.get(1);
     Datum thisReplacement = params.get(2);
     boolean nullResult = isAlwaysNull
-        || thisValue instanceof NullDatum
-        || thisReplacement instanceof NullDatum
-        || thisPattern instanceof NullDatum;
+        || thisValue.isNull()
+        || thisReplacement.isNull()
+        || thisPattern.isNull();
 
     Pattern thisCompiled;
     if (!nullResult) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Repeat.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Repeat.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Repeat.java
index 4c52a6e..589b224 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Repeat.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Repeat.java
@@ -62,7 +62,9 @@ public class Repeat extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum datum = params.get(0);
-    if(datum instanceof NullDatum) return NullDatum.get();
+    if(datum.isNull()) {
+      return NullDatum.get();
+    }
 
     Datum countDatum = params.get(1);
     if(countDatum instanceof NullDatum) return NullDatum.get();

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Reverse.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Reverse.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Reverse.java
index a77ba69..b81c5bc 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Reverse.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Reverse.java
@@ -51,7 +51,9 @@ public class Reverse extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum datum = params.get(0);
-    if(datum instanceof NullDatum) return NullDatum.get();
+    if(datum.isNull()) {
+      return NullDatum.get();
+    }
 
     return DatumFactory.createText(new 
StringBuffer(datum.asChars()).reverse().toString());
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Right.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Right.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Right.java
index aa0dad0..388d3f6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Right.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Right.java
@@ -65,10 +65,14 @@ public class Right extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum datum = params.get(0);
-    if(datum instanceof NullDatum) return NullDatum.get();
+    if(datum.isNull()) {
+      return NullDatum.get();
+    }
 
     Datum sizeDatum = params.get(1);
-    if(sizeDatum instanceof NullDatum) return NullDatum.get();
+    if(sizeDatum.isNull()) {
+      return NullDatum.get();
+    }
 
     String data = datum.asChars();
     int length = data.length();

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Rpad.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Rpad.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Rpad.java
index 225777b..3900389 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Rpad.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Rpad.java
@@ -69,8 +69,12 @@ public class Rpad extends GeneralFunction {
     Datum datum = params.get(0);
     Datum lengthDatum = params.get(1);
 
-    if(datum instanceof NullDatum) return NullDatum.get();
-    if(lengthDatum instanceof NullDatum) return NullDatum.get();
+    if(datum.isNull()) {
+      return NullDatum.get();
+    }
+    if(lengthDatum.isNull()) {
+      return NullDatum.get();
+    }
 
     Datum fillText=NullDatum.get();
     if(hasFillCharacters) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPos.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPos.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPos.java
index d6f88de..b66f99f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPos.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPos.java
@@ -53,7 +53,7 @@ public class StrPos extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum valueDatum = params.get(0);
-    if(valueDatum instanceof NullDatum) {
+    if(valueDatum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPosb.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPosb.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPosb.java
index 56fb3a1..c329a42 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPosb.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/StrPosb.java
@@ -55,12 +55,12 @@ public class StrPosb extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum valueDatum = params.get(0);
-    if(valueDatum instanceof NullDatum) {
+    if(valueDatum.isNull()) {
       return NullDatum.get();
     }
 
     Datum substringDatum = params.get(1);
-    if(substringDatum instanceof NullDatum) {
+    if(substringDatum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Substr.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Substr.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Substr.java
index 74492b1..68deb0a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Substr.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/Substr.java
@@ -60,7 +60,7 @@ public class Substr extends GeneralFunction {
     Datum fromDatum = params.get(1);
     Datum countDatum = params.size() > 2 ? params.get(2) : null;
 
-    if(valueDatum instanceof NullDatum || fromDatum instanceof NullDatum || 
countDatum instanceof NullDatum) {
+    if(valueDatum.isNull() || fromDatum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToBin.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToBin.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToBin.java
index 3048aa8..84ea4a8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToBin.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToBin.java
@@ -55,7 +55,7 @@ public class ToBin extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum datum = params.get(0);
-    if(datum instanceof NullDatum) {
+    if(datum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToHex.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToHex.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToHex.java
index a8624ab..4039c4d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToHex.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToHex.java
@@ -66,7 +66,7 @@ public class ToHex extends GeneralFunction {
   @Override
   public Datum eval(Tuple params) {
     Datum datum = params.get(0);
-    if(datum instanceof NullDatum) {
+    if(datum.isNull()) {
       return NullDatum.get();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 580ec61..4e090ea 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -463,6 +463,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> 
{
       if (specContext.order_specification() != null) {
         if (specContext.order.DESC() != null) {
           specs[i].setDescending();
+          specs[i].setNullFirst(); // null first is default in descending order
         }
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
index 574e32c..6a98076 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
@@ -460,7 +460,7 @@ public class ExprAnnotator extends 
BaseAlgebraVisitor<ExprAnnotator.Context, Eva
 
     // A pattern is a const value in pattern matching predicates.
     // In a binary expression, the result is always null if a const value in 
left or right side is null.
-    if (pattern.getValue() instanceof NullDatum) {
+    if (pattern.getValue().isNull()) {
       return new ConstEval(NullDatum.get());
     } else {
       if (expr.getType() == OpType.LikePredicate) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
index 96758d6..950df37 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
@@ -29,7 +29,7 @@ import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.nameresolver.NameResolver;
 import org.apache.tajo.engine.planner.nameresolver.NameResolvingMode;
-import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.util.TUtil;
 
 import java.util.*;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 89382f4..23eca80 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -44,7 +44,7 @@ import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.nameresolver.NameResolvingMode;
 import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.Pair;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 2730202..9be29de 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -45,8 +45,8 @@ import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer;
 import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
 import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
 import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.FileUtil;
@@ -55,7 +55,6 @@ import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 import java.util.Stack;
 
@@ -70,7 +69,6 @@ import static 
org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce;
 
 public class PhysicalPlannerImpl implements PhysicalPlanner {
   private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
-  private static final int UNGENERATED_PID = -1;
 
   protected final TajoConf conf;
   protected final AbstractStorageManager sm;
@@ -1138,7 +1136,7 @@ public class PhysicalPlannerImpl implements 
PhysicalPlanner {
     String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), 
annotation.getSortKeys());
     Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), 
"index");
 
-    TupleComparator comp = new TupleComparator(annotation.getKeySchema(),
+    BaseTupleComparator comp = new 
BaseTupleComparator(annotation.getKeySchema(),
         annotation.getSortKeys());
     return new BSTIndexScanExec(ctx, sm, annotation, fragments.get(0), new 
Path(indexPath, indexName),
         annotation.getKeySchema(), comp, annotation.getDatum());

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 3390758..3a12039 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -31,8 +31,8 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.exception.InvalidQueryException;
 import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.utils.SchemaUtil;
-import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.util.TUtil;
 
 import java.util.*;
@@ -540,11 +540,12 @@ public class PlannerUtil {
     return new SortSpec[][]{outerSortSpec, innerSortSpec};
   }
 
-  public static TupleComparator[] getComparatorsFromJoinQual(EvalNode 
joinQual, Schema leftSchema, Schema rightSchema) {
+  public static BaseTupleComparator[] getComparatorsFromJoinQual(EvalNode 
joinQual, Schema leftSchema,
+                                                                 Schema 
rightSchema) {
     SortSpec[][] sortSpecs = getSortKeysFromJoinQual(joinQual, leftSchema, 
rightSchema);
-    TupleComparator[] comparators = new TupleComparator[2];
-    comparators[0] = new TupleComparator(leftSchema, sortSpecs[0]);
-    comparators[1] = new TupleComparator(rightSchema, sortSpecs[1]);
+    BaseTupleComparator[] comparators = new BaseTupleComparator[2];
+    comparators[0] = new BaseTupleComparator(leftSchema, sortSpecs[0]);
+    comparators[1] = new BaseTupleComparator(rightSchema, sortSpecs[1]);
     return comparators;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
index db12285..551a9d0 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
@@ -20,7 +20,6 @@ package org.apache.tajo.engine.planner;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.primitives.UnsignedLong;
 import com.sun.tools.javac.util.Convert;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.SortSpec;
@@ -571,7 +570,7 @@ public class UniformRangePartition extends 
RangePartitionAlgorithm {
             } else {
 
               if (isPureAscii[i]) {
-                lastBigInt = UnsignedLong.valueOf(new 
BigInteger(last.get(i).asByteArray())).bigIntegerValue();
+                lastBigInt = new BigInteger(last.get(i).asByteArray());
                 if (sortSpecs[i].isAscending()) {
                   end.put(i, 
DatumFactory.createText(lastBigInt.add(incs[i]).toByteArray()));
                 } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index b3b5bb0..c0bcb7e 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.util.TUtil;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 432589b..37076b6 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -1141,8 +1141,8 @@ public class GlobalPlanner {
 
     if (node.getType() == NodeType.INSERT) {
       InsertNode insertNode = (InsertNode) node;
-      channel.setSchema(((InsertNode)node).getProjectedSchema());
-      Column [] shuffleKeys = new 
Column[partitionMethod.getExpressionSchema().size()];
+      channel.setSchema(((InsertNode) node).getProjectedSchema());
+      Column[] shuffleKeys = new 
Column[partitionMethod.getExpressionSchema().size()];
       int i = 0;
       for (Column column : partitionMethod.getExpressionSchema().getColumns()) 
{
         int id = 
insertNode.getTableSchema().getColumnId(column.getQualifiedName());
@@ -1152,7 +1152,7 @@ public class GlobalPlanner {
       channel.setShuffleType(SCATTERED_HASH_SHUFFLE);
     } else {
       channel.setShuffleKeys(partitionMethod.getExpressionSchema().toArray());
-      channel.setShuffleType(HASH_SHUFFLE);
+      channel.setShuffleType(SCATTERED_HASH_SHUFFLE);
     }
     channel.setShuffleOutputNum(32);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
index 8d28e6e..9447056 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
@@ -27,7 +27,7 @@ import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Target;
-import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.util.TUtil;
 
 public class ScanNode extends RelationNode implements Projectable, 
SelectableNode, Cloneable {

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
index 3c808fc..559efe0 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
@@ -24,7 +24,7 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Target;
-import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 
 public class TableSubQueryNode extends RelationNode implements Projectable {
   @Expose private String tableName;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
index f65fee7..703d352 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
@@ -25,7 +25,7 @@ import org.apache.tajo.engine.planner.LogicalPlan;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.PlanningException;
 import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.util.TUtil;
 
 import java.util.*;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index f831525..567e3aa 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -50,7 +50,7 @@ public class BSTIndexScanExec extends PhysicalExec {
   public BSTIndexScanExec(TaskAttemptContext context,
                           AbstractStorageManager sm , ScanNode scanNode ,
        FileFragment fragment, Path fileName , Schema keySchema,
-       TupleComparator comparator , Datum[] datum) throws IOException {
+       BaseTupleComparator comparator , Datum[] datum) throws IOException {
     super(context, scanNode.getInSchema(), scanNode.getOutSchema());
     this.scanNode = scanNode;
     this.qual = scanNode.getQual();

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
index 92738e5..6b02279 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
@@ -79,10 +79,7 @@ public abstract class ColPartitionStoreExec extends 
UnaryPhysicalExec {
     }
 
     PhysicalPlanUtil.setNullCharIfNecessary(context.getQueryContext(), plan, 
meta);
-
-    if 
(context.getQueryContext().containsKey(SessionVars.MAX_OUTPUT_FILE_SIZE)) {
-      maxPerFileSize = 
context.getQueryContext().getLong(SessionVars.MAX_OUTPUT_FILE_SIZE) * 
StorageUnit.MB;
-    }
+    maxPerFileSize = 
context.getQueryContext().getLong(SessionVars.MAX_OUTPUT_FILE_SIZE) * 
StorageUnit.MB;
 
     // Find column index to name subpartition directory path
     keyNum = this.plan.getPartitionMethod().getExpressionSchema().size();

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 700e34d..e18448f 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -19,17 +19,14 @@
 package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.SessionVars;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -37,21 +34,20 @@ import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.PhysicalPlanningException;
 import org.apache.tajo.engine.planner.logical.SortNode;
 import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.tuple.offheap.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.storage.rawfile.DirectRawFileScanner;
+import org.apache.tajo.storage.rawfile.DirectRawFileWriter;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.List;
 import java.util.concurrent.*;
 
-import static org.apache.tajo.storage.RawFile.RawFileAppender;
-import static org.apache.tajo.storage.RawFile.RawFileScanner;
-
 /**
  * This external sort algorithm can be characterized by the followings:
  *
@@ -78,7 +74,8 @@ public class ExternalSortExec extends SortExec {
   /** If there are available multiple cores, it tries parallel merge. */
   private ExecutorService executorService;
   /** used for in-memory sort of each chunk. */
-  private List<Tuple> inMemoryTable;
+  private OffHeapRowBlock tupleBlock;
+  private List<Tuple> sortedTuples;
   /** temporal dir */
   private final Path sortTmpDir;
   /** It enables round-robin disks allocation */
@@ -100,14 +97,14 @@ public class ExternalSortExec extends SortExec {
   /** the final result */
   private Scanner result;
   /** total bytes of input data */
-  private long sortAndStoredBytes;
+  private long bytesOfLatestChunk;
 
   private ExternalSortExec(final TaskAttemptContext context, final 
AbstractStorageManager sm, final SortNode plan)
       throws PhysicalPlanningException {
     super(context, plan.getInSchema(), plan.getOutSchema(), null, 
plan.getSortKeys());
 
     this.plan = plan;
-    this.meta = CatalogUtil.newTableMeta(StoreType.ROWFILE);
+    this.meta = CatalogUtil.newTableMeta(StoreType.DIRECTRAW);
 
     this.defaultFanout = 
context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT);
     if (defaultFanout < 2) {
@@ -117,7 +114,7 @@ public class ExternalSortExec extends SortExec {
     this.sortBufferBytesNum = 
context.getQueryContext().getLong(SessionVars.EXTSORT_BUFFER_SIZE) * 
StorageUnit.MB;
     this.allocatedCoreNum = 
context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_THREAD_NUM);
     this.executorService = Executors.newFixedThreadPool(this.allocatedCoreNum);
-    this.inMemoryTable = new ArrayList<Tuple>(100000);
+    this.tupleBlock = new OffHeapRowBlock(inSchema, new 
FixedSizeLimitSpec(sortBufferBytesNum));
 
     this.sortTmpDir = getExecutorTmpDir();
     localDirAllocator = new 
LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
@@ -160,27 +157,25 @@ public class ExternalSortExec extends SortExec {
   /**
    * Sort a tuple block and store them into a chunk file
    */
-  private Path sortAndStoreChunk(int chunkId, List<Tuple> tupleBlock)
+  private Path sortAndStoreChunk(int chunkId, OffHeapRowBlock sortBuffer)
       throws IOException {
-    TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW);
-    int rowNum = tupleBlock.size();
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.DIRECTRAW);
+    int rowNum = sortBuffer.rows();
 
     long sortStart = System.currentTimeMillis();
-    Collections.sort(tupleBlock, getComparator());
+    List<Tuple> tupleList = OffHeapRowBlockUtils.sort(sortBuffer, 
getComparator());
     long sortEnd = System.currentTimeMillis();
 
     long chunkWriteStart = System.currentTimeMillis();
     Path outputPath = getChunkPathForWrite(0, chunkId);
-    final RawFileAppender appender = new RawFileAppender(context.getConf(), 
inSchema, meta, outputPath);
+    final DirectRawFileWriter appender = new 
DirectRawFileWriter(context.getConf(), inSchema, meta, outputPath);
     appender.init();
-    for (Tuple t : tupleBlock) {
+    for (Tuple t : tupleList) {
       appender.addTuple(t);
     }
     appender.close();
-    tupleBlock.clear();
-    long chunkWriteEnd = System.currentTimeMillis();
-
 
+    long chunkWriteEnd = System.currentTimeMillis();
     info(LOG, "Chunk #" + chunkId + " sort and written (" +
         FileUtil.humanReadableByteCount(appender.getOffset(), false) + " 
bytes, " + rowNum + " rows, " +
         ", sort time: " + (sortEnd - sortStart) + " msec, " +
@@ -196,17 +191,14 @@ public class ExternalSortExec extends SortExec {
    */
   private List<Path> sortAndStoreAllChunks() throws IOException {
     Tuple tuple;
-    long memoryConsumption = 0;
     List<Path> chunkPaths = TUtil.newList();
 
     int chunkId = 0;
     long runStartTime = System.currentTimeMillis();
     while ((tuple = child.next()) != null) { // partition sort start
-      Tuple vtuple = new VTuple(tuple);
-      inMemoryTable.add(vtuple);
-      memoryConsumption += MemoryUtil.calculateMemorySize(vtuple);
+      RowStoreUtil.convert(tuple, tupleBlock.getWriter());
 
-      if (memoryConsumption > sortBufferBytesNum) {
+      if (tupleBlock.usedMem() > sortBufferBytesNum) {
         long runEndTime = System.currentTimeMillis();
         info(LOG, chunkId + " run loading time: " + (runEndTime - 
runStartTime) + " msec");
         runStartTime = runEndTime;
@@ -214,9 +206,7 @@ public class ExternalSortExec extends SortExec {
         info(LOG, "Memory consumption exceeds " + sortBufferBytesNum + " 
bytes");
         memoryResident = false;
 
-        chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable));
-
-        memoryConsumption = 0;
+        chunkPaths.add(sortAndStoreChunk(chunkId, tupleBlock));
         chunkId++;
 
         // When the volume of sorting data once exceed the size of sort buffer,
@@ -231,27 +221,19 @@ public class ExternalSortExec extends SortExec {
         // That is, the progress was divided into two parts.
         // So, it multiply the progress of the children operator and 0.5f.
         progress = child.getProgress() * 0.5f;
+
+        tupleBlock.clear();
       }
     }
 
-    if (inMemoryTable.size() > 0) { // if there are at least one or more input 
tuples
-      if (!memoryResident) { // check if data exceeds a sort buffer. If so, it 
store the remain data into a chunk.
-        if (inMemoryTable.size() > 0) {
-          long start = System.currentTimeMillis();
-          int rowNum = inMemoryTable.size();
-          chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable));
-          long end = System.currentTimeMillis();
-          info(LOG, "Last Chunk #" + chunkId + " " + rowNum + " rows written 
(" + (end - start) + " msec)");
-        }
-      } else { // this case means that all data does not exceed a sort buffer
-        Collections.sort(inMemoryTable, getComparator());
-      }
+    if (tupleBlock.rows() >= 0) { // if there are at least one or more input 
tuples
+      sortedTuples = OffHeapRowBlockUtils.sort(tupleBlock, getComparator());
     }
 
     // get total loaded (or stored) bytes and total row numbers
     TableStats childTableStats = child.getInputStats();
     if (childTableStats != null) {
-      sortAndStoredBytes = childTableStats.getNumBytes();
+      bytesOfLatestChunk = childTableStats.getNumBytes();
     }
     return chunkPaths;
   }
@@ -260,7 +242,12 @@ public class ExternalSortExec extends SortExec {
    * Get a local path from all temporal paths in round-robin manner.
    */
   private synchronized Path getChunkPathForWrite(int level, int chunkId) 
throws IOException {
-    return localDirAllocator.getLocalPathForWrite(sortTmpDir + "/" + level 
+"_" + chunkId, context.getConf());
+    // example:
+    //   - ${sortTmpDir}/1_4.draw for 1 run and 4th chunk
+    //   - ${sortTmpDir}/2_8.draw for 2 run and 8th chunk
+    String outputFileName = sortTmpDir + "/" + level + "_" + chunkId + "." + 
DirectRawFileWriter.FILE_EXTENSION;
+    return localDirAllocator.getLocalPathForWrite(outputFileName, 
context.getConf());
+
   }
 
   @Override
@@ -280,10 +267,10 @@ public class ExternalSortExec extends SortExec {
         long startTimeOfChunkSplit = System.currentTimeMillis();
         List<Path> chunks = sortAndStoreAllChunks();
         long endTimeOfChunkSplit = System.currentTimeMillis();
-        info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - 
startTimeOfChunkSplit) + " msec");
+        info(LOG, "Sort of all chunks is completed (" + (endTimeOfChunkSplit - 
startTimeOfChunkSplit) + " msec).");
 
         if (memoryResident) { // if all sorted data reside in a main-memory 
table.
-          this.result = new MemTableScanner();
+          this.result = new MemTableScanner(sortedTuples, bytesOfLatestChunk);
         } else { // if input data exceeds main-memory at least once
 
           try {
@@ -441,7 +428,7 @@ public class ExternalSortExec extends SortExec {
       final Path outputPath = getChunkPathForWrite(level + 1, nextRunId);
       info(LOG, mergeFanout + " files are being merged to an output file " + 
outputPath.getName());
       long mergeStartTime = System.currentTimeMillis();
-      final RawFileAppender output = new RawFileAppender(context.getConf(), 
inSchema, meta, outputPath);
+      final DirectRawFileWriter output = new 
DirectRawFileWriter(context.getConf(), inSchema, meta, outputPath);
       output.init();
       final Scanner merger = createKWayMerger(inputFiles, startIdx, 
mergeFanout);
       merger.init();
@@ -479,255 +466,46 @@ public class ExternalSortExec extends SortExec {
   }
 
   private Scanner getFileScanner(Path path) throws IOException {
-    return new RawFileScanner(context.getConf(), plan.getInSchema(), meta, 
path);
+    String extension = FileUtil.getExtension(path.getName());
+    if (extension.equals(RawFile.FILE_EXTENSION)) {
+      return new RawFile.RawFileScanner(context.getConf(), plan.getInSchema(), 
meta, path);
+    } else if (extension.equalsIgnoreCase(DirectRawFileWriter.FILE_EXTENSION)) 
{
+      return new DirectRawFileScanner(context.getConf(), plan.getInSchema(), 
meta, path);
+    } else {
+      throw new IllegalStateException("Unknown File Extension: " + path);
+    }
   }
 
   private Scanner createKWayMerger(List<Path> inputs, final int startChunkId, 
final int num) throws IOException {
-    final Scanner [] sources = new Scanner[num];
+    boolean tupleInMemory = tupleBlock.rows() > 0;
+
+    List<Scanner> scannerList = Lists.newArrayList();
+
+    // if tuples are still the in-memory block, the in-memory tuples will be 
included in merge phase.
+    if (tupleInMemory) {
+      scannerList.add(new MemTableScanner(sortedTuples, bytesOfLatestChunk));
+    }
+
     for (int i = 0; i < num; i++) {
-      sources[i] = getFileScanner(inputs.get(startChunkId + i));
+      scannerList.add(getFileScanner(inputs.get(startChunkId + i)));
     }
+    Scanner [] sources = scannerList.toArray(new Scanner[scannerList.size()]);
 
-    return createKWayMergerInternal(sources, 0, num);
+    return createKWayMergerInternal(sources, 0, sources.length);
   }
 
   private Scanner createKWayMergerInternal(final Scanner [] sources, final int 
startIdx, final int num)
       throws IOException {
     if (num > 1) {
       final int mid = (int) Math.ceil((float)num / 2);
-      return new PairWiseMerger(
+      return new PairWiseMerger(inSchema,
           createKWayMergerInternal(sources, startIdx, mid),
-          createKWayMergerInternal(sources, startIdx + mid, num - mid));
+          createKWayMergerInternal(sources, startIdx + mid, num - mid), 
getComparator());
     } else {
       return sources[startIdx];
     }
   }
 
-  private class MemTableScanner implements Scanner {
-    Iterator<Tuple> iterator;
-
-    // for input stats
-    float scannerProgress;
-    int numRecords;
-    int totalRecords;
-    TableStats scannerTableStats;
-
-    @Override
-    public void init() throws IOException {
-      iterator = inMemoryTable.iterator();
-
-      totalRecords = inMemoryTable.size();
-      scannerProgress = 0.0f;
-      numRecords = 0;
-
-      // it will be returned as the final stats
-      scannerTableStats = new TableStats();
-      scannerTableStats.setNumBytes(sortAndStoredBytes);
-      scannerTableStats.setReadBytes(sortAndStoredBytes);
-      scannerTableStats.setNumRows(totalRecords);
-    }
-
-    @Override
-    public Tuple next() throws IOException {
-      if (iterator.hasNext()) {
-        numRecords++;
-        return iterator.next();
-      } else {
-        return null;
-      }
-    }
-
-    @Override
-    public void reset() throws IOException {
-      init();
-    }
-
-    @Override
-    public void close() throws IOException {
-      iterator = null;
-      scannerProgress = 1.0f;
-    }
-
-    @Override
-    public boolean isProjectable() {
-      return false;
-    }
-
-    @Override
-    public void setTarget(Column[] targets) {
-    }
-
-    @Override
-    public boolean isSelectable() {
-      return false;
-    }
-
-    @Override
-    public void setSearchCondition(Object expr) {
-    }
-
-    @Override
-    public boolean isSplittable() {
-      return false;
-    }
-
-    @Override
-    public Schema getSchema() {
-      return null;
-    }
-
-    @Override
-    public float getProgress() {
-      if (iterator != null && numRecords > 0) {
-        return (float)numRecords / (float)totalRecords;
-
-      } else { // if an input is empty
-        return scannerProgress;
-      }
-    }
-
-    @Override
-    public TableStats getInputStats() {
-      return scannerTableStats;
-    }
-  }
-
-  /**
-   * Two-way merger scanner that reads two input sources and outputs one 
output tuples sorted in some order.
-   */
-  private class PairWiseMerger implements Scanner {
-    private Scanner leftScan;
-    private Scanner rightScan;
-
-    private Tuple leftTuple;
-    private Tuple rightTuple;
-
-    private final Comparator<Tuple> comparator = getComparator();
-
-    private float mergerProgress;
-    private TableStats mergerInputStats;
-
-    public PairWiseMerger(Scanner leftScanner, Scanner rightScanner) throws 
IOException {
-      this.leftScan = leftScanner;
-      this.rightScan = rightScanner;
-    }
-
-    @Override
-    public void init() throws IOException {
-      leftScan.init();
-      rightScan.init();
-
-      leftTuple = leftScan.next();
-      rightTuple = rightScan.next();
-
-      mergerInputStats = new TableStats();
-      mergerProgress = 0.0f;
-    }
-
-    public Tuple next() throws IOException {
-      Tuple outTuple;
-      if (leftTuple != null && rightTuple != null) {
-        if (comparator.compare(leftTuple, rightTuple) < 0) {
-          outTuple = leftTuple;
-          leftTuple = leftScan.next();
-        } else {
-          outTuple = rightTuple;
-          rightTuple = rightScan.next();
-        }
-        return outTuple;
-      }
-
-      if (leftTuple == null) {
-        outTuple = rightTuple;
-        rightTuple = rightScan.next();
-      } else {
-        outTuple = leftTuple;
-        leftTuple = leftScan.next();
-      }
-      return outTuple;
-    }
-
-    @Override
-    public void reset() throws IOException {
-      leftScan.reset();
-      rightScan.reset();
-      init();
-    }
-
-    public void close() throws IOException {
-      IOUtils.cleanup(LOG, leftScan, rightScan);
-      getInputStats();
-      leftScan = null;
-      rightScan = null;
-      mergerProgress = 1.0f;
-    }
-
-    @Override
-    public boolean isProjectable() {
-      return false;
-    }
-
-    @Override
-    public void setTarget(Column[] targets) {
-    }
-
-    @Override
-    public boolean isSelectable() {
-      return false;
-    }
-
-    @Override
-    public void setSearchCondition(Object expr) {
-    }
-
-    @Override
-    public boolean isSplittable() {
-      return false;
-    }
-
-    @Override
-    public Schema getSchema() {
-      return inSchema;
-    }
-
-    @Override
-    public float getProgress() {
-      if (leftScan == null) {
-        return mergerProgress;
-      }
-      return leftScan.getProgress() * 0.5f + rightScan.getProgress() * 0.5f;
-    }
-
-    @Override
-    public TableStats getInputStats() {
-      if (leftScan == null) {
-        return mergerInputStats;
-      }
-      TableStats leftInputStats = leftScan.getInputStats();
-      if (mergerInputStats == null) {
-        mergerInputStats = new TableStats();
-      }
-      mergerInputStats.setNumBytes(0);
-      mergerInputStats.setReadBytes(0);
-      mergerInputStats.setNumRows(0);
-
-      if (leftInputStats != null) {
-        mergerInputStats.setNumBytes(leftInputStats.getNumBytes());
-        mergerInputStats.setReadBytes(leftInputStats.getReadBytes());
-        mergerInputStats.setNumRows(leftInputStats.getNumRows());
-      }
-
-      TableStats rightInputStats = rightScan.getInputStats();
-      if (rightInputStats != null) {
-        mergerInputStats.setNumBytes(mergerInputStats.getNumBytes() + 
rightInputStats.getNumBytes());
-        mergerInputStats.setReadBytes(mergerInputStats.getReadBytes() + 
rightInputStats.getReadBytes());
-        mergerInputStats.setNumRows(mergerInputStats.getNumRows() + 
rightInputStats.getNumRows());
-      }
-
-      return mergerInputStats;
-    }
-  }
-
   @Override
   public void close() throws IOException {
     if (result != null) {
@@ -746,9 +524,9 @@ public class ExternalSortExec extends SortExec {
       }
     }
 
-    if(inMemoryTable != null){
-      inMemoryTable.clear();
-      inMemoryTable = null;
+    if(tupleBlock != null){
+      tupleBlock.release();
+      tupleBlock = null;
     }
 
     if(executorService != null){
@@ -764,8 +542,9 @@ public class ExternalSortExec extends SortExec {
   public void rescan() throws IOException {
     if (result != null) {
       result.reset();
+    } else {
+      super.rescan();
     }
-    super.rescan();
     progress = 0.5f;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index 9dabbb3..a460345 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -24,7 +24,7 @@ import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index 426a7a1..4c8dc18 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -24,7 +24,7 @@ import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index b752db5..e3dde1f 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -28,7 +28,7 @@ import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
index a59f8d9..4701df3 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
@@ -73,11 +73,11 @@ public class JoinTupleComparator implements 
Comparator<Tuple> {
         inner = innerTuple.get(innerSortKeyIds[i]);
       }
 
-      if (outer instanceof NullDatum || inner instanceof NullDatum) {
+      if (outer.isNull() || inner.isNull()) {
         if (!outer.equals(inner)) {
-          if (outer instanceof NullDatum) {
+          if (outer.isNull()) {
             compVal = 1;
-          } else if (inner instanceof NullDatum) {
+          } else if (inner.isNull()) {
             compVal = -1;
           }
         } else {

Reply via email to