This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 93920b1  Unify data type conversion and formatting (#6728)
93920b1 is described below

commit 93920b17f9dc06c4d9b9baeb6006ab780afed8dd
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Apr 1 21:32:31 2021 -0700

    Unify data type conversion and formatting (#6728)
    
    Unify the logic of data type conversion and formatting into 
`PinotDataType`, `DataType`, `ColumnDataType`.
    - `PinotDataType` is used for type conversion
    - `DataType` is used in schema to represent the source data type (only have 
single-valued type)
    - `ColumnDataType` is used in `DataTable` and `ResultTable` to represent 
the result type
    
    `BYTES` type is stored as `byte[]` externally but as `ByteArray` internally 
for hashing and sorting purpose. This PR handles the conversion of `BYTES` type 
(also new types to be added in the future) so that the the result can be passed 
to the UDF for post-aggregation calculations.
    Also adding primitive array types to `PinotDataType`.
    
    We should consider merging `DataType` and `ColumnDataType` in the future.
---
 .../pinot/common/function/FunctionInvoker.java     |  19 +-
 .../pinot/common/function/FunctionUtils.java       |  24 +-
 .../common/function/scalar/StringFunctions.java    |   2 +-
 .../org/apache/pinot/common/utils/DataSchema.java  | 181 ++++++++++++-
 .../apache/pinot/common/utils/PinotDataType.java   | 279 +++++++++++++++------
 .../pinot/common/utils/request/RequestUtils.java   |  48 +---
 .../pinot/common/utils/PinotDataTypeTest.java      |   9 +-
 .../recordtransformer/DataTypeTransformer.java     |   2 +-
 .../JsonExtractScalarTransformFunction.java        |  68 ++---
 .../function/ScalarTransformFunctionWrapper.java   |  37 +--
 .../postaggregation/PostAggregationFunction.java   |  14 +-
 .../query/pruner/ColumnValueSegmentPruner.java     |  21 +-
 .../query/reduce/AggregationDataTableReducer.java  |  27 +-
 .../query/reduce/DistinctDataTableReducer.java     |  14 +-
 .../core/query/reduce/GroupByDataTableReducer.java |  30 ++-
 .../core/query/reduce/HavingFilterHandler.java     |  26 +-
 .../query/selection/SelectionOperatorService.java  |  40 +--
 .../query/selection/SelectionOperatorUtils.java    |  91 +------
 .../core/query/reduce/HavingFilterHandlerTest.java |  22 +-
 .../apache/pinot/queries/StUnionQueriesTest.java   |  59 +++--
 .../pinot/queries/SumPrecisionQueriesTest.java     |  72 ++++--
 .../java/org/apache/pinot/spi/data/FieldSpec.java  | 116 ++++++---
 22 files changed, 738 insertions(+), 463 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionInvoker.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionInvoker.java
index aec453c..b185d26 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionInvoker.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionInvoker.java
@@ -23,7 +23,6 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.util.Arrays;
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.pinot.common.utils.PinotDataType;
 
 
@@ -109,23 +108,7 @@ public class FunctionInvoker {
       PinotDataType argumentType = 
FunctionUtils.getArgumentType(argumentClass);
       Preconditions.checkArgument(parameterType != null && argumentType != 
null,
           "Cannot convert value from class: %s to class: %s", argumentClass, 
parameterClass);
-      Object convertedArgument = parameterType.convert(argument, argumentType);
-      // For primitive array parameter, convert the argument from Object array 
to primitive array
-      switch (parameterType) {
-        case INTEGER_ARRAY:
-          convertedArgument = ArrayUtils.toPrimitive((Integer[]) 
convertedArgument);
-          break;
-        case LONG_ARRAY:
-          convertedArgument = ArrayUtils.toPrimitive((Long[]) 
convertedArgument);
-          break;
-        case FLOAT_ARRAY:
-          convertedArgument = ArrayUtils.toPrimitive((Float[]) 
convertedArgument);
-          break;
-        case DOUBLE_ARRAY:
-          convertedArgument = ArrayUtils.toPrimitive((Double[]) 
convertedArgument);
-          break;
-      }
-      arguments[i] = convertedArgument;
+      arguments[i] = parameterType.convert(argument, argumentType);
     }
   }
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java
index 1b04ff7..000e960 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java
@@ -42,10 +42,10 @@ public class FunctionUtils {
     put(Double.class, PinotDataType.DOUBLE);
     put(String.class, PinotDataType.STRING);
     put(byte[].class, PinotDataType.BYTES);
-    put(int[].class, PinotDataType.INTEGER_ARRAY);
-    put(long[].class, PinotDataType.LONG_ARRAY);
-    put(float[].class, PinotDataType.FLOAT_ARRAY);
-    put(double[].class, PinotDataType.DOUBLE_ARRAY);
+    put(int[].class, PinotDataType.PRIMITIVE_INT_ARRAY);
+    put(long[].class, PinotDataType.PRIMITIVE_LONG_ARRAY);
+    put(float[].class, PinotDataType.PRIMITIVE_FLOAT_ARRAY);
+    put(double[].class, PinotDataType.PRIMITIVE_DOUBLE_ARRAY);
     put(String[].class, PinotDataType.STRING_ARRAY);
   }};
 
@@ -61,13 +61,13 @@ public class FunctionUtils {
     put(Double.class, PinotDataType.DOUBLE);
     put(String.class, PinotDataType.STRING);
     put(byte[].class, PinotDataType.BYTES);
-    put(int[].class, PinotDataType.INTEGER_ARRAY);
+    put(int[].class, PinotDataType.PRIMITIVE_INT_ARRAY);
     put(Integer[].class, PinotDataType.INTEGER_ARRAY);
-    put(long[].class, PinotDataType.LONG_ARRAY);
+    put(long[].class, PinotDataType.PRIMITIVE_LONG_ARRAY);
     put(Long[].class, PinotDataType.LONG_ARRAY);
-    put(float[].class, PinotDataType.FLOAT_ARRAY);
+    put(float[].class, PinotDataType.PRIMITIVE_FLOAT_ARRAY);
     put(Float[].class, PinotDataType.FLOAT_ARRAY);
-    put(double[].class, PinotDataType.DOUBLE_ARRAY);
+    put(double[].class, PinotDataType.PRIMITIVE_DOUBLE_ARRAY);
     put(Double[].class, PinotDataType.DOUBLE_ARRAY);
     put(String[].class, PinotDataType.STRING_ARRAY);
   }};
@@ -84,13 +84,9 @@ public class FunctionUtils {
     put(String.class, DataType.STRING);
     put(byte[].class, DataType.BYTES);
     put(int[].class, DataType.INT);
-    put(Integer[].class, DataType.INT);
     put(long[].class, DataType.LONG);
-    put(Long[].class, DataType.LONG);
     put(float[].class, DataType.FLOAT);
-    put(Float[].class, DataType.FLOAT);
     put(double[].class, DataType.DOUBLE);
-    put(Double[].class, DataType.DOUBLE);
     put(String[].class, DataType.STRING);
   }};
 
@@ -106,13 +102,9 @@ public class FunctionUtils {
     put(String.class, ColumnDataType.STRING);
     put(byte[].class, ColumnDataType.BYTES);
     put(int[].class, ColumnDataType.INT_ARRAY);
-    put(Integer[].class, ColumnDataType.INT_ARRAY);
     put(long[].class, ColumnDataType.LONG_ARRAY);
-    put(Long[].class, ColumnDataType.LONG_ARRAY);
     put(float[].class, ColumnDataType.FLOAT_ARRAY);
-    put(Float[].class, ColumnDataType.FLOAT_ARRAY);
     put(double[].class, ColumnDataType.DOUBLE_ARRAY);
-    put(Double[].class, ColumnDataType.DOUBLE_ARRAY);
     put(String[].class, ColumnDataType.STRING_ARRAY);
   }};
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java
index eded7cb..6cbc460 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java
@@ -348,7 +348,7 @@ public class StringFunctions {
    * @return returns true if substring present in main string else false.
    */
   @ScalarFunction
-  public static Boolean contains(String input, String substring) {
+  public static boolean contains(String input, String substring) {
     return input.contains(substring);
   }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index 8ba9ca4..68d1301 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -23,8 +23,11 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Arrays;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.EqualityUtils;
 
 
@@ -175,7 +178,7 @@ public class DataSchema {
     return new DataSchema(columnNames, columnDataTypes);
   }
 
-  @SuppressWarnings("CloneDoesntCallSuperClone")
+  @SuppressWarnings("MethodDoesntCallSuperMethod")
   @Override
   public DataSchema clone() {
     return new DataSchema(_columnNames.clone(), _columnDataTypes.clone());
@@ -241,11 +244,179 @@ public class DataSchema {
           this.isNumberArray() && anotherColumnDataType.isNumberArray());
     }
 
-    public static ColumnDataType fromDataType(FieldSpec.DataType dataType, 
boolean isSingleValue) {
+    public DataType toDataType() {
+      switch (this) {
+        case INT:
+          return DataType.INT;
+        case LONG:
+          return DataType.LONG;
+        case FLOAT:
+          return DataType.FLOAT;
+        case DOUBLE:
+          return DataType.DOUBLE;
+        case STRING:
+          return DataType.STRING;
+        case BYTES:
+          return DataType.BYTES;
+        default:
+          throw new IllegalStateException(String.format("Cannot convert 
ColumnDataType: %s to DataType", this));
+      }
+    }
+
+    /**
+     * Converts the given internal value to the type for external use (e.g. as 
UDF argument). The given value should be
+     * compatible with the type.
+     */
+    public Serializable convert(Object value) {
+      switch (this) {
+        case INT:
+          return ((Number) value).intValue();
+        case LONG:
+          return ((Number) value).longValue();
+        case FLOAT:
+          return ((Number) value).floatValue();
+        case DOUBLE:
+          return ((Number) value).doubleValue();
+        case STRING:
+          return value.toString();
+        case BYTES:
+          return ((ByteArray) value).getBytes();
+        case INT_ARRAY:
+          return (int[]) value;
+        case LONG_ARRAY:
+          if (value instanceof long[]) {
+            return (long[]) value;
+          } else {
+            int[] intValues = (int[]) value;
+            int length = intValues.length;
+            long[] longValues = new long[length];
+            for (int i = 0; i < length; i++) {
+              longValues[i] = intValues[i];
+            }
+            return longValues;
+          }
+        case FLOAT_ARRAY:
+          return (float[]) value;
+        case DOUBLE_ARRAY:
+          if (value instanceof double[]) {
+            return (double[]) value;
+          } else if (value instanceof int[]) {
+            int[] intValues = (int[]) value;
+            int length = intValues.length;
+            double[] doubleValues = new double[length];
+            for (int i = 0; i < length; i++) {
+              doubleValues[i] = intValues[i];
+            }
+            return doubleValues;
+          } else if (value instanceof long[]) {
+            long[] longValues = (long[]) value;
+            int length = longValues.length;
+            double[] doubleValues = new double[length];
+            for (int i = 0; i < length; i++) {
+              doubleValues[i] = longValues[i];
+            }
+            return doubleValues;
+          } else {
+            float[] floatValues = (float[]) value;
+            int length = floatValues.length;
+            double[] doubleValues = new double[length];
+            for (int i = 0; i < length; i++) {
+              doubleValues[i] = floatValues[i];
+            }
+            return doubleValues;
+          }
+        case STRING_ARRAY:
+          return (String[]) value;
+        default:
+          throw new IllegalStateException(String.format("Cannot convert: '%s' 
to type: %s", value, this));
+      }
+    }
+
+    /**
+     * Formats the value to human-readable format based on the type to be used 
in the query response.
+     */
+    public Serializable format(Object value) {
+      switch (this) {
+        case BYTES:
+          return BytesUtils.toHexString((byte[]) value);
+        default:
+          return (Serializable) value;
+      }
+    }
+
+    /**
+     * Equivalent to {@link #convert(Object)} and {@link #format(Object)} with 
a single switch statement.
+     */
+    public Serializable convertAndFormat(Object value) {
+      switch (this) {
+        case INT:
+          return ((Number) value).intValue();
+        case LONG:
+          return ((Number) value).longValue();
+        case FLOAT:
+          return ((Number) value).floatValue();
+        case DOUBLE:
+          return ((Number) value).doubleValue();
+        case STRING:
+          return value.toString();
+        case BYTES:
+          return ((ByteArray) value).toHexString();
+        case INT_ARRAY:
+          return (int[]) value;
+        case LONG_ARRAY:
+          if (value instanceof long[]) {
+            return (long[]) value;
+          } else {
+            int[] intValues = (int[]) value;
+            int length = intValues.length;
+            long[] longValues = new long[length];
+            for (int i = 0; i < length; i++) {
+              longValues[i] = intValues[i];
+            }
+            return longValues;
+          }
+        case FLOAT_ARRAY:
+          return (float[]) value;
+        case DOUBLE_ARRAY:
+          if (value instanceof double[]) {
+            return (double[]) value;
+          } else if (value instanceof int[]) {
+            int[] intValues = (int[]) value;
+            int length = intValues.length;
+            double[] doubleValues = new double[length];
+            for (int i = 0; i < length; i++) {
+              doubleValues[i] = intValues[i];
+            }
+            return doubleValues;
+          } else if (value instanceof long[]) {
+            long[] longValues = (long[]) value;
+            int length = longValues.length;
+            double[] doubleValues = new double[length];
+            for (int i = 0; i < length; i++) {
+              doubleValues[i] = longValues[i];
+            }
+            return doubleValues;
+          } else {
+            float[] floatValues = (float[]) value;
+            int length = floatValues.length;
+            double[] doubleValues = new double[length];
+            for (int i = 0; i < length; i++) {
+              doubleValues[i] = floatValues[i];
+            }
+            return doubleValues;
+          }
+        case STRING_ARRAY:
+          return (String[]) value;
+        default:
+          throw new IllegalStateException(String.format("Cannot convert and 
format: '%s' to type: %s", value, this));
+      }
+    }
+
+    public static ColumnDataType fromDataType(DataType dataType, boolean 
isSingleValue) {
       return isSingleValue ? fromDataTypeSV(dataType) : 
fromDataTypeMV(dataType);
     }
 
-    public static ColumnDataType fromDataTypeSV(FieldSpec.DataType dataType) {
+    public static ColumnDataType fromDataTypeSV(DataType dataType) {
       switch (dataType) {
         case INT:
           return INT;
@@ -264,7 +435,7 @@ public class DataSchema {
       }
     }
 
-    public static ColumnDataType fromDataTypeMV(FieldSpec.DataType dataType) {
+    public static ColumnDataType fromDataTypeMV(DataType dataType) {
       switch (dataType) {
         case INT:
           return INT_ARRAY;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
index 7325318..95cdf81 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
@@ -21,6 +21,7 @@ package org.apache.pinot.common.utils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.BytesUtils;
 
 
@@ -28,7 +29,7 @@ import org.apache.pinot.spi.utils.BytesUtils;
  *  The <code>PinotDataType</code> enum represents the data type of a value in 
a row from recordReader and provides
  *  utility methods to convert value across types if applicable.
  *  <p>We don't use <code>PinotDataType</code> to maintain type information, 
but use it to help organize the data and
- *  use {@link FieldSpec.DataType} to maintain type information separately 
across various readers.
+ *  use {@link DataType} to maintain type information separately across 
various readers.
  *  <p>NOTE:
  *  <ul>
  *    <li>We will silently lose information if a conversion causes us to do so 
(e.g. DOUBLE to INT)</li>
@@ -40,22 +41,22 @@ public enum PinotDataType {
 
   BOOLEAN {
     @Override
-    public Integer toInteger(Object value) {
+    public int toInt(Object value) {
       return ((Boolean) value) ? 1 : 0;
     }
 
     @Override
-    public Long toLong(Object value) {
+    public long toLong(Object value) {
       return ((Boolean) value) ? 1L : 0L;
     }
 
     @Override
-    public Float toFloat(Object value) {
+    public float toFloat(Object value) {
       return ((Boolean) value) ? 1f : 0f;
     }
 
     @Override
-    public Double toDouble(Object value) {
+    public double toDouble(Object value) {
       return ((Boolean) value) ? 1d : 0d;
     }
 
@@ -72,22 +73,22 @@ public enum PinotDataType {
 
   BYTE {
     @Override
-    public Integer toInteger(Object value) {
+    public int toInt(Object value) {
       return ((Byte) value).intValue();
     }
 
     @Override
-    public Long toLong(Object value) {
+    public long toLong(Object value) {
       return ((Byte) value).longValue();
     }
 
     @Override
-    public Float toFloat(Object value) {
+    public float toFloat(Object value) {
       return ((Byte) value).floatValue();
     }
 
     @Override
-    public Double toDouble(Object value) {
+    public double toDouble(Object value) {
       return ((Byte) value).doubleValue();
     }
 
@@ -104,22 +105,22 @@ public enum PinotDataType {
 
   CHARACTER {
     @Override
-    public Integer toInteger(Object value) {
+    public int toInt(Object value) {
       return (int) ((Character) value);
     }
 
     @Override
-    public Long toLong(Object value) {
+    public long toLong(Object value) {
       return (long) ((Character) value);
     }
 
     @Override
-    public Float toFloat(Object value) {
+    public float toFloat(Object value) {
       return (float) ((Character) value);
     }
 
     @Override
-    public Double toDouble(Object value) {
+    public double toDouble(Object value) {
       return (double) ((Character) value);
     }
 
@@ -136,22 +137,22 @@ public enum PinotDataType {
 
   SHORT {
     @Override
-    public Integer toInteger(Object value) {
+    public int toInt(Object value) {
       return ((Short) value).intValue();
     }
 
     @Override
-    public Long toLong(Object value) {
+    public long toLong(Object value) {
       return ((Short) value).longValue();
     }
 
     @Override
-    public Float toFloat(Object value) {
+    public float toFloat(Object value) {
       return ((Short) value).floatValue();
     }
 
     @Override
-    public Double toDouble(Object value) {
+    public double toDouble(Object value) {
       return ((Short) value).doubleValue();
     }
 
@@ -168,22 +169,22 @@ public enum PinotDataType {
 
   INTEGER {
     @Override
-    public Integer toInteger(Object value) {
+    public int toInt(Object value) {
       return (Integer) value;
     }
 
     @Override
-    public Long toLong(Object value) {
+    public long toLong(Object value) {
       return ((Integer) value).longValue();
     }
 
     @Override
-    public Float toFloat(Object value) {
+    public float toFloat(Object value) {
       return ((Integer) value).floatValue();
     }
 
     @Override
-    public Double toDouble(Object value) {
+    public double toDouble(Object value) {
       return ((Integer) value).doubleValue();
     }
 
@@ -199,28 +200,28 @@ public enum PinotDataType {
 
     @Override
     public Integer convert(Object value, PinotDataType sourceType) {
-      return sourceType.toInteger(value);
+      return sourceType.toInt(value);
     }
   },
 
   LONG {
     @Override
-    public Integer toInteger(Object value) {
+    public int toInt(Object value) {
       return ((Long) value).intValue();
     }
 
     @Override
-    public Long toLong(Object value) {
+    public long toLong(Object value) {
       return (Long) value;
     }
 
     @Override
-    public Float toFloat(Object value) {
+    public float toFloat(Object value) {
       return ((Long) value).floatValue();
     }
 
     @Override
-    public Double toDouble(Object value) {
+    public double toDouble(Object value) {
       return ((Long) value).doubleValue();
     }
 
@@ -242,22 +243,22 @@ public enum PinotDataType {
 
   FLOAT {
     @Override
-    public Integer toInteger(Object value) {
+    public int toInt(Object value) {
       return ((Float) value).intValue();
     }
 
     @Override
-    public Long toLong(Object value) {
+    public long toLong(Object value) {
       return ((Float) value).longValue();
     }
 
     @Override
-    public Float toFloat(Object value) {
+    public float toFloat(Object value) {
       return (Float) value;
     }
 
     @Override
-    public Double toDouble(Object value) {
+    public double toDouble(Object value) {
       return ((Float) value).doubleValue();
     }
 
@@ -279,22 +280,22 @@ public enum PinotDataType {
 
   DOUBLE {
     @Override
-    public Integer toInteger(Object value) {
+    public int toInt(Object value) {
       return ((Double) value).intValue();
     }
 
     @Override
-    public Long toLong(Object value) {
+    public long toLong(Object value) {
       return ((Double) value).longValue();
     }
 
     @Override
-    public Float toFloat(Object value) {
+    public float toFloat(Object value) {
       return ((Double) value).floatValue();
     }
 
     @Override
-    public Double toDouble(Object value) {
+    public double toDouble(Object value) {
       return (Double) value;
     }
 
@@ -316,35 +317,35 @@ public enum PinotDataType {
 
   STRING {
     @Override
-    public Integer toInteger(Object value) {
-      return Integer.valueOf(((String) value).trim());
+    public int toInt(Object value) {
+      return Integer.parseInt(value.toString().trim());
     }
 
     @Override
-    public Long toLong(Object value) {
-      return Long.valueOf(((String) value).trim());
+    public long toLong(Object value) {
+      return Long.parseLong(value.toString().trim());
     }
 
     @Override
-    public Float toFloat(Object value) {
+    public float toFloat(Object value) {
       // NOTE: No need to trim here because Float.valueOf() will trim the 
string
-      return Float.valueOf((String) value);
+      return Float.parseFloat(value.toString());
     }
 
     @Override
-    public Double toDouble(Object value) {
+    public double toDouble(Object value) {
       // NOTE: No need to trim here because Double.valueOf() will trim the 
string
-      return Double.valueOf((String) value);
+      return Double.parseDouble(value.toString());
     }
 
     @Override
     public String toString(Object value) {
-      return (String) value;
+      return value.toString();
     }
 
     @Override
     public byte[] toBytes(Object value) {
-      return BytesUtils.toBytes(((String) value).trim());
+      return BytesUtils.toBytes(value.toString().trim());
     }
 
     @Override
@@ -355,22 +356,22 @@ public enum PinotDataType {
 
   BYTES {
     @Override
-    public Integer toInteger(Object value) {
+    public int toInt(Object value) {
       throw new UnsupportedOperationException("Cannot convert value from BYTES 
to INTEGER");
     }
 
     @Override
-    public Long toLong(Object value) {
+    public long toLong(Object value) {
       throw new UnsupportedOperationException("Cannot convert value from BYTES 
to LONG");
     }
 
     @Override
-    public Float toFloat(Object value) {
+    public float toFloat(Object value) {
       throw new UnsupportedOperationException("Cannot convert value from BYTES 
to FLOAT");
     }
 
     @Override
-    public Double toDouble(Object value) {
+    public double toDouble(Object value) {
       throw new UnsupportedOperationException("Cannot convert value from BYTES 
to DOUBLE");
     }
 
@@ -392,22 +393,22 @@ public enum PinotDataType {
 
   OBJECT {
     @Override
-    public Integer toInteger(Object value) {
+    public int toInt(Object value) {
       return ((Number) value).intValue();
     }
 
     @Override
-    public Long toLong(Object value) {
+    public long toLong(Object value) {
       return ((Number) value).longValue();
     }
 
     @Override
-    public Float toFloat(Object value) {
+    public float toFloat(Object value) {
       return ((Number) value).floatValue();
     }
 
     @Override
-    public Double toDouble(Object value) {
+    public double toDouble(Object value) {
       return ((Number) value).doubleValue();
     }
 
@@ -439,6 +440,20 @@ public enum PinotDataType {
 
   SHORT_ARRAY,
 
+  /*
+    NOTE:
+      Primitive array is used in query execution, query response, scalar 
function arguments and return values.
+      Object array is used in GenericRow for data ingestion.
+      We need to keep them separately because they cannot automatically cast 
to the other type.
+   */
+
+  PRIMITIVE_INT_ARRAY {
+    @Override
+    public int[] convert(Object value, PinotDataType sourceType) {
+      return sourceType.toPrimitiveIntArray(value);
+    }
+  },
+
   INTEGER_ARRAY {
     @Override
     public Integer[] convert(Object value, PinotDataType sourceType) {
@@ -446,6 +461,13 @@ public enum PinotDataType {
     }
   },
 
+  PRIMITIVE_LONG_ARRAY {
+    @Override
+    public long[] convert(Object value, PinotDataType sourceType) {
+      return sourceType.toPrimitiveLongArray(value);
+    }
+  },
+
   LONG_ARRAY {
     @Override
     public Long[] convert(Object value, PinotDataType sourceType) {
@@ -453,6 +475,13 @@ public enum PinotDataType {
     }
   },
 
+  PRIMITIVE_FLOAT_ARRAY {
+    @Override
+    public float[] convert(Object value, PinotDataType sourceType) {
+      return sourceType.toPrimitiveFloatArray(value);
+    }
+  },
+
   FLOAT_ARRAY {
     @Override
     public Float[] convert(Object value, PinotDataType sourceType) {
@@ -460,6 +489,13 @@ public enum PinotDataType {
     }
   },
 
+  PRIMITIVE_DOUBLE_ARRAY {
+    @Override
+    public double[] convert(Object value, PinotDataType sourceType) {
+      return sourceType.toPrimitiveDoubleArray(value);
+    }
+  },
+
   DOUBLE_ARRAY {
     @Override
     public Double[] convert(Object value, PinotDataType sourceType) {
@@ -477,49 +513,91 @@ public enum PinotDataType {
   OBJECT_ARRAY;
 
   /**
-   * NOTE: override toInteger(), toLong(), toFloat(), toDouble(), toString() 
and toBytes() for single-value types.
+   * NOTE: override toInt(), toLong(), toFloat(), toDouble(), toString() and 
toBytes() for single-value types.
    */
 
-  public Integer toInteger(Object value) {
-    return getSingleValueType().toInteger(((Object[]) value)[0]);
+  public int toInt(Object value) {
+    return getSingleValueType().toInt(toObjectArray(value)[0]);
   }
 
-  public Long toLong(Object value) {
-    return getSingleValueType().toLong(((Object[]) value)[0]);
+  public long toLong(Object value) {
+    return getSingleValueType().toLong(toObjectArray(value)[0]);
   }
 
-  public Float toFloat(Object value) {
-    return getSingleValueType().toFloat(((Object[]) value)[0]);
+  public float toFloat(Object value) {
+    return getSingleValueType().toFloat(toObjectArray(value)[0]);
   }
 
-  public Double toDouble(Object value) {
-    return getSingleValueType().toDouble(((Object[]) value)[0]);
+  public double toDouble(Object value) {
+    return getSingleValueType().toDouble(toObjectArray(value)[0]);
   }
 
   public String toString(Object value) {
-    return getSingleValueType().toString(((Object[]) value)[0]);
+    return getSingleValueType().toString(toObjectArray(value)[0]);
   }
 
   public byte[] toBytes(Object value) {
-    return getSingleValueType().toBytes(((Object[]) value)[0]);
+    return getSingleValueType().toBytes(toObjectArray(value)[0]);
+  }
+
+  public int[] toPrimitiveIntArray(Object value) {
+    if (value instanceof int[]) {
+      return (int[]) value;
+    }
+    if (isSingleValue()) {
+      return new int[]{toInt(value)};
+    } else {
+      Object[] valueArray = toObjectArray(value);
+      int length = valueArray.length;
+      int[] intArray = new int[length];
+      PinotDataType singleValueType = getSingleValueType();
+      for (int i = 0; i < length; i++) {
+        intArray[i] = singleValueType.toInt(valueArray[i]);
+      }
+      return intArray;
+    }
   }
 
   public Integer[] toIntegerArray(Object value) {
+    if (value instanceof Integer[]) {
+      return (Integer[]) value;
+    }
     if (isSingleValue()) {
-      return new Integer[]{toInteger(value)};
+      return new Integer[]{toInt(value)};
     } else {
       Object[] valueArray = toObjectArray(value);
       int length = valueArray.length;
       Integer[] integerArray = new Integer[length];
       PinotDataType singleValueType = getSingleValueType();
       for (int i = 0; i < length; i++) {
-        integerArray[i] = singleValueType.toInteger(valueArray[i]);
+        integerArray[i] = singleValueType.toInt(valueArray[i]);
       }
       return integerArray;
     }
   }
 
+  public long[] toPrimitiveLongArray(Object value) {
+    if (value instanceof long[]) {
+      return (long[]) value;
+    }
+    if (isSingleValue()) {
+      return new long[]{toLong(value)};
+    } else {
+      Object[] valueArray = toObjectArray(value);
+      int length = valueArray.length;
+      long[] longArray = new long[length];
+      PinotDataType singleValueType = getSingleValueType();
+      for (int i = 0; i < length; i++) {
+        longArray[i] = singleValueType.toLong(valueArray[i]);
+      }
+      return longArray;
+    }
+  }
+
   public Long[] toLongArray(Object value) {
+    if (value instanceof Long[]) {
+      return (Long[]) value;
+    }
     if (isSingleValue()) {
       return new Long[]{toLong(value)};
     } else {
@@ -534,7 +612,28 @@ public enum PinotDataType {
     }
   }
 
+  public float[] toPrimitiveFloatArray(Object value) {
+    if (value instanceof float[]) {
+      return (float[]) value;
+    }
+    if (isSingleValue()) {
+      return new float[]{toFloat(value)};
+    } else {
+      Object[] valueArray = toObjectArray(value);
+      int length = valueArray.length;
+      float[] floatArray = new float[length];
+      PinotDataType singleValueType = getSingleValueType();
+      for (int i = 0; i < length; i++) {
+        floatArray[i] = singleValueType.toFloat(valueArray[i]);
+      }
+      return floatArray;
+    }
+  }
+
   public Float[] toFloatArray(Object value) {
+    if (value instanceof Float[]) {
+      return (Float[]) value;
+    }
     if (isSingleValue()) {
       return new Float[]{toFloat(value)};
     } else {
@@ -549,7 +648,28 @@ public enum PinotDataType {
     }
   }
 
+  public double[] toPrimitiveDoubleArray(Object value) {
+    if (value instanceof double[]) {
+      return (double[]) value;
+    }
+    if (isSingleValue()) {
+      return new double[]{toDouble(value)};
+    } else {
+      Object[] valueArray = toObjectArray(value);
+      int length = valueArray.length;
+      double[] doubleArray = new double[length];
+      PinotDataType singleValueType = getSingleValueType();
+      for (int i = 0; i < length; i++) {
+        doubleArray[i] = singleValueType.toDouble(valueArray[i]);
+      }
+      return doubleArray;
+    }
+  }
+
   public Double[] toDoubleArray(Object value) {
+    if (value instanceof Double[]) {
+      return (Double[]) value;
+    }
     if (isSingleValue()) {
       return new Double[]{toDouble(value)};
     } else {
@@ -565,6 +685,9 @@ public enum PinotDataType {
   }
 
   public String[] toStringArray(Object value) {
+    if (value instanceof String[]) {
+      return (String[]) value;
+    }
     if (isSingleValue()) {
       return new String[]{toString(value)};
     } else {
@@ -616,12 +739,16 @@ public enum PinotDataType {
         return CHARACTER;
       case SHORT_ARRAY:
         return SHORT;
+      case PRIMITIVE_INT_ARRAY:
       case INTEGER_ARRAY:
         return INTEGER;
+      case PRIMITIVE_LONG_ARRAY:
       case LONG_ARRAY:
         return LONG;
+      case PRIMITIVE_FLOAT_ARRAY:
       case FLOAT_ARRAY:
         return FLOAT;
+      case PRIMITIVE_DOUBLE_ARRAY:
       case DOUBLE_ARRAY:
         return DOUBLE;
       case STRING_ARRAY:
@@ -633,8 +760,12 @@ public enum PinotDataType {
     }
   }
 
-  public static PinotDataType getPinotDataType(FieldSpec fieldSpec) {
-    FieldSpec.DataType dataType = fieldSpec.getDataType();
+  /**
+   * Returns the {@link PinotDataType} for the given {@link FieldSpec} for 
data ingestion purpose. Returns object array
+   * type for multi-valued types.
+   */
+  public static PinotDataType getPinotDataTypeForIngestion(FieldSpec 
fieldSpec) {
+    DataType dataType = fieldSpec.getDataType();
     switch (dataType) {
       case INT:
         return fieldSpec.isSingleValueField() ? PinotDataType.INTEGER : 
PinotDataType.INTEGER_ARRAY;
@@ -658,7 +789,11 @@ public enum PinotDataType {
     }
   }
 
-  public static PinotDataType getPinotDataType(ColumnDataType columnDataType) {
+  /**
+   * Returns the {@link PinotDataType} for the given {@link ColumnDataType} 
for query execution purpose. Returns
+   * primitive array type for multi-valued types.
+   */
+  public static PinotDataType getPinotDataTypeForExecution(ColumnDataType 
columnDataType) {
     switch (columnDataType) {
       case INT:
         return INTEGER;
@@ -673,13 +808,13 @@ public enum PinotDataType {
       case BYTES:
         return BYTES;
       case INT_ARRAY:
-        return INTEGER_ARRAY;
+        return PRIMITIVE_INT_ARRAY;
       case LONG_ARRAY:
-        return LONG_ARRAY;
+        return PRIMITIVE_LONG_ARRAY;
       case FLOAT_ARRAY:
-        return FLOAT_ARRAY;
+        return PRIMITIVE_FLOAT_ARRAY;
       case DOUBLE_ARRAY:
-        return DOUBLE_ARRAY;
+        return PRIMITIVE_DOUBLE_ARRAY;
       case STRING_ARRAY:
         return STRING_ARRAY;
       default:
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
index 6297c01..da4f53a 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
@@ -43,7 +43,6 @@ import org.apache.pinot.pql.parsers.pql2.ast.LiteralAstNode;
 import org.apache.pinot.pql.parsers.pql2.ast.PredicateAstNode;
 import org.apache.pinot.pql.parsers.pql2.ast.StringLiteralAstNode;
 import org.apache.pinot.spi.utils.BytesUtils;
-import org.apache.pinot.sql.parsers.SqlCompilationException;
 
 
 public class RequestUtils {
@@ -127,62 +126,41 @@ public class RequestUtils {
     return expression;
   }
 
-  public static Expression getLiteralExpression(String value) {
+  public static Expression getLiteralExpression(long value) {
     Expression expression = createNewLiteralExpression();
-    expression.getLiteral().setStringValue(value);
+    expression.getLiteral().setLongValue(value);
     return expression;
   }
 
-  public static Expression getLiteralExpression(byte[] value) {
+  public static Expression getLiteralExpression(double value) {
     Expression expression = createNewLiteralExpression();
-    expression.getLiteral().setStringValue(BytesUtils.toHexString(value));
+    expression.getLiteral().setDoubleValue(value);
     return expression;
   }
 
-  public static Expression getLiteralExpression(Integer value) {
-    return getLiteralExpression(value.longValue());
-  }
-
-  public static Expression getLiteralExpression(Long value) {
+  public static Expression getLiteralExpression(String value) {
     Expression expression = createNewLiteralExpression();
-    expression.getLiteral().setLongValue(value);
+    expression.getLiteral().setStringValue(value);
     return expression;
   }
 
-  public static Expression getLiteralExpression(Float value) {
-    return getLiteralExpression(value.doubleValue());
-  }
-
-  public static Expression getLiteralExpression(Double value) {
+  public static Expression getLiteralExpression(byte[] value) {
     Expression expression = createNewLiteralExpression();
-    expression.getLiteral().setDoubleValue(value);
+    expression.getLiteral().setStringValue(BytesUtils.toHexString(value));
     return expression;
   }
 
   public static Expression getLiteralExpression(Object object) {
-    if (object instanceof Integer) {
-      return RequestUtils.getLiteralExpression((Integer) object);
-    }
-    if (object instanceof Long) {
-      return RequestUtils.getLiteralExpression((Long) object);
-    }
-    if (object instanceof Float) {
-      return RequestUtils.getLiteralExpression((Float) object);
-    }
-    if (object instanceof Double) {
-      return RequestUtils.getLiteralExpression((Double) object);
-    }
-    if (object instanceof String) {
-      return RequestUtils.getLiteralExpression((String) object);
+    if (object instanceof Integer || object instanceof Long) {
+      return RequestUtils.getLiteralExpression(((Number) object).longValue());
     }
-    if (object instanceof SqlLiteral) {
-      return RequestUtils.getLiteralExpression((SqlLiteral) object);
+    if (object instanceof Float || object instanceof Double) {
+      return RequestUtils.getLiteralExpression(((Number) 
object).doubleValue());
     }
     if (object instanceof byte[]) {
       return RequestUtils.getLiteralExpression((byte[]) object);
     }
-    throw new SqlCompilationException(
-        new IllegalArgumentException("Unsupported Literal value type - " + 
object.getClass()));
+    return RequestUtils.getLiteralExpression(object.toString());
   }
 
   public static Expression getFunctionExpression(String operator) {
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
index 1c6d241..6960366 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
@@ -94,11 +94,10 @@ public class PinotDataTypeTest {
 
   @Test
   public void testObject() {
-    assertEquals(OBJECT.toInteger(new NumberObject("123")).intValue(), 123);
-    assertEquals(OBJECT.toLong(new NumberObject("123")).intValue(), 123);
-    assertEquals(OBJECT.toFloat(new NumberObject("123")).intValue(), 123);
-    assertEquals(OBJECT.toDouble(new NumberObject("123")).intValue(), 123);
-    assertEquals(OBJECT.toInteger(new NumberObject("123")).intValue(), 123);
+    assertEquals(OBJECT.toInt(new NumberObject("123")), 123);
+    assertEquals(OBJECT.toLong(new NumberObject("123")), 123L);
+    assertEquals(OBJECT.toFloat(new NumberObject("123")), 123f);
+    assertEquals(OBJECT.toDouble(new NumberObject("123")), 123.0);
     assertEquals(OBJECT.toString(new NumberObject("123")), "123");
     assertEquals(OBJECT_ARRAY.getSingleValueType(), OBJECT);
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java
index ee2f5bc..be4b1b6 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java
@@ -71,7 +71,7 @@ public class DataTypeTransformer implements RecordTransformer 
{
   public DataTypeTransformer(Schema schema) {
     for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
       if (!fieldSpec.isVirtualColumn()) {
-        _dataTypes.put(fieldSpec.getName(), 
PinotDataType.getPinotDataType(fieldSpec));
+        _dataTypes.put(fieldSpec.getName(), 
PinotDataType.getPinotDataTypeForIngestion(fieldSpec));
       }
     }
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
index b1ffac3..a0bbb90 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
@@ -31,11 +31,10 @@ import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nonnull;
 import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.operator.blocks.ProjectionBlock;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.JsonUtils;
 
 
@@ -54,13 +53,12 @@ import org.apache.pinot.spi.utils.JsonUtils;
  *
  */
 public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
-
   public static final String FUNCTION_NAME = "jsonExtractScalar";
   private static final ParseContext JSON_PARSER_CONTEXT =
       
JsonPath.using(Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));
+
   private TransformFunction _jsonFieldTransformFunction;
   private String _jsonPath;
-  private String _resultsType;
   private Object _defaultValue = null;
   private TransformResultMetadata _resultMetadata;
 
@@ -70,7 +68,7 @@ public class JsonExtractScalarTransformFunction extends 
BaseTransformFunction {
   }
 
   @Override
-  public void init(@Nonnull List<TransformFunction> arguments, @Nonnull 
Map<String, DataSource> dataSourceMap) {
+  public void init(List<TransformFunction> arguments, Map<String, DataSource> 
dataSourceMap) {
     // Check that there are exactly 3 or 4 arguments
     if (arguments.size() < 3 || arguments.size() > 4) {
       throw new IllegalArgumentException(
@@ -84,41 +82,19 @@ public class JsonExtractScalarTransformFunction extends 
BaseTransformFunction {
     }
     _jsonFieldTransformFunction = firstArgument;
     _jsonPath = ((LiteralTransformFunction) arguments.get(1)).getLiteral();
-    _resultsType = ((LiteralTransformFunction) 
arguments.get(2)).getLiteral().toUpperCase();
-    boolean isSingleValue = !_resultsType.toUpperCase().endsWith("_ARRAY");
+    String resultsType = ((LiteralTransformFunction) 
arguments.get(2)).getLiteral().toUpperCase();
+    boolean isSingleValue = !resultsType.endsWith("_ARRAY");
     try {
-      FieldSpec.DataType fieldType = 
FieldSpec.DataType.valueOf(_resultsType.split("_ARRAY")[0]);
-
+      DataType dataType =
+          DataType.valueOf(isSingleValue ? resultsType : 
resultsType.substring(0, resultsType.length() - 6));
       if (arguments.size() == 4) {
-        String defaultValue = ((LiteralTransformFunction) 
arguments.get(3)).getLiteral();
-        switch (fieldType) {
-          case INT:
-            _defaultValue = Double.valueOf(defaultValue).intValue();
-            break;
-          case LONG:
-            _defaultValue = Double.valueOf(defaultValue).longValue();
-            break;
-          case FLOAT:
-            _defaultValue = Double.valueOf(defaultValue).floatValue();
-            break;
-          case DOUBLE:
-            _defaultValue = Double.valueOf(defaultValue);
-            break;
-          case BOOLEAN:
-          case STRING:
-            _defaultValue = defaultValue;
-            break;
-          case BYTES:
-            throw new UnsupportedOperationException(String.format(
-                "Unsupported results type: BYTES for 'jsonExtractScalar' Udf. 
Supported types are: 
INT/LONG/FLOAT/DOUBLE/STRING/INT_ARRAY/LONG/FLOAT_ARRAY/DOUBLE_ARRAY/STRING_ARRAY",
-                _resultsType));
-        }
+        _defaultValue = dataType.convert(((LiteralTransformFunction) 
arguments.get(3)).getLiteral());
       }
-      _resultMetadata = new TransformResultMetadata(fieldType, isSingleValue, 
false);
+      _resultMetadata = new TransformResultMetadata(dataType, isSingleValue, 
false);
     } catch (Exception e) {
-      throw new UnsupportedOperationException(String.format(
-          "Unsupported results type: %s for 'jsonExtractScalar' Udf. Supported 
types are: 
INT/LONG/FLOAT/DOUBLE/STRING/INT_ARRAY/LONG/FLOAT_ARRAY/DOUBLE_ARRAY/STRING_ARRAY",
-          _resultsType));
+      throw new IllegalStateException(String.format(
+          "Unsupported results type: %s for jsonExtractScalar function. 
Supported types are: 
INT/LONG/FLOAT/DOUBLE/STRING/INT_ARRAY/LONG_ARRAY/FLOAT_ARRAY/DOUBLE_ARRAY/STRING_ARRAY",
+          resultsType));
     }
   }
 
@@ -128,7 +104,7 @@ public class JsonExtractScalarTransformFunction extends 
BaseTransformFunction {
   }
 
   @Override
-  public int[] transformToIntValuesSV(@Nonnull ProjectionBlock 
projectionBlock) {
+  public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
     final String[] stringValuesSV = 
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     final int[] results = new int[projectionBlock.getNumDocs()];
     for (int i = 0; i < results.length; i++) {
@@ -151,7 +127,7 @@ public class JsonExtractScalarTransformFunction extends 
BaseTransformFunction {
   }
 
   @Override
-  public long[] transformToLongValuesSV(@Nonnull ProjectionBlock 
projectionBlock) {
+  public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
     final String[] stringValuesSV = 
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     final long[] results = new long[projectionBlock.getNumDocs()];
     for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
@@ -175,7 +151,7 @@ public class JsonExtractScalarTransformFunction extends 
BaseTransformFunction {
   }
 
   @Override
-  public float[] transformToFloatValuesSV(@Nonnull ProjectionBlock 
projectionBlock) {
+  public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
     final String[] stringValuesSV = 
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     final float[] results = new float[projectionBlock.getNumDocs()];
     for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
@@ -198,7 +174,7 @@ public class JsonExtractScalarTransformFunction extends 
BaseTransformFunction {
   }
 
   @Override
-  public double[] transformToDoubleValuesSV(@Nonnull ProjectionBlock 
projectionBlock) {
+  public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
     final String[] stringValuesSV = 
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     final double[] results = new double[projectionBlock.getNumDocs()];
     for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
@@ -223,7 +199,7 @@ public class JsonExtractScalarTransformFunction extends 
BaseTransformFunction {
   }
 
   @Override
-  public String[] transformToStringValuesSV(@Nonnull ProjectionBlock 
projectionBlock) {
+  public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
     final String[] stringValuesSV = 
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     final String[] results = new String[projectionBlock.getNumDocs()];
     for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
@@ -246,7 +222,7 @@ public class JsonExtractScalarTransformFunction extends 
BaseTransformFunction {
   }
 
   @Override
-  public int[][] transformToIntValuesMV(@Nonnull ProjectionBlock 
projectionBlock) {
+  public int[][] transformToIntValuesMV(ProjectionBlock projectionBlock) {
     final String[] stringValuesMV = 
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     final int[][] results = new int[projectionBlock.getNumDocs()][];
     for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
@@ -264,7 +240,7 @@ public class JsonExtractScalarTransformFunction extends 
BaseTransformFunction {
   }
 
   @Override
-  public long[][] transformToLongValuesMV(@Nonnull ProjectionBlock 
projectionBlock) {
+  public long[][] transformToLongValuesMV(ProjectionBlock projectionBlock) {
     final String[] stringValuesMV = 
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     final long[][] results = new long[projectionBlock.getNumDocs()][];
     for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
@@ -282,7 +258,7 @@ public class JsonExtractScalarTransformFunction extends 
BaseTransformFunction {
   }
 
   @Override
-  public float[][] transformToFloatValuesMV(@Nonnull ProjectionBlock 
projectionBlock) {
+  public float[][] transformToFloatValuesMV(ProjectionBlock projectionBlock) {
     final String[] stringValuesMV = 
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     final float[][] results = new float[projectionBlock.getNumDocs()][];
     for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
@@ -300,7 +276,7 @@ public class JsonExtractScalarTransformFunction extends 
BaseTransformFunction {
   }
 
   @Override
-  public double[][] transformToDoubleValuesMV(@Nonnull ProjectionBlock 
projectionBlock) {
+  public double[][] transformToDoubleValuesMV(ProjectionBlock projectionBlock) 
{
     final String[] stringValuesMV = 
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     final double[][] results = new double[projectionBlock.getNumDocs()][];
     for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
@@ -318,7 +294,7 @@ public class JsonExtractScalarTransformFunction extends 
BaseTransformFunction {
   }
 
   @Override
-  public String[][] transformToStringValuesMV(@Nonnull ProjectionBlock 
projectionBlock) {
+  public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock) 
{
     final String[] stringValuesMV = 
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
     final String[][] results = new String[projectionBlock.getNumDocs()][];
     for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java
index eca96f6..d8d6151 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java
@@ -39,13 +39,14 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
 public class ScalarTransformFunctionWrapper extends BaseTransformFunction {
   private final String _name;
   private final FunctionInvoker _functionInvoker;
+  private final PinotDataType _resultType;
+  private final TransformResultMetadata _resultMetadata;
 
   private Object[] _arguments;
   private int _numNonLiteralArguments;
   private int[] _nonLiteralIndices;
   private TransformFunction[] _nonLiteralFunctions;
   private Object[][] _nonLiteralValues;
-  private TransformResultMetadata _resultMetadata;
 
   private int[] _intResults;
   private float[] _floatResults;
@@ -70,6 +71,17 @@ public class ScalarTransformFunctionWrapper extends 
BaseTransformFunction {
       Preconditions.checkArgument(parameterTypes[i] != null, "Unsupported 
parameter class: %s for method: %s",
           parameterClasses[i], functionInfo.getMethod());
     }
+    Class<?> resultClass = _functionInvoker.getResultClass();
+    PinotDataType resultType = FunctionUtils.getParameterType(resultClass);
+    if (resultType != null) {
+      _resultType = resultType;
+      _resultMetadata =
+          new TransformResultMetadata(FunctionUtils.getDataType(resultClass), 
_resultType.isSingleValue(), false);
+    } else {
+      // Handle unrecognized result class with STRING
+      _resultType = PinotDataType.STRING;
+      _resultMetadata = new TransformResultMetadata(DataType.STRING, true, 
false);
+    }
   }
 
   @Override
@@ -100,15 +112,6 @@ public class ScalarTransformFunctionWrapper extends 
BaseTransformFunction {
       }
     }
     _nonLiteralValues = new Object[_numNonLiteralArguments][];
-
-    Class<?> resultClass = _functionInvoker.getResultClass();
-    DataType resultDataType = FunctionUtils.getDataType(resultClass);
-    // Handle unrecognized result class with STRING
-    if (resultDataType == null) {
-      resultDataType = DataType.STRING;
-    }
-    boolean isSingleValue = !resultClass.isArray();
-    _resultMetadata = new TransformResultMetadata(resultDataType, 
isSingleValue, false);
   }
 
   @Override
@@ -331,9 +334,9 @@ public class ScalarTransformFunctionWrapper extends 
BaseTransformFunction {
   private void getNonLiteralValues(ProjectionBlock projectionBlock) {
     PinotDataType[] parameterTypes = _functionInvoker.getParameterTypes();
     for (int i = 0; i < _numNonLiteralArguments; i++) {
-      int index = _nonLiteralIndices[i];
+      PinotDataType parameterType = parameterTypes[_nonLiteralIndices[i]];
       TransformFunction transformFunction = _nonLiteralFunctions[i];
-      switch (parameterTypes[index]) {
+      switch (parameterType) {
         case INTEGER:
           _nonLiteralValues[i] = 
ArrayUtils.toObject(transformFunction.transformToIntValuesSV(projectionBlock));
           break;
@@ -352,23 +355,23 @@ public class ScalarTransformFunctionWrapper extends 
BaseTransformFunction {
         case BYTES:
           _nonLiteralValues[i] = 
transformFunction.transformToBytesValuesSV(projectionBlock);
           break;
-        case INTEGER_ARRAY:
+        case PRIMITIVE_INT_ARRAY:
           _nonLiteralValues[i] = 
transformFunction.transformToIntValuesMV(projectionBlock);
           break;
-        case LONG_ARRAY:
+        case PRIMITIVE_LONG_ARRAY:
           _nonLiteralValues[i] = 
transformFunction.transformToLongValuesMV(projectionBlock);
           break;
-        case FLOAT_ARRAY:
+        case PRIMITIVE_FLOAT_ARRAY:
           _nonLiteralValues[i] = 
transformFunction.transformToFloatValuesMV(projectionBlock);
           break;
-        case DOUBLE_ARRAY:
+        case PRIMITIVE_DOUBLE_ARRAY:
           _nonLiteralValues[i] = 
transformFunction.transformToDoubleValuesMV(projectionBlock);
           break;
         case STRING_ARRAY:
           _nonLiteralValues[i] = 
transformFunction.transformToStringValuesMV(projectionBlock);
           break;
         default:
-          throw new IllegalStateException();
+          throw new IllegalStateException("Unsupported parameter type: " + 
parameterType);
       }
     }
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/postaggregation/PostAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/postaggregation/PostAggregationFunction.java
index 0a046aa..085239a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/postaggregation/PostAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/postaggregation/PostAggregationFunction.java
@@ -41,13 +41,19 @@ public class PostAggregationFunction {
     Preconditions
         .checkArgument(functionInfo != null, "Unsupported function: %s with %s 
parameters", functionName, numArguments);
     _functionInvoker = new FunctionInvoker(functionInfo);
+    Class<?>[] parameterClasses = _functionInvoker.getParameterClasses();
     PinotDataType[] parameterTypes = _functionInvoker.getParameterTypes();
-    Preconditions.checkArgument(numArguments == parameterTypes.length,
-        "Wrong number of arguments for method: %s, expected: %s, actual: %s", 
functionInfo.getMethod(),
-        parameterTypes.length, numArguments);
+    int numParameters = parameterClasses.length;
+    Preconditions.checkArgument(numArguments == numParameters,
+        "Wrong number of arguments for method: %s, expected: %s, actual: %s", 
functionInfo.getMethod(), numParameters,
+        numArguments);
+    for (int i = 0; i < numParameters; i++) {
+      Preconditions.checkArgument(parameterTypes[i] != null, "Unsupported 
parameter class: %s for method: %s",
+          parameterClasses[i], functionInfo.getMethod());
+    }
     _argumentTypes = new PinotDataType[numArguments];
     for (int i = 0; i < numArguments; i++) {
-      _argumentTypes[i] = PinotDataType.getPinotDataType(argumentTypes[i]);
+      _argumentTypes[i] = 
PinotDataType.getPinotDataTypeForExecution(argumentTypes[i]);
     }
     ColumnDataType resultType = 
FunctionUtils.getColumnDataType(_functionInvoker.getResultClass());
     // Handle unrecognized result class with STRING
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
index 98d9624..d3a70c3 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
@@ -35,7 +35,6 @@ import 
org.apache.pinot.core.query.request.context.predicate.RangePredicate;
 import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.BytesUtils;
 
 
 /**
@@ -241,25 +240,9 @@ public class ColumnValueSegmentPruner implements 
SegmentPruner {
 
   private static Comparable convertValue(String stringValue, DataType 
dataType) {
     try {
-      switch (dataType) {
-        case INT:
-          return Integer.valueOf(stringValue);
-        case LONG:
-          return Long.valueOf(stringValue);
-        case FLOAT:
-          return Float.valueOf(stringValue);
-        case DOUBLE:
-          return Double.valueOf(stringValue);
-        case STRING:
-          return stringValue;
-        case BYTES:
-          return BytesUtils.toByteArray(stringValue);
-        default:
-          throw new IllegalStateException();
-      }
+      return dataType.convertInternal(stringValue);
     } catch (Exception e) {
-      throw new BadQueryRequestException(String.format("Cannot convert value: 
'%s' to type: %s", stringValue, dataType),
-          e);
+      throw new BadQueryRequestException(e);
     }
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
index ec59d49..47ed2d8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
@@ -103,14 +103,15 @@ public class AggregationDataTableReducer implements 
DataTableReducer {
     }
     Serializable[] finalResults = new Serializable[numAggregationFunctions];
     for (int i = 0; i < numAggregationFunctions; i++) {
-      finalResults[i] = AggregationFunctionUtils
-          
.getSerializableValue(_aggregationFunctions[i].extractFinalResult(intermediateResults[i]));
+      AggregationFunction aggregationFunction = _aggregationFunctions[i];
+      finalResults[i] = aggregationFunction.getFinalResultColumnType()
+          
.convert(aggregationFunction.extractFinalResult(intermediateResults[i]));
     }
 
     if (_responseFormatSql) {
       brokerResponseNative.setResultTable(reduceToResultTable(finalResults));
     } else {
-      
brokerResponseNative.setAggregationResults(reduceToAggregationResults(finalResults,
 dataSchema));
+      
brokerResponseNative.setAggregationResults(reduceToAggregationResults(finalResults,
 dataSchema.getColumnNames()));
     }
   }
 
@@ -120,26 +121,32 @@ public class AggregationDataTableReducer implements 
DataTableReducer {
   private ResultTable reduceToResultTable(Object[] finalResults) {
     PostAggregationHandler postAggregationHandler =
         new PostAggregationHandler(_queryContext, 
getPrePostAggregationDataSchema());
-    DataSchema resultTableSchema = 
postAggregationHandler.getResultDataSchema();
-    Object[] resultRow = postAggregationHandler.getResult(finalResults);
-    return new ResultTable(resultTableSchema, 
Collections.singletonList(resultRow));
+    DataSchema dataSchema = postAggregationHandler.getResultDataSchema();
+    Object[] row = postAggregationHandler.getResult(finalResults);
+    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+    int numColumns = columnDataTypes.length;
+    for (int i = 0; i < numColumns; i++) {
+      row[i] = columnDataTypes[i].format(row[i]);
+    }
+    return new ResultTable(dataSchema, Collections.singletonList(row));
   }
 
   /**
    * Sets aggregation results into AggregationResults
    */
-  private List<AggregationResult> reduceToAggregationResults(Serializable[] 
finalResults, DataSchema dataSchema) {
+  private List<AggregationResult> reduceToAggregationResults(Serializable[] 
finalResults, String[] columnNames) {
     int numAggregationFunctions = _aggregationFunctions.length;
     List<AggregationResult> aggregationResults = new 
ArrayList<>(numAggregationFunctions);
     if (_preserveType) {
       for (int i = 0; i < numAggregationFunctions; i++) {
-        aggregationResults.add(new 
AggregationResult(dataSchema.getColumnName(i), finalResults[i]));
+        aggregationResults.add(new AggregationResult(columnNames[i],
+            
_aggregationFunctions[i].getFinalResultColumnType().format(finalResults[i])));
       }
     } else {
       // Format the values into strings
       for (int i = 0; i < numAggregationFunctions; i++) {
-        aggregationResults.add(
-            new AggregationResult(dataSchema.getColumnName(i), 
AggregationFunctionUtils.formatValue(finalResults[i])));
+        aggregationResults.add(new AggregationResult(columnNames[i], 
AggregationFunctionUtils
+            
.formatValue(_aggregationFunctions[i].getFinalResultColumnType().format(finalResults[i]))));
       }
     }
     return aggregationResults;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
index ad3020a..97f9fa3 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
@@ -30,12 +30,12 @@ import 
org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.response.broker.SelectionResults;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.data.table.Record;
 import 
org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.core.util.QueryOptions;
 
@@ -86,8 +86,8 @@ public class DistinctDataTableReducer implements 
DataTableReducer {
         //       There's no way currently to get the data types of the 
distinct columns for empty results
 
         int numColumns = columns.length;
-        DataSchema.ColumnDataType[] columnDataTypes = new 
DataSchema.ColumnDataType[numColumns];
-        Arrays.fill(columnDataTypes, DataSchema.ColumnDataType.STRING);
+        ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
+        Arrays.fill(columnDataTypes, ColumnDataType.STRING);
         brokerResponseNative
             .setResultTable(new ResultTable(new DataSchema(columns, 
columnDataTypes), Collections.emptyList()));
       } else {
@@ -117,14 +117,14 @@ public class DistinctDataTableReducer implements 
DataTableReducer {
   private SelectionResults reduceToSelectionResult(DistinctTable 
distinctTable) {
     List<Serializable[]> rows = new ArrayList<>(distinctTable.size());
     DataSchema dataSchema = distinctTable.getDataSchema();
-    DataSchema.ColumnDataType[] columnDataTypes = 
dataSchema.getColumnDataTypes();
+    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
     int numColumns = columnDataTypes.length;
     Iterator<Record> iterator = distinctTable.getFinalResult();
     while (iterator.hasNext()) {
       Object[] values = iterator.next().getValues();
       Serializable[] row = new Serializable[numColumns];
       for (int i = 0; i < numColumns; i++) {
-        row[i] = SelectionOperatorUtils.convertValueToType(values[i], 
columnDataTypes[i]);
+        row[i] = columnDataTypes[i].convertAndFormat(values[i]);
       }
       rows.add(row);
     }
@@ -134,14 +134,14 @@ public class DistinctDataTableReducer implements 
DataTableReducer {
   private ResultTable reduceToResultTable(DistinctTable distinctTable) {
     List<Object[]> rows = new ArrayList<>(distinctTable.size());
     DataSchema dataSchema = distinctTable.getDataSchema();
-    DataSchema.ColumnDataType[] columnDataTypes = 
dataSchema.getColumnDataTypes();
+    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
     int numColumns = columnDataTypes.length;
     Iterator<Record> iterator = distinctTable.getFinalResult();
     while (iterator.hasNext()) {
       Object[] values = iterator.next().getValues();
       Object[] row = new Object[numColumns];
       for (int i = 0; i < numColumns; i++) {
-        row[i] = SelectionOperatorUtils.convertValueToType(values[i], 
columnDataTypes[i]);
+        row[i] = columnDataTypes[i].convertAndFormat(values[i]);
       }
       rows.add(row);
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index f0b7c29..cbf03cf 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -198,6 +198,8 @@ public class GroupByDataTableReducer implements 
DataTableReducer {
     }
     Iterator<Record> sortedIterator = indexedTable.iterator();
     DataSchema prePostAggregationDataSchema = 
getPrePostAggregationDataSchema(dataSchema);
+    ColumnDataType[] columnDataTypes = 
prePostAggregationDataSchema.getColumnDataTypes();
+    int numColumns = columnDataTypes.length;
     int limit = _queryContext.getLimit();
     List<Object[]> rows = new ArrayList<>(limit);
 
@@ -206,13 +208,15 @@ public class GroupByDataTableReducer implements 
DataTableReducer {
 
       PostAggregationHandler postAggregationHandler =
           new PostAggregationHandler(_queryContext, 
prePostAggregationDataSchema);
-      DataSchema resultTableSchema = 
postAggregationHandler.getResultDataSchema();
       FilterContext havingFilter = _queryContext.getHavingFilter();
       if (havingFilter != null) {
         HavingFilterHandler havingFilterHandler = new 
HavingFilterHandler(havingFilter, postAggregationHandler);
         while (rows.size() < limit && sortedIterator.hasNext()) {
           Object[] row = sortedIterator.next().getValues();
           extractFinalAggregationResults(row);
+          for (int i = 0; i < numColumns; i++) {
+            row[i] = columnDataTypes[i].convert(row[i]);
+          }
           if (havingFilterHandler.isMatch(row)) {
             rows.add(row);
           }
@@ -221,11 +225,25 @@ public class GroupByDataTableReducer implements 
DataTableReducer {
         for (int i = 0; i < limit && sortedIterator.hasNext(); i++) {
           Object[] row = sortedIterator.next().getValues();
           extractFinalAggregationResults(row);
+          for (int j = 0; j < numColumns; j++) {
+            row[j] = columnDataTypes[j].convert(row[j]);
+          }
           rows.add(row);
         }
       }
-      rows.replaceAll(postAggregationHandler::getResult);
-      brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, 
rows));
+      DataSchema resultDataSchema = 
postAggregationHandler.getResultDataSchema();
+      ColumnDataType[] resultColumnDataTypes = 
resultDataSchema.getColumnDataTypes();
+      int numResultColumns = resultColumnDataTypes.length;
+      int numResultRows = rows.size();
+      List<Object[]> resultRows = new ArrayList<>(numResultRows);
+      for (Object[] row : rows) {
+        Object[] resultRow = postAggregationHandler.getResult(row);
+        for (int i = 0; i < numResultColumns; i++) {
+          resultRow[i] = resultColumnDataTypes[i].format(resultRow[i]);
+        }
+        resultRows.add(resultRow);
+      }
+      brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, 
resultRows));
     } else {
       // PQL query with SQL group-by mode and response format
       // NOTE: For PQL query, keep the order of columns as is (group-by 
expressions followed by aggregations), no need
@@ -234,6 +252,9 @@ public class GroupByDataTableReducer implements 
DataTableReducer {
       for (int i = 0; i < limit && sortedIterator.hasNext(); i++) {
         Object[] row = sortedIterator.next().getValues();
         extractFinalAggregationResults(row);
+        for (int j = 0; j < numColumns; j++) {
+          row[j] = columnDataTypes[j].convertAndFormat(row[j]);
+        }
         rows.add(row);
       }
       brokerResponseNative.setResultTable(new 
ResultTable(prePostAggregationDataSchema, rows));
@@ -246,8 +267,7 @@ public class GroupByDataTableReducer implements 
DataTableReducer {
   private void extractFinalAggregationResults(Object[] row) {
     for (int i = 0; i < _numAggregationFunctions; i++) {
       int valueIndex = i + _numGroupByExpressions;
-      row[valueIndex] =
-          
AggregationFunctionUtils.getSerializableValue(_aggregationFunctions[i].extractFinalResult(row[valueIndex]));
+      row[valueIndex] = 
_aggregationFunctions[i].extractFinalResult(row[valueIndex]);
     }
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/HavingFilterHandler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/HavingFilterHandler.java
index 08deed3..f56162d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/HavingFilterHandler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/HavingFilterHandler.java
@@ -24,7 +24,6 @@ import 
org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvide
 import org.apache.pinot.core.query.request.context.FilterContext;
 import org.apache.pinot.core.query.request.context.predicate.Predicate;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.utils.ByteArray;
 
 
 /**
@@ -133,28 +132,7 @@ public class HavingFilterHandler {
 
     PredicateRowMatcher(Predicate predicate) {
       _valueExtractor = 
_postAggregationHandler.getValueExtractor(predicate.getLhs());
-      switch (_valueExtractor.getColumnDataType()) {
-        case INT:
-          _valueType = DataType.INT;
-          break;
-        case LONG:
-          _valueType = DataType.LONG;
-          break;
-        case FLOAT:
-          _valueType = DataType.FLOAT;
-          break;
-        case DOUBLE:
-          _valueType = DataType.DOUBLE;
-          break;
-        case STRING:
-          _valueType = DataType.STRING;
-          break;
-        case BYTES:
-          _valueType = DataType.BYTES;
-          break;
-        default:
-          throw new IllegalStateException();
-      }
+      _valueType = _valueExtractor.getColumnDataType().toDataType();
       _predicateEvaluator = 
PredicateEvaluatorProvider.getPredicateEvaluator(predicate, null, _valueType);
     }
 
@@ -173,7 +151,7 @@ public class HavingFilterHandler {
         case STRING:
           return _predicateEvaluator.applySV((String) value);
         case BYTES:
-          return _predicateEvaluator.applySV(((ByteArray) value).getBytes());
+          return _predicateEvaluator.applySV((byte[]) value);
         default:
           throw new IllegalStateException();
       }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index aec9d8f..9f3a5c9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -28,6 +28,7 @@ import java.util.PriorityQueue;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.response.broker.SelectionResults;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -90,11 +91,13 @@ public class SelectionOperatorService {
    * @return flexible {@link Comparator} for selection rows.
    */
   private Comparator<Object[]> 
getTypeCompatibleComparator(List<OrderByExpressionContext> orderByExpressions) {
+    ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
+
     // Compare all single-value columns
     int numOrderByExpressions = orderByExpressions.size();
     List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
     for (int i = 0; i < numOrderByExpressions; i++) {
-      if (!_dataSchema.getColumnDataType(i).isArray()) {
+      if (!columnDataTypes[i].isArray()) {
         valueIndexList.add(i);
       }
     }
@@ -107,7 +110,7 @@ public class SelectionOperatorService {
     for (int i = 0; i < numValuesToCompare; i++) {
       int valueIndex = valueIndexList.get(i);
       valueIndices[i] = valueIndex;
-      isNumber[i] = _dataSchema.getColumnDataType(valueIndex).isNumber();
+      isNumber[i] = columnDataTypes[valueIndex].isNumber();
       multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? -1 : 1;
     }
 
@@ -166,7 +169,7 @@ public class SelectionOperatorService {
     LinkedList<Serializable[]> rowsInSelectionResults = new LinkedList<>();
     int[] columnIndices = 
SelectionOperatorUtils.getColumnIndices(_selectionColumns, _dataSchema);
     int numColumns = columnIndices.length;
-    DataSchema.ColumnDataType[] columnDataTypes = 
_dataSchema.getColumnDataTypes();
+    ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
 
     if (preserveType) {
       while (_rows.size() > _offset) {
@@ -175,7 +178,7 @@ public class SelectionOperatorService {
         Serializable[] extractedRow = new Serializable[numColumns];
         for (int i = 0; i < numColumns; i++) {
           int columnIndex = columnIndices[i];
-          extractedRow[i] = 
SelectionOperatorUtils.convertValueToType(row[columnIndex], 
columnDataTypes[columnIndex]);
+          extractedRow[i] = 
columnDataTypes[columnIndex].convertAndFormat(row[columnIndex]);
         }
         rowsInSelectionResults.addFirst(extractedRow);
       }
@@ -204,32 +207,33 @@ public class SelectionOperatorService {
    * @return {@link SelectionResults} object results.
    */
   public ResultTable renderResultTableWithOrdering() {
-    LinkedList<Object[]> rowsInSelectionResults = new LinkedList<>();
     int[] columnIndices = 
SelectionOperatorUtils.getColumnIndices(_selectionColumns, _dataSchema);
     int numColumns = columnIndices.length;
-    DataSchema.ColumnDataType[] columnDataTypes = 
_dataSchema.getColumnDataTypes();
-
-    while (_rows.size() > _offset) {
-      Object[] row = _rows.poll();
-      assert row != null;
-      Object[] extractedRow = new Object[numColumns];
-      for (int i = 0; i < numColumns; i++) {
-        int columnIndex = columnIndices[i];
-        extractedRow[i] = 
SelectionOperatorUtils.convertValueToType(row[columnIndex], 
columnDataTypes[columnIndex]);
-      }
-      rowsInSelectionResults.addFirst(extractedRow);
-    }
 
     // Construct the result data schema
     String[] columnNames = _dataSchema.getColumnNames();
+    ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
     String[] resultColumnNames = new String[numColumns];
-    DataSchema.ColumnDataType[] resultColumnDataTypes = new 
DataSchema.ColumnDataType[numColumns];
+    ColumnDataType[] resultColumnDataTypes = new ColumnDataType[numColumns];
     for (int i = 0; i < numColumns; i++) {
       int columnIndex = columnIndices[i];
       resultColumnNames[i] = columnNames[columnIndex];
       resultColumnDataTypes[i] = columnDataTypes[columnIndex];
     }
     DataSchema resultDataSchema = new DataSchema(resultColumnNames, 
resultColumnDataTypes);
+
+    // Extract the result rows
+    LinkedList<Object[]> rowsInSelectionResults = new LinkedList<>();
+    while (_rows.size() > _offset) {
+      Object[] row = _rows.poll();
+      assert row != null;
+      Object[] extractedRow = new Object[numColumns];
+      for (int i = 0; i < numColumns; i++) {
+        extractedRow[i] = 
resultColumnDataTypes[i].convertAndFormat(row[columnIndices[i]]);
+      }
+      rowsInSelectionResults.addFirst(extractedRow);
+    }
+
     return new ResultTable(resultDataSchema, rowsInSelectionResults);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index f589806..ca73042 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -35,6 +35,7 @@ import java.util.Set;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.response.broker.SelectionResults;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.indexsegment.IndexSegment;
@@ -177,8 +178,8 @@ public class SelectionOperatorUtils {
    */
   public static DataSchema getResultTableDataSchema(DataSchema dataSchema, 
List<String> selectionColumns) {
     int numColumns = selectionColumns.size();
-    Map<String, DataSchema.ColumnDataType> columnNameToDataType = new 
HashMap<>();
-    DataSchema.ColumnDataType[] finalColumnDataTypes = new 
DataSchema.ColumnDataType[numColumns];
+    Map<String, ColumnDataType> columnNameToDataType = new HashMap<>();
+    ColumnDataType[] finalColumnDataTypes = new ColumnDataType[numColumns];
     for (int i = 0; i < dataSchema.size(); i++) {
       columnNameToDataType.put(dataSchema.getColumnName(i), 
dataSchema.getColumnDataType(i));
     }
@@ -242,7 +243,7 @@ public class SelectionOperatorUtils {
       dataTableBuilder.startRow();
       for (int i = 0; i < numColumns; i++) {
         Object columnValue = row[i];
-        DataSchema.ColumnDataType columnDataType = 
dataSchema.getColumnDataType(i);
+        ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
         switch (columnDataType) {
           // Single-value column
           case INT:
@@ -335,7 +336,7 @@ public class SelectionOperatorUtils {
 
     Object[] row = new Object[numColumns];
     for (int i = 0; i < numColumns; i++) {
-      DataSchema.ColumnDataType columnDataType = 
dataSchema.getColumnDataType(i);
+      ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
       switch (columnDataType) {
         // Single-value column
         case INT:
@@ -417,13 +418,13 @@ public class SelectionOperatorUtils {
       List<String> selectionColumns, boolean preserveType) {
     int numRows = rows.size();
     List<Serializable[]> resultRows = new ArrayList<>(numRows);
-    DataSchema.ColumnDataType[] columnDataTypes = 
dataSchema.getColumnDataTypes();
+    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
     int numColumns = columnDataTypes.length;
     if (preserveType) {
       for (Object[] row : rows) {
         Serializable[] resultRow = new Serializable[numColumns];
         for (int i = 0; i < numColumns; i++) {
-          resultRow[i] = convertValueToType(row[i], columnDataTypes[i]);
+          resultRow[i] = columnDataTypes[i].convertAndFormat(row[i]);
         }
         resultRows.add(resultRow);
       }
@@ -452,12 +453,12 @@ public class SelectionOperatorUtils {
   public static ResultTable renderResultTableWithoutOrdering(List<Object[]> 
rows, DataSchema dataSchema) {
     int numRows = rows.size();
     List<Object[]> resultRows = new ArrayList<>(numRows);
-    DataSchema.ColumnDataType[] columnDataTypes = 
dataSchema.getColumnDataTypes();
+    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
     int numColumns = columnDataTypes.length;
     for (Object[] row : rows) {
       Object[] resultRow = new Object[numColumns];
       for (int i = 0; i < numColumns; i++) {
-        resultRow[i] = convertValueToType(row[i], columnDataTypes[i]);
+        resultRow[i] = columnDataTypes[i].convertAndFormat(row[i]);
       }
       resultRows.add(resultRow);
     }
@@ -491,81 +492,13 @@ public class SelectionOperatorUtils {
   }
 
   /**
-   * Converts a value into the given data type. (Broker side)
-   * <p>Actual value type can be different with data type passed in, but they 
must be type compatible.
-   */
-  public static Serializable convertValueToType(Object value, 
DataSchema.ColumnDataType dataType) {
-    switch (dataType) {
-      // Single-value column
-      case INT:
-        return ((Number) value).intValue();
-      case LONG:
-        return ((Number) value).longValue();
-      case FLOAT:
-        return ((Number) value).floatValue();
-      case DOUBLE:
-        return ((Number) value).doubleValue();
-      // NOTE: Return hex-encoded String for BYTES columns for 
backward-compatibility
-      // TODO: Revisit to see whether we should return byte[] instead
-      case BYTES:
-        return ((ByteArray) value).toHexString();
-
-      // Multi-value column
-      case LONG_ARRAY:
-        // LONG_ARRAY type covers INT_ARRAY and LONG_ARRAY
-        if (value instanceof int[]) {
-          int[] ints = (int[]) value;
-          int length = ints.length;
-          long[] longs = new long[length];
-          for (int i = 0; i < length; i++) {
-            longs[i] = ints[i];
-          }
-          return longs;
-        } else {
-          return (long[]) value;
-        }
-      case DOUBLE_ARRAY:
-        // DOUBLE_ARRAY type covers INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY and 
DOUBLE_ARRAY
-        if (value instanceof int[]) {
-          int[] ints = (int[]) value;
-          int length = ints.length;
-          double[] doubles = new double[length];
-          for (int i = 0; i < length; i++) {
-            doubles[i] = ints[i];
-          }
-          return doubles;
-        } else if (value instanceof long[]) {
-          long[] longs = (long[]) value;
-          int length = longs.length;
-          double[] doubles = new double[length];
-          for (int i = 0; i < length; i++) {
-            doubles[i] = longs[i];
-          }
-          return doubles;
-        } else if (value instanceof float[]) {
-          float[] floats = (float[]) value;
-          int length = floats.length;
-          double[] doubles = new double[length];
-          for (int i = 0; i < length; i++) {
-            doubles[i] = floats[i];
-          }
-          return doubles;
-        } else {
-          return (double[]) value;
-        }
-
-      default:
-        // For STRING, INT_ARRAY, FLOAT_ARRAY and STRING_ARRAY, no need to 
format
-        return (Serializable) value;
-    }
-  }
-
-  /**
+   * Deprecated because this method is only used to construct the PQL 
response, and PQL is already deprecated.
    * Formats a value into a {@code String} (single-value column) or {@code 
String[]} (multi-value column) based on the
    * data type. (Broker side)
    * <p>Actual value type can be different with data type passed in, but they 
must be type compatible.
    */
-  public static Serializable getFormattedValue(Object value, 
DataSchema.ColumnDataType dataType) {
+  @Deprecated
+  public static Serializable getFormattedValue(Object value, ColumnDataType 
dataType) {
     switch (dataType) {
       // Single-value column
       case INT:
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/HavingFilterHandlerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/HavingFilterHandlerTest.java
index 9870242..559b201 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/HavingFilterHandlerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/HavingFilterHandlerTest.java
@@ -22,7 +22,6 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
-import org.apache.pinot.spi.utils.ByteArray;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertFalse;
@@ -84,20 +83,13 @@ public class HavingFilterHandlerTest {
       PostAggregationHandler postAggregationHandler = new 
PostAggregationHandler(queryContext, dataSchema);
       HavingFilterHandler havingFilterHandler =
           new HavingFilterHandler(queryContext.getHavingFilter(), 
postAggregationHandler);
-      assertTrue(
-          havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f, 10.5, "11", 
new ByteArray(new byte[]{17}), 5}));
-      assertFalse(
-          havingFilterHandler.isMatch(new Object[]{10, 11L, 10.5f, 10.5, "11", 
new ByteArray(new byte[]{17}), 5}));
-      assertFalse(
-          havingFilterHandler.isMatch(new Object[]{11, 10L, 10.5f, 10.5, "11", 
new ByteArray(new byte[]{17}), 5}));
-      assertFalse(
-          havingFilterHandler.isMatch(new Object[]{11, 11L, 10.0f, 10.5, "11", 
new ByteArray(new byte[]{17}), 5}));
-      assertFalse(
-          havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f, 10.0, "11", 
new ByteArray(new byte[]{17}), 5}));
-      assertFalse(
-          havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f, 10.5, "10", 
new ByteArray(new byte[]{17}), 5}));
-      assertFalse(
-          havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f, 10.5, "11", 
new ByteArray(new byte[]{16}), 5}));
+      assertTrue(havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f, 
10.5, "11", new byte[]{17}, 5}));
+      assertFalse(havingFilterHandler.isMatch(new Object[]{10, 11L, 10.5f, 
10.5, "11", new byte[]{17}, 5}));
+      assertFalse(havingFilterHandler.isMatch(new Object[]{11, 10L, 10.5f, 
10.5, "11", new byte[]{17}, 5}));
+      assertFalse(havingFilterHandler.isMatch(new Object[]{11, 11L, 10.0f, 
10.5, "11", new byte[]{17}, 5}));
+      assertFalse(havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f, 
10.0, "11", new byte[]{17}, 5}));
+      assertFalse(havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f, 
10.5, "10", new byte[]{17}, 5}));
+      assertFalse(havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f, 
10.5, "11", new byte[]{16}, 5}));
     }
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/StUnionQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/StUnionQueriesTest.java
index 3011f70..9a0757b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/StUnionQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/StUnionQueriesTest.java
@@ -31,12 +31,16 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.response.broker.AggregationResult;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.GroupByResult;
+import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.data.readers.GenericRowRecordReader;
 import org.apache.pinot.core.geospatial.GeometryUtils;
 import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
+import org.apache.pinot.core.geospatial.transform.function.ScalarFunctions;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -52,7 +56,7 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.locationtech.jts.geom.Coordinate;
 import org.locationtech.jts.geom.Geometry;
@@ -70,6 +74,7 @@ import static org.testng.AssertJUnit.assertNotNull;
 /**
  * Queries test for ST_UNION queries.
  */
+@SuppressWarnings("rawtypes")
 public class StUnionQueriesTest extends BaseQueriesTest {
   private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"StUnionQueriesTest");
   private static final String RAW_TABLE_NAME = "testTable";
@@ -158,22 +163,50 @@ public class StUnionQueriesTest extends BaseQueriesTest {
     
QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
 NUM_RECORDS, 0, NUM_RECORDS,
         NUM_RECORDS);
     List<Object> aggregationResult = resultsBlock.getAggregationResult();
-
     assertNotNull(aggregationResult);
-
+    assertEquals(aggregationResult.size(), 1);
     assertEquals(aggregationResult.get(0), _intermediateResult);
 
     // Inter segments
     String[] expectedResults = new String[1];
-    expectedResults[0] = new ByteArray(_expectedResults).toHexString();
+    expectedResults[0] = BytesUtils.toHexString(_expectedResults);
     BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
     QueriesTestUtils
         .testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0, 
4 * NUM_RECORDS, 4 * NUM_RECORDS,
             expectedResults);
-    brokerResponse = getBrokerResponseForPqlQueryWithFilter(query);
-    QueriesTestUtils
-        .testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0, 
4 * NUM_RECORDS, 4 * NUM_RECORDS,
-            expectedResults);
+  }
+
+  @Test
+  public void testPostAggregation() {
+    String query =
+        "SELECT ST_AS_TEXT(ST_UNION(pointColumn)), 
TO_GEOMETRY(ST_UNION(pointColumn)), 
TO_SPHERICAL_GEOGRAPHY(ST_UNION(pointColumn)), 
ST_AS_TEXT(TO_SPHERICAL_GEOGRAPHY(ST_UNION(pointColumn))) FROM testTable";
+
+    // Inner segment
+    Operator operator = getOperatorForPqlQuery(query);
+    assertTrue(operator instanceof AggregationOperator);
+    IntermediateResultsBlock resultsBlock = ((AggregationOperator) 
operator).nextBlock();
+    
QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
 NUM_RECORDS, 0, NUM_RECORDS,
+        NUM_RECORDS);
+    List<Object> aggregationResult = resultsBlock.getAggregationResult();
+    assertNotNull(aggregationResult);
+    assertEquals(aggregationResult.size(), 4);
+    for (Object value : aggregationResult) {
+      assertEquals(value, _intermediateResult);
+    }
+
+    // Inter segment
+    BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query);
+    ResultTable resultTable = brokerResponse.getResultTable();
+    DataSchema expectedDataSchema = new DataSchema(
+        new String[]{"st_as_text(st_union(pointColumn))", 
"to_geometry(st_union(pointColumn))", 
"to_spherical_geography(st_union(pointColumn))", 
"st_as_text(to_spherical_geography(st_union(pointColumn)))"},
+        new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.BYTES, 
ColumnDataType.BYTES, ColumnDataType.STRING});
+    assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+    List<Object[]> rows = resultTable.getRows();
+    assertEquals(rows.size(), 1);
+    assertEquals(rows.get(0), new 
Object[]{ScalarFunctions.stAsText(_expectedResults), BytesUtils.toHexString(
+        ScalarFunctions.toGeometry(_expectedResults)), BytesUtils.toHexString(
+        ScalarFunctions.toSphericalGeography(_expectedResults)), 
ScalarFunctions.stAsText(
+        ScalarFunctions.toSphericalGeography(_expectedResults))});
   }
 
   @Test
@@ -186,18 +219,15 @@ public class StUnionQueriesTest extends BaseQueriesTest {
     IntermediateResultsBlock resultsBlock = ((AggregationOperator) 
operator).nextBlock();
     
QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
 0, 0, 0, NUM_RECORDS);
     List<Object> aggregationResult = resultsBlock.getAggregationResult();
-
     assertNotNull(aggregationResult);
-
+    assertEquals(aggregationResult.size(), 1);
     assertEquals(aggregationResult.get(0), GeometryUtils.EMPTY_POINT);
 
     // Inter segments
     String[] expectedResults = new String[1];
-    expectedResults[0] = new 
ByteArray(GeometrySerializer.serialize(GeometryUtils.EMPTY_POINT)).toHexString();
+    expectedResults[0] = 
BytesUtils.toHexString(GeometrySerializer.serialize(GeometryUtils.EMPTY_POINT));
     BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
     QueriesTestUtils.testInterSegmentAggregationResult(brokerResponse, 0, 0, 
0, 4 * NUM_RECORDS, expectedResults);
-    brokerResponse = getBrokerResponseForPqlQueryWithFilter(query);
-    QueriesTestUtils.testInterSegmentAggregationResult(brokerResponse, 0, 0, 
0, 4 * NUM_RECORDS, expectedResults);
   }
 
   @Test
@@ -244,8 +274,7 @@ public class StUnionQueriesTest extends BaseQueriesTest {
         assertEquals(group.size(), 1);
         int key = Integer.parseInt(group.get(0));
         assertTrue(_values.containsKey(key));
-        assertEquals(groupByResult.getValue(),
-            new 
ByteArray(GeometrySerializer.serialize(_values.get(key))).toHexString());
+        assertEquals(groupByResult.getValue(), 
BytesUtils.toHexString(GeometrySerializer.serialize(_values.get(key))));
       }
     }
   }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/SumPrecisionQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/SumPrecisionQueriesTest.java
index 7590d3f..d3ba43a 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/SumPrecisionQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/SumPrecisionQueriesTest.java
@@ -29,7 +29,10 @@ import java.util.List;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.data.readers.GenericRowRecordReader;
 import org.apache.pinot.core.indexsegment.IndexSegment;
@@ -166,12 +169,17 @@ public class SumPrecisionQueriesTest extends 
BaseQueriesTest {
 
     // Inter segment
     BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query);
-    List<Object[]> rows = brokerResponse.getResultTable().getRows();
+    ResultTable resultTable = brokerResponse.getResultTable();
+    DataSchema expectedDataSchema = new DataSchema(
+        new String[]{"sumprecision(intColumn)", "sumprecision(longColumn)", 
"sumprecision(floatColumn)", "sumprecision(doubleColumn)", 
"sumprecision(stringColumn)", "sumprecision(bytesColumn)"},
+        new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING, 
ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.STRING, 
ColumnDataType.STRING});
+    assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+    List<Object[]> rows = resultTable.getRows();
     assertEquals(rows.size(), 1);
-    BigDecimal intSum = _intSum.multiply(FOUR);
-    BigDecimal longSum = _longSum.multiply(FOUR);
-    BigDecimal floatSum = _floatSum.multiply(FOUR);
-    BigDecimal doubleSum = _doubleSum.multiply(FOUR);
+    String intSum = _intSum.multiply(FOUR).toString();
+    String longSum = _longSum.multiply(FOUR).toString();
+    String floatSum = _floatSum.multiply(FOUR).toString();
+    String doubleSum = _doubleSum.multiply(FOUR).toString();
     assertEquals(rows.get(0), new Object[]{intSum, longSum, floatSum, 
doubleSum, doubleSum, doubleSum});
   }
 
@@ -195,13 +203,18 @@ public class SumPrecisionQueriesTest extends 
BaseQueriesTest {
 
     // Inter segment
     BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query);
-    List<Object[]> rows = brokerResponse.getResultTable().getRows();
+    ResultTable resultTable = brokerResponse.getResultTable();
+    DataSchema expectedDataSchema = new DataSchema(
+        new String[]{"sumprecision(intColumn)", "sumprecision(longColumn)", 
"sumprecision(floatColumn)", "sumprecision(doubleColumn)", 
"sumprecision(stringColumn)", "sumprecision(bytesColumn)"},
+        new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING, 
ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.STRING, 
ColumnDataType.STRING});
+    assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+    List<Object[]> rows = resultTable.getRows();
     assertEquals(rows.size(), 1);
     MathContext mathContext = new MathContext(6, RoundingMode.HALF_EVEN);
-    BigDecimal intSum = _intSum.multiply(FOUR).round(mathContext);
-    BigDecimal longSum = _longSum.multiply(FOUR).round(mathContext);
-    BigDecimal floatSum = _floatSum.multiply(FOUR).round(mathContext);
-    BigDecimal doubleSum = _doubleSum.multiply(FOUR).round(mathContext);
+    String intSum = _intSum.multiply(FOUR).round(mathContext).toString();
+    String longSum = _longSum.multiply(FOUR).round(mathContext).toString();
+    String floatSum = _floatSum.multiply(FOUR).round(mathContext).toString();
+    String doubleSum = _doubleSum.multiply(FOUR).round(mathContext).toString();
     assertEquals(rows.get(0), new Object[]{intSum, longSum, floatSum, 
doubleSum, doubleSum, doubleSum});
   }
 
@@ -225,16 +238,45 @@ public class SumPrecisionQueriesTest extends 
BaseQueriesTest {
 
     // Inter segment
     BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query);
-    List<Object[]> rows = brokerResponse.getResultTable().getRows();
+    ResultTable resultTable = brokerResponse.getResultTable();
+    DataSchema expectedDataSchema = new DataSchema(
+        new String[]{"sumprecision(intColumn)", "sumprecision(longColumn)", 
"sumprecision(floatColumn)", "sumprecision(doubleColumn)", 
"sumprecision(stringColumn)", "sumprecision(bytesColumn)"},
+        new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING, 
ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.STRING, 
ColumnDataType.STRING});
+    assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+    List<Object[]> rows = resultTable.getRows();
     assertEquals(rows.size(), 1);
     MathContext mathContext = new MathContext(10, RoundingMode.HALF_EVEN);
-    BigDecimal intSum = _intSum.multiply(FOUR).round(mathContext).setScale(3, 
RoundingMode.HALF_EVEN);
-    BigDecimal longSum = 
_longSum.multiply(FOUR).round(mathContext).setScale(3, RoundingMode.HALF_EVEN);
-    BigDecimal floatSum = 
_floatSum.multiply(FOUR).round(mathContext).setScale(3, RoundingMode.HALF_EVEN);
-    BigDecimal doubleSum = 
_doubleSum.multiply(FOUR).round(mathContext).setScale(3, 
RoundingMode.HALF_EVEN);
+    String intSum = _intSum.multiply(FOUR).round(mathContext).setScale(3, 
RoundingMode.HALF_EVEN).toString();
+    String longSum = _longSum.multiply(FOUR).round(mathContext).setScale(3, 
RoundingMode.HALF_EVEN).toString();
+    String floatSum = _floatSum.multiply(FOUR).round(mathContext).setScale(3, 
RoundingMode.HALF_EVEN).toString();
+    String doubleSum = 
_doubleSum.multiply(FOUR).round(mathContext).setScale(3, 
RoundingMode.HALF_EVEN).toString();
     assertEquals(rows.get(0), new Object[]{intSum, longSum, floatSum, 
doubleSum, doubleSum, doubleSum});
   }
 
+  @Test
+  public void testPostAggregation() {
+    String query = "SELECT SUM_PRECISION(intColumn) * 2 FROM testTable";
+
+    // Inner segment
+    Operator operator = getOperatorForSqlQuery(query);
+    assertTrue(operator instanceof AggregationOperator);
+    List<Object> aggregationResult = ((AggregationOperator) 
operator).nextBlock().getAggregationResult();
+    assertNotNull(aggregationResult);
+    assertEquals(aggregationResult.size(), 1);
+    assertEquals(aggregationResult.get(0), _intSum);
+
+    // Inter segment
+    BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query);
+    ResultTable resultTable = brokerResponse.getResultTable();
+    DataSchema expectedDataSchema = new DataSchema(new 
String[]{"times(sum_precision(intColumn),'2')"},
+        new ColumnDataType[]{ColumnDataType.DOUBLE});
+    assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+    List<Object[]> rows = resultTable.getRows();
+    assertEquals(rows.size(), 1);
+    double expectedResult = _intSum.multiply(FOUR).doubleValue() * 2;
+    assertEquals(rows.get(0), new Object[]{expectedResult});
+  }
+
   @AfterClass
   public void tearDown()
       throws IOException {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index a393ca5..8afeeed 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -42,22 +42,19 @@ import org.apache.pinot.spi.utils.JsonUtils;
 public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable 
{
   private static final int DEFAULT_MAX_LENGTH = 512;
 
-  // TODO: revisit to see if we allow 0-length byte array
-  private static final byte[] NULL_BYTE_ARRAY_VALUE = new byte[0];
-
   public static final Integer DEFAULT_DIMENSION_NULL_VALUE_OF_INT = 
Integer.MIN_VALUE;
   public static final Long DEFAULT_DIMENSION_NULL_VALUE_OF_LONG = 
Long.MIN_VALUE;
   public static final Float DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT = 
Float.NEGATIVE_INFINITY;
   public static final Double DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE = 
Double.NEGATIVE_INFINITY;
   public static final String DEFAULT_DIMENSION_NULL_VALUE_OF_STRING = "null";
-  public static final byte[] DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES = 
NULL_BYTE_ARRAY_VALUE;
+  public static final byte[] DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES = new 
byte[0];
 
   public static final Integer DEFAULT_METRIC_NULL_VALUE_OF_INT = 0;
   public static final Long DEFAULT_METRIC_NULL_VALUE_OF_LONG = 0L;
   public static final Float DEFAULT_METRIC_NULL_VALUE_OF_FLOAT = 0.0F;
   public static final Double DEFAULT_METRIC_NULL_VALUE_OF_DOUBLE = 0.0D;
   public static final String DEFAULT_METRIC_NULL_VALUE_OF_STRING = "null";
-  public static final byte[] DEFAULT_METRIC_NULL_VALUE_OF_BYTES = 
NULL_BYTE_ARRAY_VALUE;
+  public static final byte[] DEFAULT_METRIC_NULL_VALUE_OF_BYTES = new byte[0];
 
   protected String _name;
   protected DataType _dataType;
@@ -207,8 +204,7 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
             case BYTES:
               return DEFAULT_METRIC_NULL_VALUE_OF_BYTES;
             default:
-              throw new UnsupportedOperationException(
-                  "Unknown default null value for metric field of data type: " 
+ dataType);
+              throw new IllegalStateException("Unsupported metric data type: " 
+ dataType);
           }
         case DIMENSION:
         case TIME:
@@ -227,11 +223,10 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
             case BYTES:
               return DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES;
             default:
-              throw new UnsupportedOperationException(
-                  "Unknown default null value for dimension/time field of data 
type: " + dataType);
+              throw new IllegalStateException("Unsupported dimension/time data 
type: " + dataType);
           }
         default:
-          throw new UnsupportedOperationException("Unsupported field type: " + 
fieldType);
+          throw new IllegalStateException("Unsupported field type: " + 
fieldType);
       }
     }
   }
@@ -276,11 +271,29 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
 
   protected void appendDefaultNullValue(ObjectNode jsonNode) {
     assert _defaultNullValue != null;
+    String key = "defaultNullValue";
     if (!_defaultNullValue.equals(getDefaultNullValue(getFieldType(), 
_dataType, null))) {
-      if (_defaultNullValue instanceof Number) {
-        jsonNode.set("defaultNullValue", 
JsonUtils.objectToJsonNode(_defaultNullValue));
-      } else {
-        jsonNode.put("defaultNullValue", getStringValue(_defaultNullValue));
+      switch (_dataType) {
+        case INT:
+          jsonNode.put(key, (Integer) _defaultNullValue);
+          break;
+        case LONG:
+          jsonNode.put(key, (Long) _defaultNullValue);
+          break;
+        case FLOAT:
+          jsonNode.put(key, (Float) _defaultNullValue);
+          break;
+        case DOUBLE:
+          jsonNode.put(key, (Double) _defaultNullValue);
+          break;
+        case STRING:
+          jsonNode.put(key, (String) _defaultNullValue);
+          break;
+        case BYTES:
+          jsonNode.put(key, BytesUtils.toHexString((byte[]) 
_defaultNullValue));
+          break;
+        default:
+          throw new IllegalStateException("Unsupported data type: " + this);
       }
     }
   }
@@ -335,6 +348,7 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
   /**
    * The <code>DataType</code> enum is used to demonstrate the data type of a 
field.
    */
+  @SuppressWarnings("rawtypes")
   public enum DataType {
     // LIST is for complex lists which is different from multi-value column of 
primitives
     // STRUCT, MAP and LIST are composable to form a COMPLEX field
@@ -366,28 +380,6 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
     }
 
     /**
-     * Converts the given string value to the data type.
-     */
-    public Object convert(String value) {
-      switch (this) {
-        case INT:
-          return Integer.valueOf(value);
-        case LONG:
-          return Long.valueOf(value);
-        case FLOAT:
-          return Float.valueOf(value);
-        case DOUBLE:
-          return Double.valueOf(value);
-        case STRING:
-          return value;
-        case BYTES:
-          return BytesUtils.toBytes(value);
-        default:
-          throw new UnsupportedOperationException("Unsupported data type: " + 
this);
-      }
-    }
-
-    /**
      * Check if the data type is for fixed width data (INT, LONG, FLOAT, 
DOUBLE)
      * or variable width data (STRING, BYTES)
      */
@@ -398,6 +390,58 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, Serializable {
     public boolean isNumeric() {
       return this == INT || this == LONG || this == FLOAT || this == DOUBLE;
     }
+
+    /**
+     * Converts the given string value to the data type. Returns byte[] for 
BYTES.
+     */
+    public Object convert(String value) {
+      try {
+        switch (this) {
+          case INT:
+            return Integer.valueOf(value);
+          case LONG:
+            return Long.valueOf(value);
+          case FLOAT:
+            return Float.valueOf(value);
+          case DOUBLE:
+            return Double.valueOf(value);
+          case STRING:
+            return value;
+          case BYTES:
+            return BytesUtils.toBytes(value);
+          default:
+            throw new IllegalStateException();
+        }
+      } catch (Exception e) {
+        throw new IllegalArgumentException(String.format("Cannot convert 
value: '%s' to type: %s", value, this));
+      }
+    }
+
+    /**
+     * Converts the given string value to the data type. Returns ByteArray for 
BYTES.
+     */
+    public Comparable convertInternal(String value) {
+      try {
+        switch (this) {
+          case INT:
+            return Integer.valueOf(value);
+          case LONG:
+            return Long.valueOf(value);
+          case FLOAT:
+            return Float.valueOf(value);
+          case DOUBLE:
+            return Double.valueOf(value);
+          case STRING:
+            return value;
+          case BYTES:
+            return BytesUtils.toByteArray(value);
+          default:
+            throw new IllegalStateException();
+        }
+      } catch (Exception e) {
+        throw new IllegalArgumentException(String.format("Cannot convert 
value: '%s' to type: %s", value, this));
+      }
+    }
   }
 
   @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to