This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bb9b5f27f use single string dict and eliminate the integer key field 
(#8894)
0bb9b5f27f is described below

commit 0bb9b5f27f5e66807f835bf172c5a2bfe1334cb8
Author: Rong Rong <[email protected]>
AuthorDate: Sat Jun 18 17:47:19 2022 -0700

    use single string dict and eliminate the integer key field (#8894)
---
 .../connector/spark/connector/PinotUtilsTest.scala |   8 +-
 .../apache/pinot/core/common/NullBitmapUtils.java  |  49 ---
 .../pinot/core/common/datablock/BaseDataBlock.java |  78 ++---
 .../core/common/datablock/ColumnarDataBlock.java   |   7 +-
 .../core/common/datablock/DataBlockBuilder.java    |  90 +++---
 .../pinot/core/common/datablock/MetadataBlock.java |   3 +-
 .../pinot/core/common/datablock/RowDataBlock.java  |   7 +-
 .../common/datatable/BaseDataTableBuilder.java     | 162 ++++++++++
 .../core/common/datatable/DataTableBuilder.java    | 333 +++------------------
 .../common/datatable/DataTableBuilderV2V3.java     | 118 ++++++++
 .../core/common/datatable/DataTableBuilderV4.java  |  90 ++++++
 .../core/common/datatable/DataTableFactory.java    |  55 +++-
 .../core/common/datatable/DataTableImplV2.java     |   4 +-
 .../core/common/datatable/DataTableImplV3.java     |   4 +-
 .../core/common/datatable/DataTableImplV4.java     |   9 +-
 .../core/common/datatable/DataTableUtils.java      |  56 +---
 .../StreamingInstanceResponseOperator.java         |   4 +-
 .../operator/blocks/IntermediateResultsBlock.java  |   8 +-
 .../pinot/core/query/distinct/DistinctTable.java   |   3 +-
 .../query/executor/ServerQueryExecutorV1Impl.java  |  11 +-
 .../pinot/core/query/scheduler/QueryScheduler.java |   6 +-
 .../query/selection/SelectionOperatorUtils.java    |   4 +-
 .../core/transport/InstanceRequestHandler.java     |  10 +-
 .../pinot/core/common/datablock/DataBlockTest.java |   4 +-
 .../core/common/datatable/DataTableSerDeTest.java  |  44 +--
 .../core/query/reduce/BrokerReduceServiceTest.java |   3 +-
 .../query/scheduler/PrioritySchedulerTest.java     |   3 +-
 .../pinot/core/transport/QueryRoutingTest.java     |   8 +-
 .../tests/MultiStageEngineIntegrationTest.java     |   6 +-
 .../pinot/query/runtime/QueryRunnerTest.java       |   3 +
 .../server/starter/helix/BaseServerStarter.java    |   4 +-
 31 files changed, 627 insertions(+), 567 deletions(-)

diff --git 
a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotUtilsTest.scala
 
b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotUtilsTest.scala
index 24abdb367c..4929ebc122 100644
--- 
a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotUtilsTest.scala
+++ 
b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotUtilsTest.scala
@@ -18,12 +18,12 @@
  */
 package org.apache.pinot.connector.spark.connector
 
-import org.apache.pinot.connector.spark.connector.PinotUtils._
 import org.apache.pinot.common.utils.DataSchema
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType
 import org.apache.pinot.connector.spark.BaseTest
+import org.apache.pinot.connector.spark.connector.PinotUtils._
 import org.apache.pinot.connector.spark.exceptions.PinotException
-import org.apache.pinot.core.common.datatable.DataTableBuilder
+import org.apache.pinot.core.common.datatable.DataTableFactory
 import org.apache.pinot.spi.data.Schema
 import org.apache.pinot.spi.utils.ByteArray
 import org.apache.spark.sql.catalyst.util.ArrayData
@@ -74,7 +74,7 @@ class PinotUtilsTest extends BaseTest {
     )
     val dataSchema = new DataSchema(columnNames, columnTypes)
 
-    val dataTableBuilder = new DataTableBuilder(dataSchema)
+    val dataTableBuilder = DataTableFactory.getDataTableBuilder(dataSchema)
     dataTableBuilder.startRow()
     dataTableBuilder.setColumn(0, "strValueDim")
     dataTableBuilder.setColumn(1, 5)
@@ -140,7 +140,7 @@ class PinotUtilsTest extends BaseTest {
     val columnTypes = Array(ColumnDataType.STRING, ColumnDataType.INT)
     val dataSchema = new DataSchema(columnNames, columnTypes)
 
-    val dataTableBuilder = new DataTableBuilder(dataSchema)
+    val dataTableBuilder = DataTableFactory.getDataTableBuilder(dataSchema)
     dataTableBuilder.startRow()
     dataTableBuilder.setColumn(0, "strValueDim")
     dataTableBuilder.setColumn(1, 5)
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/NullBitmapUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/NullBitmapUtils.java
deleted file mode 100644
index f157d3274e..0000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/NullBitmapUtils.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.common;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import org.roaringbitmap.RoaringBitmap;
-
-
-public final class NullBitmapUtils {
-  private NullBitmapUtils() {
-  }
-
-  public static void setNullRowIds(RoaringBitmap nullBitmap, 
ByteArrayOutputStream fixedSizeByteArrayOutputStream,
-      ByteArrayOutputStream variableSizeDataByteArrayOutputStream)
-      throws IOException {
-    writeInt(fixedSizeByteArrayOutputStream, 
variableSizeDataByteArrayOutputStream.size());
-    if (nullBitmap == null || nullBitmap.isEmpty()) {
-      writeInt(fixedSizeByteArrayOutputStream, 0);
-    } else {
-      byte[] nullBitmapBytes = 
ObjectSerDeUtils.ROARING_BITMAP_SER_DE.serialize(nullBitmap);
-      writeInt(fixedSizeByteArrayOutputStream, nullBitmapBytes.length);
-      variableSizeDataByteArrayOutputStream.write(nullBitmapBytes);
-    }
-  }
-
-  private static void writeInt(ByteArrayOutputStream out, int value) {
-    out.write((value >>> 24) & 0xFF);
-    out.write((value >>> 16) & 0xFF);
-    out.write((value >>> 8) & 0xFF);
-    out.write(value & 0xFF);
-  }
-}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
index 04a0ad1ded..dff60efac9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
@@ -85,7 +85,7 @@ public abstract class BaseDataBlock implements DataTable {
   protected int _numRows;
   protected int _numColumns;
   protected DataSchema _dataSchema;
-  protected Map<String, Map<Integer, String>> _dictionaryMap;
+  protected String[] _stringDictionary;
   protected byte[] _fixedSizeDataBytes;
   protected ByteBuffer _fixedSizeData;
   protected byte[] _variableSizeDataBytes;
@@ -96,16 +96,16 @@ public abstract class BaseDataBlock implements DataTable {
    * construct a base data block.
    * @param numRows num of rows in the block
    * @param dataSchema schema of the data in the block
-   * @param dictionaryMap dictionary encoding map
+   * @param stringDictionary string dictionary shared across all string columns
    * @param fixedSizeDataBytes byte[] for fix-sized columns.
    * @param variableSizeDataBytes byte[] for variable length columns (arrays).
    */
-  public BaseDataBlock(int numRows, DataSchema dataSchema, Map<String, 
Map<Integer, String>> dictionaryMap,
+  public BaseDataBlock(int numRows, DataSchema dataSchema, String[] 
stringDictionary,
       byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
     _numRows = numRows;
     _numColumns = dataSchema.size();
     _dataSchema = dataSchema;
-    _dictionaryMap = dictionaryMap;
+    _stringDictionary = stringDictionary;
     _fixedSizeDataBytes = fixedSizeDataBytes;
     _fixedSizeData = ByteBuffer.wrap(fixedSizeDataBytes);
     _variableSizeDataBytes = variableSizeDataBytes;
@@ -121,7 +121,7 @@ public abstract class BaseDataBlock implements DataTable {
     _numRows = 0;
     _numColumns = 0;
     _dataSchema = null;
-    _dictionaryMap = null;
+    _stringDictionary = null;
     _fixedSizeDataBytes = null;
     _fixedSizeData = null;
     _variableSizeDataBytes = null;
@@ -158,9 +158,9 @@ public abstract class BaseDataBlock implements DataTable {
     // Read dictionary.
     if (dictionaryMapLength != 0) {
       byteBuffer.position(dictionaryMapStart);
-      _dictionaryMap = deserializeDictionaryMap(byteBuffer);
+      _stringDictionary = deserializeStringDictionary(byteBuffer);
     } else {
-      _dictionaryMap = null;
+      _stringDictionary = null;
     }
 
     // Read data schema.
@@ -282,8 +282,7 @@ public abstract class BaseDataBlock implements DataTable {
   @Override
   public String getString(int rowId, int colId) {
     positionCursorInFixSizedBuffer(rowId, colId);
-    int dictId = _fixedSizeData.getInt();
-    return _dictionaryMap.get(_dataSchema.getColumnName(colId)).get(dictId);
+    return _stringDictionary[_fixedSizeData.getInt()];
   }
 
   @Override
@@ -355,9 +354,8 @@ public abstract class BaseDataBlock implements DataTable {
   public String[] getStringArray(int rowId, int colId) {
     int length = positionCursorInVariableBuffer(rowId, colId);
     String[] strings = new String[length];
-    Map<Integer, String> dictionary = 
_dictionaryMap.get(_dataSchema.getColumnName(colId));
     for (int i = 0; i < length; i++) {
-      strings[i] = dictionary.get(_variableSizeData.getInt());
+      strings[i] = _stringDictionary[_variableSizeData.getInt()];
     }
     return strings;
   }
@@ -369,26 +367,16 @@ public abstract class BaseDataBlock implements DataTable {
   /**
    * Helper method to serialize dictionary map.
    */
-  protected byte[] serializeDictionaryMap()
+  protected byte[] serializeStringDictionary()
       throws IOException {
     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
     DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
 
-    dataOutputStream.writeInt(_dictionaryMap.size());
-    for (Map.Entry<String, Map<Integer, String>> dictionaryMapEntry : 
_dictionaryMap.entrySet()) {
-      String columnName = dictionaryMapEntry.getKey();
-      Map<Integer, String> dictionary = dictionaryMapEntry.getValue();
-      byte[] bytes = columnName.getBytes(UTF_8);
-      dataOutputStream.writeInt(bytes.length);
-      dataOutputStream.write(bytes);
-      dataOutputStream.writeInt(dictionary.size());
-
-      for (Map.Entry<Integer, String> dictionaryEntry : dictionary.entrySet()) 
{
-        dataOutputStream.writeInt(dictionaryEntry.getKey());
-        byte[] valueBytes = dictionaryEntry.getValue().getBytes(UTF_8);
-        dataOutputStream.writeInt(valueBytes.length);
-        dataOutputStream.write(valueBytes);
-      }
+    dataOutputStream.writeInt(_stringDictionary.length);
+    for (String entry : _stringDictionary) {
+      byte[] valueBytes = entry.getBytes(UTF_8);
+      dataOutputStream.writeInt(valueBytes.length);
+      dataOutputStream.write(valueBytes);
     }
 
     return byteArrayOutputStream.toByteArray();
@@ -397,24 +385,14 @@ public abstract class BaseDataBlock implements DataTable {
   /**
    * Helper method to deserialize dictionary map.
    */
-  protected Map<String, Map<Integer, String>> 
deserializeDictionaryMap(ByteBuffer buffer)
+  protected String[] deserializeStringDictionary(ByteBuffer buffer)
       throws IOException {
-    int numDictionaries = buffer.getInt();
-    Map<String, Map<Integer, String>> dictionaryMap = new 
HashMap<>(numDictionaries);
-
-    for (int i = 0; i < numDictionaries; i++) {
-      String column = DataTableUtils.decodeString(buffer);
-      int dictionarySize = buffer.getInt();
-      Map<Integer, String> dictionary = new HashMap<>(dictionarySize);
-      for (int j = 0; j < dictionarySize; j++) {
-        int key = buffer.getInt();
-        String value = DataTableUtils.decodeString(buffer);
-        dictionary.put(key, value);
-      }
-      dictionaryMap.put(column, dictionary);
+    int dictionarySize = buffer.getInt();
+    String[] stringDictionary = new String[dictionarySize];
+    for (int i = 0; i < dictionarySize; i++) {
+      stringDictionary[i] = DataTableUtils.decodeString(buffer);
     }
-
-    return dictionaryMap;
+    return stringDictionary;
   }
 
   @Override
@@ -473,11 +451,11 @@ public abstract class BaseDataBlock implements DataTable {
 
     // Write dictionary map section offset(START|SIZE).
     dataOutputStream.writeInt(dataOffset);
-    byte[] dictionaryMapBytes = null;
-    if (_dictionaryMap != null) {
-      dictionaryMapBytes = serializeDictionaryMap();
-      dataOutputStream.writeInt(dictionaryMapBytes.length);
-      dataOffset += dictionaryMapBytes.length;
+    byte[] dictionaryBytes = null;
+    if (_stringDictionary != null) {
+      dictionaryBytes = serializeStringDictionary();
+      dataOutputStream.writeInt(dictionaryBytes.length);
+      dataOffset += dictionaryBytes.length;
     } else {
       dataOutputStream.writeInt(0);
     }
@@ -514,8 +492,8 @@ public abstract class BaseDataBlock implements DataTable {
     // Write exceptions bytes.
     dataOutputStream.write(exceptionsBytes);
     // Write dictionary map bytes.
-    if (dictionaryMapBytes != null) {
-      dataOutputStream.write(dictionaryMapBytes);
+    if (dictionaryBytes != null) {
+      dataOutputStream.write(dictionaryBytes);
     }
     // Write data schema bytes.
     if (dataSchemaBytes != null) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
index 1d2429f118..a83b8b2a30 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.common.datablock;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Map;
 import org.apache.pinot.common.utils.DataSchema;
 
 
@@ -36,9 +35,9 @@ public class ColumnarDataBlock extends BaseDataBlock {
     super();
   }
 
-  public ColumnarDataBlock(int numRows, DataSchema dataSchema, Map<String, 
Map<Integer, String>> dictionaryMap,
+  public ColumnarDataBlock(int numRows, DataSchema dataSchema, String[] 
stringDictionary,
       byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
-    super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes, 
variableSizeDataBytes);
+    super(numRows, dataSchema, stringDictionary, fixedSizeDataBytes, 
variableSizeDataBytes);
     computeBlockObjectConstants();
   }
 
@@ -89,7 +88,7 @@ public class ColumnarDataBlock extends BaseDataBlock {
 
   @Override
   public ColumnarDataBlock toDataOnlyDataTable() {
-    return new ColumnarDataBlock(_numRows, _dataSchema, _dictionaryMap, 
_fixedSizeDataBytes, _variableSizeDataBytes);
+    return new ColumnarDataBlock(_numRows, _dataSchema, _stringDictionary, 
_fixedSizeDataBytes, _variableSizeDataBytes);
   }
 
   // TODO: add whole-column access methods.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index 8a14b491e9..788b68a91d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -18,17 +18,16 @@
  */
 package org.apache.pinot.core.common.datablock;
 
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.NullBitmapUtils;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.spi.utils.ArrayCopyUtils;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -49,14 +48,13 @@ public class DataBlockBuilder {
   private int _numRows;
   private int _numColumns;
 
-  private final Map<String, Map<String, Integer>> _dictionaryMap = new 
HashMap<>();
-  private final Map<String, Map<Integer, String>> _reverseDictionaryMap = new 
HashMap<>();
+  private final Object2IntOpenHashMap<String> _dictionary = new 
Object2IntOpenHashMap<>();
   private final ByteArrayOutputStream _fixedSizeDataByteArrayOutputStream = 
new ByteArrayOutputStream();
+  private final DataOutputStream _fixedSizeDataOutputStream = new 
DataOutputStream(_fixedSizeDataByteArrayOutputStream);
   private final ByteArrayOutputStream _variableSizeDataByteArrayOutputStream = 
new ByteArrayOutputStream();
   private final DataOutputStream _variableSizeDataOutputStream =
       new DataOutputStream(_variableSizeDataByteArrayOutputStream);
 
-
   private DataBlockBuilder(DataSchema dataSchema, BaseDataBlock.Type 
blockType) {
     _dataSchema = dataSchema;
     _columnDataType = dataSchema.getStoredColumnDataTypes();
@@ -77,10 +75,16 @@ public class DataBlockBuilder {
     }
   }
 
-  public void setNullRowIds(RoaringBitmap nullBitmap)
+  public void setNullRowIds(@Nullable RoaringBitmap nullRowIds)
       throws IOException {
-    NullBitmapUtils.setNullRowIds(nullBitmap, 
_fixedSizeDataByteArrayOutputStream,
-        _variableSizeDataByteArrayOutputStream);
+    
_fixedSizeDataOutputStream.writeInt(_variableSizeDataByteArrayOutputStream.size());
+    if (nullRowIds == null || nullRowIds.isEmpty()) {
+      _fixedSizeDataOutputStream.writeInt(0);
+    } else {
+      byte[] bitmapBytes = 
ObjectSerDeUtils.ROARING_BITMAP_SER_DE.serialize(nullRowIds);
+      _fixedSizeDataOutputStream.writeInt(bitmapBytes.length);
+      _variableSizeDataByteArrayOutputStream.write(bitmapBytes);
+    }
   }
 
   public static RowDataBlock buildFromRows(List<Object[]> rows, @Nullable 
RoaringBitmap[] colNullBitmaps,
@@ -110,7 +114,7 @@ public class DataBlockBuilder {
             setColumn(rowBuilder, byteBuffer, (BigDecimal) value);
             break;
           case STRING:
-            setColumn(rowBuilder, byteBuffer, i, (String) value);
+            setColumn(rowBuilder, byteBuffer, (String) value);
             break;
           case BYTES:
             setColumn(rowBuilder, byteBuffer, (ByteArray) value);
@@ -165,12 +169,12 @@ public class DataBlockBuilder {
             break;
           case BYTES_ARRAY:
           case STRING_ARRAY:
-            setColumn(rowBuilder, byteBuffer, i, (String[]) value);
+            setColumn(rowBuilder, byteBuffer, (String[]) value);
             break;
           default:
-            throw new IllegalStateException(String.format(
-                "Unsupported data type: %s for column: %s", 
rowBuilder._columnDataType[i],
-                rowBuilder._dataSchema.getColumnName(i)));
+            throw new IllegalStateException(
+                String.format("Unsupported data type: %s for column: %s", 
rowBuilder._columnDataType[i],
+                    rowBuilder._dataSchema.getColumnName(i)));
         }
       }
       rowBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 
0, byteBuffer.position());
@@ -220,7 +224,7 @@ public class DataBlockBuilder {
           break;
         case STRING:
           for (Object value : column) {
-            setColumn(columnarBuilder, byteBuffer, i, (String) value);
+            setColumn(columnarBuilder, byteBuffer, (String) value);
           }
           break;
         case BYTES:
@@ -289,13 +293,13 @@ public class DataBlockBuilder {
         case BYTES_ARRAY:
         case STRING_ARRAY:
           for (Object value : column) {
-            setColumn(columnarBuilder, byteBuffer, i, (String[]) value);
+            setColumn(columnarBuilder, byteBuffer, (String[]) value);
           }
           break;
         default:
-          throw new IllegalStateException(String.format(
-              "Unsupported data type: %s for column: %s", 
columnarBuilder._columnDataType[i],
-              columnarBuilder._dataSchema.getColumnName(i)));
+          throw new IllegalStateException(
+              String.format("Unsupported data type: %s for column: %s", 
columnarBuilder._columnDataType[i],
+                  columnarBuilder._dataSchema.getColumnName(i)));
       }
       
columnarBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 
0, byteBuffer.position());
     }
@@ -303,17 +307,25 @@ public class DataBlockBuilder {
   }
 
   private static RowDataBlock buildRowBlock(DataBlockBuilder builder) {
-    return new RowDataBlock(builder._numRows, builder._dataSchema, 
builder._reverseDictionaryMap,
+    return new RowDataBlock(builder._numRows, builder._dataSchema, 
getReverseDictionary(builder._dictionary),
         builder._fixedSizeDataByteArrayOutputStream.toByteArray(),
         builder._variableSizeDataByteArrayOutputStream.toByteArray());
   }
 
   private static ColumnarDataBlock buildColumnarBlock(DataBlockBuilder 
builder) {
-    return new ColumnarDataBlock(builder._numRows, builder._dataSchema, 
builder._reverseDictionaryMap,
+    return new ColumnarDataBlock(builder._numRows, builder._dataSchema, 
getReverseDictionary(builder._dictionary),
         builder._fixedSizeDataByteArrayOutputStream.toByteArray(),
         builder._variableSizeDataByteArrayOutputStream.toByteArray());
   }
 
+  private static String[] getReverseDictionary(Object2IntOpenHashMap<String> 
dictionary) {
+    String[] reverseDictionary = new String[dictionary.size()];
+    for (Object2IntMap.Entry<String> entry : dictionary.object2IntEntrySet()) {
+      reverseDictionary[entry.getIntValue()] = entry.getKey();
+    }
+    return reverseDictionary;
+  }
+
   private static void setColumn(DataBlockBuilder builder, ByteBuffer 
byteBuffer, BigDecimal value)
       throws IOException {
     byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
@@ -322,20 +334,9 @@ public class DataBlockBuilder {
     builder._variableSizeDataByteArrayOutputStream.write(bytes);
   }
 
-  private static void setColumn(DataBlockBuilder builder, ByteBuffer 
byteBuffer, int colId, String value) {
-    String columnName = builder._dataSchema.getColumnName(colId);
-    Map<String, Integer> dictionary = builder._dictionaryMap.get(columnName);
-    if (dictionary == null) {
-      dictionary = new HashMap<>();
-      builder._dictionaryMap.put(columnName, dictionary);
-      builder._reverseDictionaryMap.put(columnName, new HashMap<>());
-    }
-    Integer dictId = dictionary.get(value);
-    if (dictId == null) {
-      dictId = dictionary.size();
-      dictionary.put(value, dictId);
-      builder._reverseDictionaryMap.get(columnName).put(dictId, value);
-    }
+  private static void setColumn(DataBlockBuilder builder, ByteBuffer 
byteBuffer, String value) {
+    Object2IntOpenHashMap<String> dictionary = builder._dictionary;
+    int dictId = dictionary.computeIntIfAbsent(value, k -> dictionary.size());
     byteBuffer.putInt(dictId);
   }
 
@@ -398,26 +399,13 @@ public class DataBlockBuilder {
     }
   }
 
-  private static void setColumn(DataBlockBuilder builder, ByteBuffer 
byteBuffer, int colId, String[] values)
+  private static void setColumn(DataBlockBuilder builder, ByteBuffer 
byteBuffer, String[] values)
       throws IOException {
     byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
     byteBuffer.putInt(values.length);
-
-    String columnName = builder._dataSchema.getColumnName(colId);
-    Map<String, Integer> dictionary = builder._dictionaryMap.get(columnName);
-    if (dictionary == null) {
-      dictionary = new HashMap<>();
-      builder._dictionaryMap.put(columnName, dictionary);
-      builder._reverseDictionaryMap.put(columnName, new HashMap<>());
-    }
-
+    Object2IntOpenHashMap<String> dictionary = builder._dictionary;
     for (String value : values) {
-      Integer dictId = dictionary.get(value);
-      if (dictId == null) {
-        dictId = dictionary.size();
-        dictionary.put(value, dictId);
-        builder._reverseDictionaryMap.get(columnName).put(dictId, value);
-      }
+      int dictId = dictionary.computeIntIfAbsent(value, k -> 
dictionary.size());
       builder._variableSizeDataOutputStream.writeInt(dictId);
     }
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
index 06a637fc24..7ad53f6bf5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.common.datablock;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collections;
 import org.apache.pinot.common.utils.DataSchema;
 
 
@@ -31,7 +30,7 @@ public class MetadataBlock extends BaseDataBlock {
   private static final int VERSION = 1;
 
   public MetadataBlock(DataSchema dataSchema) {
-    super(0, dataSchema, Collections.emptyMap(), new byte[]{0}, new byte[]{0});
+    super(0, dataSchema, new String[0], new byte[]{0}, new byte[]{0});
   }
 
   public MetadataBlock(ByteBuffer byteBuffer)
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
index eec72546f0..7946cc428d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.common.datablock;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Map;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.roaringbitmap.RoaringBitmap;
@@ -38,9 +37,9 @@ public class RowDataBlock extends BaseDataBlock {
     super();
   }
 
-  public RowDataBlock(int numRows, DataSchema dataSchema, Map<String, 
Map<Integer, String>> dictionaryMap,
+  public RowDataBlock(int numRows, DataSchema dataSchema, String[] 
stringDictionary,
       byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
-    super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes, 
variableSizeDataBytes);
+    super(numRows, dataSchema, stringDictionary, fixedSizeDataBytes, 
variableSizeDataBytes);
     computeBlockObjectConstants();
   }
 
@@ -108,7 +107,7 @@ public class RowDataBlock extends BaseDataBlock {
 
   @Override
   public RowDataBlock toDataOnlyDataTable() {
-    return new RowDataBlock(_numRows, _dataSchema, _dictionaryMap, 
_fixedSizeDataBytes, _variableSizeDataBytes);
+    return new RowDataBlock(_numRows, _dataSchema, _stringDictionary, 
_fixedSizeDataBytes, _variableSizeDataBytes);
   }
 
   // TODO: add whole-row access methods.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
new file mode 100644
index 0000000000..a332adb381
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common.datatable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+
+
+/**
+ * Base DataTableBuilder implementation.
+ */
+public abstract class BaseDataTableBuilder implements DataTableBuilder {
+  protected final DataSchema _dataSchema;
+  protected final int _version;
+  protected final int[] _columnOffsets;
+  protected final int _rowSizeInBytes;
+  protected final ByteArrayOutputStream _fixedSizeDataByteArrayOutputStream = 
new ByteArrayOutputStream();
+  protected final DataOutputStream _fixedSizeDataOutputStream =
+      new DataOutputStream(_fixedSizeDataByteArrayOutputStream);
+  protected final ByteArrayOutputStream _variableSizeDataByteArrayOutputStream 
= new ByteArrayOutputStream();
+  protected final DataOutputStream _variableSizeDataOutputStream =
+      new DataOutputStream(_variableSizeDataByteArrayOutputStream);
+
+  protected int _numRows;
+  protected ByteBuffer _currentRowDataByteBuffer;
+
+  public BaseDataTableBuilder(DataSchema dataSchema, int version) {
+    _dataSchema = dataSchema;
+    _version = version;
+    _columnOffsets = new int[dataSchema.size()];
+    _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, 
_columnOffsets, _version);
+  }
+
+  @Override
+  public void startRow() {
+    _numRows++;
+    _currentRowDataByteBuffer = ByteBuffer.allocate(_rowSizeInBytes);
+  }
+
+  @Override
+  public void setColumn(int colId, int value) {
+    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+    _currentRowDataByteBuffer.putInt(value);
+  }
+
+  @Override
+  public void setColumn(int colId, long value) {
+    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+    _currentRowDataByteBuffer.putLong(value);
+  }
+
+  @Override
+  public void setColumn(int colId, float value) {
+    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+    _currentRowDataByteBuffer.putFloat(value);
+  }
+
+  @Override
+  public void setColumn(int colId, double value) {
+    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+    _currentRowDataByteBuffer.putDouble(value);
+  }
+
+  @Override
+  public void setColumn(int colId, BigDecimal value)
+      throws IOException {
+    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+    
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+    byte[] bytes = BigDecimalUtils.serialize(value);
+    _currentRowDataByteBuffer.putInt(bytes.length);
+    _variableSizeDataByteArrayOutputStream.write(bytes);
+  }
+
+  @Override
+  public void setColumn(int colId, Object value)
+      throws IOException {
+    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+    
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+    int objectTypeValue = 
ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
+    if (objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue()) {
+      _currentRowDataByteBuffer.putInt(0);
+      _variableSizeDataOutputStream.writeInt(objectTypeValue);
+    } else {
+      byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
+      _currentRowDataByteBuffer.putInt(bytes.length);
+      _variableSizeDataOutputStream.writeInt(objectTypeValue);
+      _variableSizeDataByteArrayOutputStream.write(bytes);
+    }
+  }
+
+  @Override
+  public void setColumn(int colId, int[] values)
+      throws IOException {
+    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+    
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+    _currentRowDataByteBuffer.putInt(values.length);
+    for (int value : values) {
+      _variableSizeDataOutputStream.writeInt(value);
+    }
+  }
+
+  @Override
+  public void setColumn(int colId, long[] values)
+      throws IOException {
+    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+    
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+    _currentRowDataByteBuffer.putInt(values.length);
+    for (long value : values) {
+      _variableSizeDataOutputStream.writeLong(value);
+    }
+  }
+
+  @Override
+  public void setColumn(int colId, float[] values)
+      throws IOException {
+    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+    
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+    _currentRowDataByteBuffer.putInt(values.length);
+    for (float value : values) {
+      _variableSizeDataOutputStream.writeFloat(value);
+    }
+  }
+
+  @Override
+  public void setColumn(int colId, double[] values)
+      throws IOException {
+    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+    
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+    _currentRowDataByteBuffer.putInt(values.length);
+    for (double value : values) {
+      _variableSizeDataOutputStream.writeDouble(value);
+    }
+  }
+
+  @Override
+  public void finishRow()
+      throws IOException {
+    
_fixedSizeDataByteArrayOutputStream.write(_currentRowDataByteBuffer.array());
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
index 941fabc2ee..b592c45c60 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
@@ -18,318 +18,79 @@
  */
 package org.apache.pinot.core.common.datatable;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.pinot.common.utils.DataSchema;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.NullBitmapUtils;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.spi.utils.BigDecimalUtils;
+import org.apache.pinot.spi.annotations.InterfaceAudience;
+import org.apache.pinot.spi.annotations.InterfaceStability;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.RoaringBitmap;
 
 
 /**
+ * DataTable holds data in a matrix form. The purpose of this interface is to 
provide a way to construct a data table
+ * and ability to serialize and deserialize.
  *
- * Datatable that holds data in a matrix form. The purpose of this class is to
- * provide a way to construct a datatable and ability to serialize and
- * deserialize.<br>
- * Why can't we use existing serialization/deserialization mechanism. Most
- * existing techniques protocol buffer, thrift, avro are optimized for
- * transporting a single record but Pinot transfers quite a lot of data from
- * server to broker during the scatter/gather operation. The cost of
- * serialization and deserialization directly impacts the performance. Most
- * ser/deser requires us to convert the primitives data types in objects like
- * Integer etc. This is waste of cpu resource and increase the payload size. We
- * optimize the data format for Pinot usecase. We can also support lazy
- * construction of obejcts. Infact we retain the bytes as it is and will be 
able
- * to lookup the a field directly within a byte buffer.<br>
- *
- * USAGE:
- *
- * Datatable is initialized with the schema of the table. Schema describes the
- * columnnames, their order and data type for each column.<br>
- * Each row must follow the same convention. We don't support MultiValue 
columns
- * for now. Format,
- * |VERSION,DATA_START_OFFSET,DICTIONARY_START_OFFSET,INDEX_START_OFFSET
- * ,METADATA_START_OFFSET | |&lt;DATA&gt; |
- *
- * |&lt;DICTIONARY&gt;|
- *
- *
- * |&lt;METADATA&gt;| Data contains the actual values written by the 
application We
- * first write the entire data in its raw byte format. For example if you data
- * type is Int, it will write 4 bytes. For most data types that are fixed 
width,
- * we just write the raw data. For special cases like String, we create a
- * dictionary. Dictionary will be never exposed to the user. All conversions
- * will be done internally. In future, we might decide dynamically if 
dictionary
- * creation is needed, for now we will always create dictionaries for string
- * columns. During deserialization we will always load the dictionary
- * first.Overall having dictionary allow us to convert data table into a fixed
- * width matrix and thus allowing look up and easy traversal.
- *
- *
+ * <p>Why can't we use existing serialization/deserialization mechanism:
+ * <p>Most existing techniques (protocol buffer, thrift, avro) are optimized 
for transporting a single record but Pinot
+ * transfers quite a lot of data from server to broker during the 
scatter/gather operation. The cost of serialization
+ * and deserialization directly impacts the performance. Most ser/deser 
requires us to convert the primitive data types
+ * into objects like Integer etc. This will waste CPU resources and increase 
the payload size. We optimize the data
+ * format for Pinot use case. We can also support lazy construction of 
objects. In fact we retain the bytes as it is and
+ * will be able to look up a field directly within a byte buffer.
  */
-// TODO: potential optimizations (DataTableV3 and below)
-// TODO:   - Fix float size.
-// TODO:     note:  (Fixed in V4, remove this comment once V3 is deprecated)
-// TODO:   - Given a data schema, write all values one by one instead of using 
rowId and colId to position (save time).
-// TODO:     note:  (Fixed in V4, remove this comment once V3 is deprecated)
-// TODO:   - Store bytes as variable size data instead of String
-// TODO:     note:  (Fixed in V4, remove this comment once V3 is deprecated)
-// TODO:   - use builder factory pattern to for different version so that no 
version check per build.
-// TODO:   - Use one dictionary for all columns (save space).
-
-public class DataTableBuilder {
-  public static final int VERSION_2 = 2;
-  public static final int VERSION_3 = 3;
-  public static final int VERSION_4 = 4;
-  private static int _version = VERSION_3;
-  private final DataSchema _dataSchema;
-  private final int[] _columnOffsets;
-  private final int _rowSizeInBytes;
-  private final Map<String, Map<String, Integer>> _dictionaryMap = new 
HashMap<>();
-  private final Map<String, Map<Integer, String>> _reverseDictionaryMap = new 
HashMap<>();
-  private final ByteArrayOutputStream _fixedSizeDataByteArrayOutputStream = 
new ByteArrayOutputStream();
-  private final ByteArrayOutputStream _variableSizeDataByteArrayOutputStream = 
new ByteArrayOutputStream();
-  private final DataOutputStream _variableSizeDataOutputStream =
-      new DataOutputStream(_variableSizeDataByteArrayOutputStream);
-
-  private int _numRows;
-  private ByteBuffer _currentRowDataByteBuffer;
-
-  public DataTableBuilder(DataSchema dataSchema) {
-    _dataSchema = dataSchema;
-    _columnOffsets = new int[dataSchema.size()];
-    _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, 
_columnOffsets, _version);
-  }
-
-  public static DataTable getEmptyDataTable() {
-    switch (_version) {
-      case VERSION_2:
-        return new DataTableImplV2();
-      case VERSION_3:
-        return new DataTableImplV3();
-      case VERSION_4:
-        return new DataTableImplV4();
-      default:
-        throw new IllegalStateException("Unexpected value: " + _version);
-    }
-  }
-
-  public static void setCurrentDataTableVersion(int version) {
-    if (version != VERSION_2 && version != VERSION_3 && version != VERSION_4) {
-      throw new IllegalArgumentException("Unsupported version: " + version);
-    }
-    _version = version;
-  }
-
-  public static int getCurrentDataTableVersion() {
-    return _version;
-  }
-
-  public void startRow() {
-    _numRows++;
-    _currentRowDataByteBuffer = ByteBuffer.allocate(_rowSizeInBytes);
-  }
-
-  public void setNullRowIds(RoaringBitmap nullBitmap)
-      throws IOException {
-    assert _version >= VERSION_4;
-    NullBitmapUtils.setNullRowIds(nullBitmap, 
_fixedSizeDataByteArrayOutputStream,
-        _variableSizeDataByteArrayOutputStream);
-  }
-
-  public void setColumn(int colId, boolean value) {
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    if (value) {
-      _currentRowDataByteBuffer.put((byte) 1);
-    } else {
-      _currentRowDataByteBuffer.put((byte) 0);
-    }
-  }
-
-  public void setColumn(int colId, byte value) {
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    _currentRowDataByteBuffer.put(value);
-  }
-
-  public void setColumn(int colId, char value) {
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    _currentRowDataByteBuffer.putChar(value);
-  }
-
-  public void setColumn(int colId, short value) {
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    _currentRowDataByteBuffer.putShort(value);
-  }
[email protected]
[email protected]
+public interface DataTableBuilder {
 
-  public void setColumn(int colId, int value) {
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    _currentRowDataByteBuffer.putInt(value);
-  }
+  void startRow();
 
-  public void setColumn(int colId, long value) {
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    _currentRowDataByteBuffer.putLong(value);
-  }
+  void setColumn(int colId, int value);
 
-  public void setColumn(int colId, float value) {
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    _currentRowDataByteBuffer.putFloat(value);
-  }
+  void setColumn(int colId, long value);
 
-  public void setColumn(int colId, double value) {
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    _currentRowDataByteBuffer.putDouble(value);
-  }
+  void setColumn(int colId, float value);
 
-  public void setColumn(int colId, BigDecimal value)
-      throws IOException {
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
-    byte[] bytes = BigDecimalUtils.serialize(value);
-    _currentRowDataByteBuffer.putInt(bytes.length);
-    _variableSizeDataByteArrayOutputStream.write(bytes);
-  }
+  void setColumn(int colId, double value);
 
-  public void setColumn(int colId, String value) {
-    String columnName = _dataSchema.getColumnName(colId);
-    Map<String, Integer> dictionary = _dictionaryMap.get(columnName);
-    if (dictionary == null) {
-      dictionary = new HashMap<>();
-      _dictionaryMap.put(columnName, dictionary);
-      _reverseDictionaryMap.put(columnName, new HashMap<>());
-    }
+  void setColumn(int colId, BigDecimal value)
+      throws IOException;
 
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    Integer dictId = dictionary.get(value);
-    if (dictId == null) {
-      dictId = dictionary.size();
-      dictionary.put(value, dictId);
-      _reverseDictionaryMap.get(columnName).put(dictId, value);
-    }
-    _currentRowDataByteBuffer.putInt(dictId);
-  }
+  void setColumn(int colId, String value);
 
-  public void setColumn(int colId, ByteArray value)
-      throws IOException {
-    if (_version >= 4) {
-      _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-      
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
-      byte[] bytes = value.getBytes();
-      _currentRowDataByteBuffer.putInt(bytes.length);
-      _variableSizeDataByteArrayOutputStream.write(bytes);
-    } else {
-      // NOTE: Use String to store bytes value in DataTable V2 for 
backward-compatibility
-      setColumn(colId, value.toHexString());
-    }
-  }
+  void setColumn(int colId, ByteArray value)
+      throws IOException;
 
-  public void setColumn(int colId, Object value)
-      throws IOException {
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
-    int objectTypeValue = 
ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
-    if (objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue()) {
-      _currentRowDataByteBuffer.putInt(0);
-      _variableSizeDataOutputStream.writeInt(objectTypeValue);
-    } else {
-      byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
-      _currentRowDataByteBuffer.putInt(bytes.length);
-      _variableSizeDataOutputStream.writeInt(objectTypeValue);
-      _variableSizeDataByteArrayOutputStream.write(bytes);
-    }
-  }
+  void setColumn(int colId, Object value)
+      throws IOException;
 
-  public void setColumn(int colId, int[] values)
-      throws IOException {
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
-    _currentRowDataByteBuffer.putInt(values.length);
-    for (int value : values) {
-      _variableSizeDataOutputStream.writeInt(value);
-    }
-  }
+  void setColumn(int colId, int[] values)
+      throws IOException;
 
-  public void setColumn(int colId, long[] values)
-      throws IOException {
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
-    _currentRowDataByteBuffer.putInt(values.length);
-    for (long value : values) {
-      _variableSizeDataOutputStream.writeLong(value);
-    }
-  }
+  void setColumn(int colId, long[] values)
+      throws IOException;
 
-  public void setColumn(int colId, float[] values)
-      throws IOException {
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
-    _currentRowDataByteBuffer.putInt(values.length);
-    for (float value : values) {
-      _variableSizeDataOutputStream.writeFloat(value);
-    }
-  }
+  void setColumn(int colId, float[] values)
+      throws IOException;
 
-  public void setColumn(int colId, double[] values)
-      throws IOException {
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
-    _currentRowDataByteBuffer.putInt(values.length);
-    for (double value : values) {
-      _variableSizeDataOutputStream.writeDouble(value);
-    }
-  }
+  void setColumn(int colId, double[] values)
+      throws IOException;
 
-  public void setColumn(int colId, String[] values)
-      throws IOException {
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
-    _currentRowDataByteBuffer.putInt(values.length);
+  void setColumn(int colId, String[] values)
+      throws IOException;
 
-    String columnName = _dataSchema.getColumnName(colId);
-    Map<String, Integer> dictionary = _dictionaryMap.get(columnName);
-    if (dictionary == null) {
-      dictionary = new HashMap<>();
-      _dictionaryMap.put(columnName, dictionary);
-      _reverseDictionaryMap.put(columnName, new HashMap<>());
-    }
+  // TODO: Support MV BYTES
 
-    for (String value : values) {
-      Integer dictId = dictionary.get(value);
-      if (dictId == null) {
-        dictId = dictionary.size();
-        dictionary.put(value, dictId);
-        _reverseDictionaryMap.get(columnName).put(dictId, value);
-      }
-      _variableSizeDataOutputStream.writeInt(dictId);
-    }
-  }
+  void finishRow()
+      throws IOException;
 
-  public void finishRow()
-      throws IOException {
-    
_fixedSizeDataByteArrayOutputStream.write(_currentRowDataByteBuffer.array());
-  }
+  /**
+   * NOTE: When setting nullRowIds, we don't pass the colId currently, and 
this method must be invoked for all columns.
+   * TODO: Revisit this
+   */
+  void setNullRowIds(@Nullable RoaringBitmap nullRowIds)
+      throws IOException;
 
-  public DataTable build() {
-    switch (_version) {
-      case VERSION_2:
-        return new DataTableImplV2(_numRows, _dataSchema, 
_reverseDictionaryMap,
-            _fixedSizeDataByteArrayOutputStream.toByteArray(), 
_variableSizeDataByteArrayOutputStream.toByteArray());
-      case VERSION_3:
-        return new DataTableImplV3(_numRows, _dataSchema, 
_reverseDictionaryMap,
-            _fixedSizeDataByteArrayOutputStream.toByteArray(), 
_variableSizeDataByteArrayOutputStream.toByteArray());
-      case VERSION_4:
-        return new DataTableImplV4(_numRows, _dataSchema, 
_reverseDictionaryMap,
-            _fixedSizeDataByteArrayOutputStream.toByteArray(), 
_variableSizeDataByteArrayOutputStream.toByteArray());
-      default:
-        throw new IllegalStateException("Unexpected value: " + _version);
-    }
-  }
+  DataTable build();
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java
new file mode 100644
index 0000000000..ea282bb559
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common.datatable;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Kept for backward compatibility. Things improved in the newer versions:
+ * - Float size (should be 4 instead of 8)
+ * - Store bytes as variable size data instead of String
+ * - Use one dictionary for all columns (save space)
+ * - Support setting nullRowIds
+ */
+public class DataTableBuilderV2V3 extends BaseDataTableBuilder {
+  private final Map<String, Map<String, Integer>> _dictionaryMap = new 
HashMap<>();
+  private final Map<String, Map<Integer, String>> _reverseDictionaryMap = new 
HashMap<>();
+
+  public DataTableBuilderV2V3(DataSchema dataSchema, int version) {
+    super(dataSchema, version);
+    Preconditions.checkArgument(version <= DataTableFactory.VERSION_3);
+  }
+
+  @Override
+  public void setColumn(int colId, String value) {
+    String columnName = _dataSchema.getColumnName(colId);
+    Map<String, Integer> dictionary = _dictionaryMap.get(columnName);
+    if (dictionary == null) {
+      dictionary = new HashMap<>();
+      _dictionaryMap.put(columnName, dictionary);
+      _reverseDictionaryMap.put(columnName, new HashMap<>());
+    }
+
+    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+    Integer dictId = dictionary.get(value);
+    if (dictId == null) {
+      dictId = dictionary.size();
+      dictionary.put(value, dictId);
+      _reverseDictionaryMap.get(columnName).put(dictId, value);
+    }
+    _currentRowDataByteBuffer.putInt(dictId);
+  }
+
+  @Override
+  public void setColumn(int colId, ByteArray value)
+      throws IOException {
+    setColumn(colId, value.toHexString());
+  }
+
+  @Override
+  public void setColumn(int colId, String[] values)
+      throws IOException {
+    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+    
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+    _currentRowDataByteBuffer.putInt(values.length);
+
+    String columnName = _dataSchema.getColumnName(colId);
+    Map<String, Integer> dictionary = _dictionaryMap.get(columnName);
+    if (dictionary == null) {
+      dictionary = new HashMap<>();
+      _dictionaryMap.put(columnName, dictionary);
+      _reverseDictionaryMap.put(columnName, new HashMap<>());
+    }
+
+    for (String value : values) {
+      Integer dictId = dictionary.get(value);
+      if (dictId == null) {
+        dictId = dictionary.size();
+        dictionary.put(value, dictId);
+        _reverseDictionaryMap.get(columnName).put(dictId, value);
+      }
+      _variableSizeDataOutputStream.writeInt(dictId);
+    }
+  }
+
+  @Override
+  public void setNullRowIds(@Nullable RoaringBitmap nullRowIds)
+      throws IOException {
+    throw new UnsupportedOperationException("Not supported before DataTable 
V4");
+  }
+
+  @Override
+  public DataTable build() {
+    byte[] fixedSizeDataBytes = 
_fixedSizeDataByteArrayOutputStream.toByteArray();
+    byte[] variableSizeDataBytes = 
_variableSizeDataByteArrayOutputStream.toByteArray();
+    if (_version == DataTableFactory.VERSION_2) {
+      return new DataTableImplV2(_numRows, _dataSchema, _reverseDictionaryMap, 
fixedSizeDataBytes,
+          variableSizeDataBytes);
+    } else {
+      return new DataTableImplV3(_numRows, _dataSchema, _reverseDictionaryMap, 
fixedSizeDataBytes,
+          variableSizeDataBytes);
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV4.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV4.java
new file mode 100644
index 0000000000..8512024865
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV4.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common.datatable;
+
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.RoaringBitmap;
+
+
+public class DataTableBuilderV4 extends BaseDataTableBuilder {
+  private final Object2IntOpenHashMap<String> _dictionary = new 
Object2IntOpenHashMap<>();
+
+  public DataTableBuilderV4(DataSchema dataSchema) {
+    super(dataSchema, DataTableFactory.VERSION_4);
+  }
+
+  @Override
+  public void setColumn(int colId, String value) {
+    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+    int dictId = _dictionary.computeIntIfAbsent(value, k -> 
_dictionary.size());
+    _currentRowDataByteBuffer.putInt(dictId);
+  }
+
+  @Override
+  public void setColumn(int colId, ByteArray value)
+      throws IOException {
+    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+    
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+    byte[] bytes = value.getBytes();
+    _currentRowDataByteBuffer.putInt(bytes.length);
+    _variableSizeDataByteArrayOutputStream.write(bytes);
+  }
+
+  @Override
+  public void setColumn(int colId, String[] values)
+      throws IOException {
+    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+    
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+    _currentRowDataByteBuffer.putInt(values.length);
+    for (String value : values) {
+      int dictId = _dictionary.computeIntIfAbsent(value, k -> 
_dictionary.size());
+      _variableSizeDataOutputStream.writeInt(dictId);
+    }
+  }
+
+  @Override
+  public void setNullRowIds(@Nullable RoaringBitmap nullRowIds)
+      throws IOException {
+    
_fixedSizeDataOutputStream.writeInt(_variableSizeDataByteArrayOutputStream.size());
+    if (nullRowIds == null || nullRowIds.isEmpty()) {
+      _fixedSizeDataOutputStream.writeInt(0);
+    } else {
+      byte[] bitmapBytes = 
ObjectSerDeUtils.ROARING_BITMAP_SER_DE.serialize(nullRowIds);
+      _fixedSizeDataOutputStream.writeInt(bitmapBytes.length);
+      _variableSizeDataByteArrayOutputStream.write(bitmapBytes);
+    }
+  }
+
+  @Override
+  public DataTable build() {
+    String[] reverseDictionary = new String[_dictionary.size()];
+    for (Object2IntMap.Entry<String> entry : _dictionary.object2IntEntrySet()) 
{
+      reverseDictionary[entry.getIntValue()] = entry.getKey();
+    }
+    return new DataTableImplV4(_numRows, _dataSchema, reverseDictionary,
+        _fixedSizeDataByteArrayOutputStream.toByteArray(), 
_variableSizeDataByteArrayOutputStream.toByteArray());
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
index 4a39f110d9..cbe64b4aef 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
@@ -20,22 +20,71 @@ package org.apache.pinot.core.common.datatable;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class DataTableFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DataTableFactory.class);
+  public static final int VERSION_2 = 2;
+  public static final int VERSION_3 = 3;
+  public static final int VERSION_4 = 4;
+  public static final int DEFAULT_VERSION = VERSION_3;
+
+  private static int _version = DataTableFactory.DEFAULT_VERSION;
+
   private DataTableFactory() {
   }
 
+  public static int getDataTableVersion() {
+    return _version;
+  }
+
+  public static void setDataTableVersion(int version) {
+    LOGGER.info("Setting DataTable version to: " + version);
+    if (version != DataTableFactory.VERSION_2 && version != 
DataTableFactory.VERSION_3
+        && version != DataTableFactory.VERSION_4) {
+      throw new IllegalArgumentException("Unsupported version: " + version);
+    }
+    _version = version;
+  }
+
+  public static DataTable getEmptyDataTable() {
+    switch (_version) {
+      case VERSION_2:
+        return new DataTableImplV2();
+      case VERSION_3:
+        return new DataTableImplV3();
+      case VERSION_4:
+        return new DataTableImplV4();
+      default:
+        throw new IllegalStateException("Unexpected value: " + _version);
+    }
+  }
+
+  public static DataTableBuilder getDataTableBuilder(DataSchema dataSchema) {
+    switch (_version) {
+      case VERSION_2:
+      case VERSION_3:
+        return new DataTableBuilderV2V3(dataSchema, _version);
+      case VERSION_4:
+        return new DataTableBuilderV4(dataSchema);
+      default:
+        throw new UnsupportedOperationException("Unsupported data table 
version: " + _version);
+    }
+  }
+
   public static DataTable getDataTable(ByteBuffer byteBuffer)
       throws IOException {
     int version = byteBuffer.getInt();
     switch (version) {
-      case DataTableBuilder.VERSION_2:
+      case VERSION_2:
         return new DataTableImplV2(byteBuffer);
-      case DataTableBuilder.VERSION_3:
+      case VERSION_3:
         return new DataTableImplV3(byteBuffer);
-      case DataTableBuilder.VERSION_4:
+      case VERSION_4:
         return new DataTableImplV4(byteBuffer);
       default:
         throw new UnsupportedOperationException("Unsupported data table 
version: " + version);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
index af3665a48e..252d7303b2 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
@@ -124,7 +124,7 @@ public class DataTableImplV2 extends BaseDataTable {
 
   @Override
   public int getVersion() {
-    return DataTableBuilder.VERSION_2;
+    return DataTableFactory.VERSION_2;
   }
 
   private Map<String, String> deserializeMetadata(ByteBuffer buffer)
@@ -170,7 +170,7 @@ public class DataTableImplV2 extends BaseDataTable {
       throws IOException {
     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
     DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
-    dataOutputStream.writeInt(DataTableBuilder.VERSION_2);
+    dataOutputStream.writeInt(DataTableFactory.VERSION_2);
     dataOutputStream.writeInt(_numRows);
     dataOutputStream.writeInt(_numColumns);
     int dataOffset = HEADER_SIZE;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
index edaf1d5479..433167808a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
@@ -166,7 +166,7 @@ public class DataTableImplV3 extends BaseDataTable {
 
   @Override
   public int getVersion() {
-    return DataTableBuilder.VERSION_3;
+    return DataTableFactory.VERSION_3;
   }
 
   @Override
@@ -224,7 +224,7 @@ public class DataTableImplV3 extends BaseDataTable {
 
   private void writeLeadingSections(DataOutputStream dataOutputStream)
       throws IOException {
-    dataOutputStream.writeInt(DataTableBuilder.VERSION_3);
+    dataOutputStream.writeInt(DataTableFactory.VERSION_3);
     dataOutputStream.writeInt(_numRows);
     dataOutputStream.writeInt(_numColumns);
     int dataOffset = HEADER_SIZE;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
index fe32ac84a7..ba65453ec9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
@@ -21,7 +21,6 @@ package org.apache.pinot.core.common.datatable;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Map;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.datablock.RowDataBlock;
 import org.apache.pinot.spi.annotations.InterfaceStability;
@@ -42,13 +41,13 @@ public class DataTableImplV4 extends RowDataBlock {
     super(byteBuffer);
   }
 
-  public DataTableImplV4(int numRows, DataSchema dataSchema, Map<String, 
Map<Integer, String>> dictionaryMap,
-      byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
-    super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes, 
variableSizeDataBytes);
+  public DataTableImplV4(int numRows, DataSchema dataSchema, String[] 
dictionary, byte[] fixedSizeDataBytes,
+      byte[] variableSizeDataBytes) {
+    super(numRows, dataSchema, dictionary, fixedSizeDataBytes, 
variableSizeDataBytes);
   }
 
   @Override
   protected int getDataBlockVersionType() {
-    return DataTableBuilder.VERSION_4;
+    return DataTableFactory.VERSION_4;
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
index 1430869f95..a7b8652229 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
@@ -18,9 +18,6 @@
  */
 package org.apache.pinot.core.common.datatable;
 
-import com.google.common.primitives.Ints;
-import com.google.common.primitives.Longs;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -72,7 +69,7 @@ public class DataTableUtils {
           rowSizeInBytes += 8;
           break;
         case FLOAT:
-          if (dataTableVersion >= 4) {
+          if (dataTableVersion >= DataTableFactory.VERSION_4) {
             rowSizeInBytes += 4;
           } else {
             rowSizeInBytes += 8;
@@ -123,7 +120,7 @@ public class DataTableUtils {
     // NOTE: Use STRING column data type as default for selection query
     Arrays.fill(columnDataTypes, ColumnDataType.STRING);
     DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
-    return new DataTableBuilder(dataSchema).build();
+    return DataTableFactory.getDataTableBuilder(dataSchema).build();
   }
 
   /**
@@ -154,7 +151,7 @@ public class DataTableUtils {
         columnDataTypes[index] = 
aggregationFunction.getIntermediateResultColumnType();
         index++;
       }
-      return new DataTableBuilder(new DataSchema(columnNames, 
columnDataTypes)).build();
+      return DataTableFactory.getDataTableBuilder(new DataSchema(columnNames, 
columnDataTypes)).build();
     } else {
       // Aggregation only query
 
@@ -171,7 +168,8 @@ public class DataTableUtils {
       }
 
       // Build the data table
-      DataTableBuilder dataTableBuilder = new DataTableBuilder(new 
DataSchema(aggregationColumnNames, columnDataTypes));
+      DataTableBuilder dataTableBuilder =
+          DataTableFactory.getDataTableBuilder(new 
DataSchema(aggregationColumnNames, columnDataTypes));
       dataTableBuilder.startRow();
       for (int i = 0; i < numAggregations; i++) {
         switch (columnDataTypes[i]) {
@@ -214,7 +212,7 @@ public class DataTableUtils {
         new DistinctTable(new DataSchema(columnNames, columnDataTypes), 
Collections.emptySet());
 
     // Build the data table
-    DataTableBuilder dataTableBuilder = new DataTableBuilder(
+    DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(
         new DataSchema(new 
String[]{distinctAggregationFunction.getColumnName()},
             new ColumnDataType[]{ColumnDataType.OBJECT}));
     dataTableBuilder.startRow();
@@ -223,22 +221,6 @@ public class DataTableUtils {
     return dataTableBuilder.build();
   }
 
-  /**
-   * Helper method to decode string.
-   */
-  public static String decodeString(DataInputStream dataInputStream)
-      throws IOException {
-    int length = dataInputStream.readInt();
-    if (length == 0) {
-      return StringUtils.EMPTY;
-    } else {
-      byte[] buffer = new byte[length];
-      int numBytesRead = dataInputStream.read(buffer);
-      assert numBytesRead == length;
-      return new String(buffer, UTF_8);
-    }
-  }
-
   /**
    * Helper method to decode string.
    */
@@ -246,35 +228,11 @@ public class DataTableUtils {
       throws IOException {
     int length = buffer.getInt();
     if (length == 0) {
-      return "";
+      return StringUtils.EMPTY;
     } else {
       byte[] bytes = new byte[length];
       buffer.get(bytes);
       return new String(bytes, UTF_8);
     }
   }
-
-  /**
-   * Helper method to decode int.
-   */
-  public static int decodeInt(DataInputStream dataInputStream)
-      throws IOException {
-    int length = Integer.BYTES;
-    byte[] buffer = new byte[length];
-    int numBytesRead = dataInputStream.read(buffer);
-    assert numBytesRead == length;
-    return Ints.fromByteArray(buffer);
-  }
-
-  /**
-   * Helper method to decode long.
-   */
-  public static long decodeLong(DataInputStream dataInputStream)
-      throws IOException {
-    int length = Long.BYTES;
-    byte[] buffer = new byte[length];
-    int numBytesRead = dataInputStream.read(buffer);
-    assert numBytesRead == length;
-    return Longs.fromByteArray(buffer);
-  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java
index a2976eb2c1..0b00b90902 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java
@@ -24,7 +24,7 @@ import java.util.List;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.operator.combine.BaseCombineOperator;
 import org.apache.pinot.core.operator.streaming.StreamingResponseUtils;
@@ -53,7 +53,7 @@ public class StreamingInstanceResponseOperator extends 
InstanceResponseOperator
           instanceResponseDataTable.toDataOnlyDataTable()));
     } catch (IOException e) {
       // when exception occurs in streaming, we return an error-only metadata 
block.
-      metadataOnlyDataTable = DataTableBuilder.getEmptyDataTable();
+      metadataOnlyDataTable = DataTableFactory.getEmptyDataTable();
       
metadataOnlyDataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
     }
     // return a metadata-only block.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
index 13cd4151a8..967360453d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
@@ -38,6 +38,7 @@ import org.apache.pinot.core.common.BlockDocIdValueSet;
 import org.apache.pinot.core.common.BlockMetadata;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.data.table.IntermediateRecord;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.data.table.Table;
@@ -308,7 +309,7 @@ public class IntermediateResultsBlock implements Block {
 
   private DataTable getResultDataTable()
       throws IOException {
-    DataTableBuilder dataTableBuilder = new DataTableBuilder(_dataSchema);
+    DataTableBuilder dataTableBuilder = 
DataTableFactory.getDataTableBuilder(_dataSchema);
     ColumnDataType[] storedColumnDataTypes = 
_dataSchema.getStoredColumnDataTypes();
     Iterator<Record> iterator = _table.iterator();
     while (iterator.hasNext()) {
@@ -391,7 +392,8 @@ public class IntermediateResultsBlock implements Block {
     }
 
     // Build the data table.
-    DataTableBuilder dataTableBuilder = new DataTableBuilder(new 
DataSchema(columnNames, columnDataTypes));
+    DataTableBuilder dataTableBuilder =
+        DataTableFactory.getDataTableBuilder(new DataSchema(columnNames, 
columnDataTypes));
     dataTableBuilder.startRow();
     for (int i = 0; i < numAggregationFunctions; i++) {
       switch (columnDataTypes[i]) {
@@ -416,7 +418,7 @@ public class IntermediateResultsBlock implements Block {
   }
 
   private DataTable getMetadataDataTable() {
-    return attachMetadataToDataTable(DataTableBuilder.getEmptyDataTable());
+    return attachMetadataToDataTable(DataTableFactory.getEmptyDataTable());
   }
 
   private DataTable attachMetadataToDataTable(DataTable dataTable) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
index 371e1a0519..0fe04313cf 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
@@ -239,7 +239,8 @@ public class DistinctTable {
   public byte[] toBytes()
       throws IOException {
     // NOTE: Serialize the DistinctTable as a DataTable
-    DataTableBuilder dataTableBuilder = new DataTableBuilder(_dataSchema);
+    DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(
+        _dataSchema);
     ColumnDataType[] storedColumnDataTypes = 
_dataSchema.getStoredColumnDataTypes();
     int numColumns = storedColumnDataTypes.length;
     for (Record record : _records) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index e6eb711ab6..e0c086f5c3 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -47,6 +47,7 @@ import org.apache.pinot.core.common.ExplainPlanRowData;
 import org.apache.pinot.core.common.ExplainPlanRows;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.common.datatable.DataTableUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.operator.filter.EmptyFilterOperator;
@@ -168,7 +169,7 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
       String errorMessage =
           String.format("Query scheduling took %dms (longer than query timeout 
of %dms) on server: %s",
               querySchedulingTimeMs, queryTimeoutMs, 
_instanceDataManager.getInstanceId());
-      DataTable dataTable = DataTableBuilder.getEmptyDataTable();
+      DataTable dataTable = DataTableFactory.getEmptyDataTable();
       
dataTable.addException(QueryException.getException(QueryException.QUERY_SCHEDULING_TIMEOUT_ERROR,
 errorMessage));
       LOGGER.error("{} while processing requestId: {}", errorMessage, 
requestId);
       return dataTable;
@@ -178,7 +179,7 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
     if (tableDataManager == null) {
       String errorMessage = String.format("Failed to find table: %s on server: 
%s", tableNameWithType,
           _instanceDataManager.getInstanceId());
-      DataTable dataTable = DataTableBuilder.getEmptyDataTable();
+      DataTable dataTable = DataTableFactory.getEmptyDataTable();
       
dataTable.addException(QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR,
 errorMessage));
       LOGGER.error("{} while processing requestId: {}", errorMessage, 
requestId);
       return dataTable;
@@ -226,7 +227,7 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
         LOGGER.error("Exception processing requestId {}", requestId, e);
       }
 
-      dataTable = DataTableBuilder.getEmptyDataTable();
+      dataTable = DataTableFactory.getEmptyDataTable();
       
dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
     } finally {
       for (SegmentDataManager segmentDataManager : segmentDataManagers) {
@@ -334,7 +335,7 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
 
   /** @return EXPLAIN_PLAN query result {@link DataTable} when no segments get 
selected for query execution.*/
   private static DataTable getExplainPlanResultsForNoMatchingSegment(int 
totalNumSegments) {
-    DataTableBuilder dataTableBuilder = new 
DataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
+    DataTableBuilder dataTableBuilder = 
DataTableFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
     try {
       dataTableBuilder.startRow();
       dataTableBuilder.setColumn(0, 
String.format(ExplainPlanRows.PLAN_START_FORMAT,
@@ -440,7 +441,7 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
 
   /** @return EXPLAIN PLAN query result {@link DataTable}. */
   public static DataTable processExplainPlanQueries(Plan queryPlan) {
-    DataTableBuilder dataTableBuilder = new 
DataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
+    DataTableBuilder dataTableBuilder = 
DataTableFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
     List<Operator> childOperators = 
queryPlan.getPlanNode().run().getChildOperators();
     assert childOperators.size() == 1;
     Operator root = childOperators.get(0);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 3009a96019..7ca4485ca4 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -38,7 +38,7 @@ import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.DataTable.MetadataKey;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.TimerContext;
@@ -154,7 +154,7 @@ public abstract class QueryScheduler {
           queryRequest.getBrokerId(), e);
       // For not handled exceptions
       _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
-      dataTable = DataTableBuilder.getEmptyDataTable();
+      dataTable = DataTableFactory.getEmptyDataTable();
       
dataTable.addException(QueryException.getException(QueryException.INTERNAL_ERROR,
 e));
     }
     long requestId = queryRequest.getRequestId();
@@ -320,7 +320,7 @@ public abstract class QueryScheduler {
    */
   protected ListenableFuture<byte[]> immediateErrorResponse(ServerQueryRequest 
queryRequest,
       ProcessingException error) {
-    DataTable result = DataTableBuilder.getEmptyDataTable();
+    DataTable result = DataTableFactory.getEmptyDataTable();
 
     Map<String, String> dataTableMetadata = result.getMetadata();
     dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(queryRequest.getRequestId()));
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 1b7cac34cd..a4ffd2a7f8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -37,6 +37,7 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.utils.ArrayCopyUtils;
@@ -234,7 +235,8 @@ public class SelectionOperatorUtils {
     ColumnDataType[] storedColumnDataTypes = 
dataSchema.getStoredColumnDataTypes();
     int numColumns = storedColumnDataTypes.length;
 
-    DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+    DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(
+        dataSchema);
     for (Object[] row : rows) {
       dataTableBuilder.startRow();
       for (int i = 0; i < numColumns; i++) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index a93137698f..af841eea5b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -37,7 +37,7 @@ import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.DataTable.MetadataKey;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
 import org.apache.pinot.server.access.AccessControl;
@@ -133,7 +133,7 @@ public class InstanceRequestHandler extends 
SimpleChannelInboundHandler<ByteBuf>
       String hexString = requestBytes != null ? 
BytesUtils.toHexString(requestBytes) : "";
       long reqestId = instanceRequest != null ? instanceRequest.getRequestId() 
: 0;
       LOGGER.error("Exception while processing instance request: {}", 
hexString, e);
-      sendErrorResponse(ctx, reqestId, tableNameWithType, queryArrivalTimeMs, 
DataTableBuilder.getEmptyDataTable(), e);
+      sendErrorResponse(ctx, reqestId, tableNameWithType, queryArrivalTimeMs, 
DataTableFactory.getEmptyDataTable(), e);
     }
   }
 
@@ -148,7 +148,7 @@ public class InstanceRequestHandler extends 
SimpleChannelInboundHandler<ByteBuf>
         } else {
           // Send exception response.
           sendErrorResponse(ctx, queryRequest.getRequestId(), 
tableNameWithType, queryArrivalTimeMs,
-              DataTableBuilder.getEmptyDataTable(), new Exception("Null query 
response."));
+              DataTableFactory.getEmptyDataTable(), new Exception("Null query 
response."));
         }
       }
 
@@ -157,7 +157,7 @@ public class InstanceRequestHandler extends 
SimpleChannelInboundHandler<ByteBuf>
         // Send exception response.
         LOGGER.error("Exception while processing instance request", t);
         sendErrorResponse(ctx, instanceRequest.getRequestId(), 
tableNameWithType, queryArrivalTimeMs,
-            DataTableBuilder.getEmptyDataTable(), new Exception(t));
+            DataTableFactory.getEmptyDataTable(), new Exception(t));
       }
     };
   }
@@ -168,7 +168,7 @@ public class InstanceRequestHandler extends 
SimpleChannelInboundHandler<ByteBuf>
     // will only be called if for some remote reason we are unable to handle 
exceptions in channelRead0.
     String message = "Unhandled Exception in " + getClass().getCanonicalName();
     LOGGER.error(message, cause);
-    sendErrorResponse(ctx, 0, null, System.currentTimeMillis(), 
DataTableBuilder.getEmptyDataTable(),
+    sendErrorResponse(ctx, 0, null, System.currentTimeMillis(), 
DataTableFactory.getEmptyDataTable(),
         new Exception(message, cause));
   }
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
index d7ec9a7bfa..b9e816374c 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
@@ -27,7 +27,7 @@ import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -81,7 +81,7 @@ public class DataBlockTest {
     DataSchema dataSchema = new DataSchema(columnNames.toArray(new String[0]),
         columnDataTypes.toArray(new DataSchema.ColumnDataType[0]));
     List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, 
TEST_ROW_COUNT);
-    DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_4);
+    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
     DataTable dataTableImpl = 
SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema);
     DataTable dataBlockFromDataTable = 
DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTableImpl.toBytes()));
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index fbe9d98ed9..4bc037848c 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -106,7 +106,7 @@ public class DataTableSerDeTest {
         QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, 
exception);
     String expected = processingException.getMessage();
 
-    DataTable dataTable = DataTableBuilder.getEmptyDataTable();
+    DataTable dataTable = DataTableFactory.getEmptyDataTable();
     dataTable.addException(processingException);
     DataTable newDataTable = 
DataTableFactory.getDataTable(dataTable.toBytes());
     Assert.assertNull(newDataTable.getDataSchema());
@@ -124,7 +124,7 @@ public class DataTableSerDeTest {
 
     DataSchema dataSchema = new DataSchema(new String[]{"SV", "MV"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.STRING_ARRAY});
-    DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+    DataTableBuilder dataTableBuilder = 
DataTableFactory.getDataTableBuilder(dataSchema);
     for (int rowId = 0; rowId < NUM_ROWS; rowId++) {
       dataTableBuilder.startRow();
       dataTableBuilder.setColumn(0, emptyString);
@@ -154,7 +154,7 @@ public class DataTableSerDeTest {
     }
 
     DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
-    DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+    DataTableBuilder dataTableBuilder = 
DataTableFactory.getDataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns);
 
     DataTable dataTable = dataTableBuilder.build();
@@ -181,8 +181,8 @@ public class DataTableSerDeTest {
 
     // Verify V4 broker can deserialize data table (has data, but has no 
metadata) send by V3 server
     ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
-    DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_3);
-    DataTableBuilder dataTableBuilderV3WithDataOnly = new 
DataTableBuilder(dataSchema);
+    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_3);
+    DataTableBuilder dataTableBuilderV3WithDataOnly = 
DataTableFactory.getDataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly, 
columnDataTypes, numColumns);
 
     DataTable dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create 
a V3 data table
@@ -204,7 +204,7 @@ public class DataTableSerDeTest {
     Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
 
     // Verify V4 broker can deserialize data table (only has metadata) send by 
V3 server
-    DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = new 
DataTableBuilder(dataSchema);
+    DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = 
DataTableFactory.getDataTableBuilder(dataSchema);
     dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a 
V3 data table
     for (String key : EXPECTED_METADATA.keySet()) {
       dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
@@ -217,8 +217,8 @@ public class DataTableSerDeTest {
     // Verify V4 broker can deserialize (has data, but has no metadata) send 
by V4 server(with ThreadCpuTimeMeasurement
     // disabled)
     ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
-    DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_4);
-    DataTableBuilder dataTableBuilderV4WithDataOnly = new 
DataTableBuilder(dataSchema);
+    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
+    DataTableBuilder dataTableBuilderV4WithDataOnly = 
DataTableFactory.getDataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilderV4WithDataOnly, 
columnDataTypes, numColumns);
     DataTable dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create 
a V4 data table
     // Deserialize data table bytes as V4
@@ -242,7 +242,7 @@ public class DataTableSerDeTest {
 
     // Verify V4 broker can deserialize data table (only has metadata) send by 
V4 server(with
     // ThreadCpuTimeMeasurement disabled)
-    DataTableBuilder dataTableBuilderV4WithMetadataDataOnly = new 
DataTableBuilder(dataSchema);
+    DataTableBuilder dataTableBuilderV4WithMetadataDataOnly = 
DataTableFactory.getDataTableBuilder(dataSchema);
     dataTableV4 = dataTableBuilderV4WithMetadataDataOnly.build(); // create a 
V4 data table
     for (String key : EXPECTED_METADATA.keySet()) {
       dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
@@ -255,7 +255,7 @@ public class DataTableSerDeTest {
     // Verify V4 broker can deserialize (has data, but has no metadata) send 
by V4 server(with ThreadCpuTimeMeasurement
     // enabled)
     ThreadTimer.setThreadCpuTimeMeasurementEnabled(true);
-    DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_4);
+    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
     dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create a V4 data 
table
     // Deserialize data table bytes as V4
     newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes());
@@ -311,8 +311,8 @@ public class DataTableSerDeTest {
 
     // Verify V3 broker can deserialize data table (has data, but has no 
metadata) send by V2 server
     ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
-    DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_2);
-    DataTableBuilder dataTableBuilderV2WithDataOnly = new 
DataTableBuilder(dataSchema);
+    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_2);
+    DataTableBuilder dataTableBuilderV2WithDataOnly = 
DataTableFactory.getDataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilderV2WithDataOnly, 
columnDataTypes, numColumns);
 
     DataTable dataTableV2 = dataTableBuilderV2WithDataOnly.build(); // create 
a V2 data table
@@ -334,7 +334,7 @@ public class DataTableSerDeTest {
     Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
 
     // Verify V3 broker can deserialize data table (only has metadata) send by 
V2 server
-    DataTableBuilder dataTableBuilderV2WithMetadataDataOnly = new 
DataTableBuilder(dataSchema);
+    DataTableBuilder dataTableBuilderV2WithMetadataDataOnly = 
DataTableFactory.getDataTableBuilder(dataSchema);
     dataTableV2 = dataTableBuilderV2WithMetadataDataOnly.build(); // create a 
V2 data table
     for (String key : EXPECTED_METADATA.keySet()) {
       dataTableV2.getMetadata().put(key, EXPECTED_METADATA.get(key));
@@ -347,8 +347,8 @@ public class DataTableSerDeTest {
     // Verify V3 broker can deserialize (has data, but has no metadata) send 
by V3 server(with ThreadCpuTimeMeasurement
     // disabled)
     ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
-    DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_3);
-    DataTableBuilder dataTableBuilderV3WithDataOnly = new 
DataTableBuilder(dataSchema);
+    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_3);
+    DataTableBuilder dataTableBuilderV3WithDataOnly = 
DataTableFactory.getDataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly, 
columnDataTypes, numColumns);
     DataTable dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create 
a V3 data table
     // Deserialize data table bytes as V3
@@ -372,7 +372,7 @@ public class DataTableSerDeTest {
 
     // Verify V3 broker can deserialize data table (only has metadata) send by 
V3 server(with
     // ThreadCpuTimeMeasurement disabled)
-    DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = new 
DataTableBuilder(dataSchema);
+    DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = 
DataTableFactory.getDataTableBuilder(dataSchema);
     dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a 
V3 data table
     for (String key : EXPECTED_METADATA.keySet()) {
       dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
@@ -385,7 +385,7 @@ public class DataTableSerDeTest {
     // Verify V3 broker can deserialize (has data, but has no metadata) send 
by V3 server(with ThreadCpuTimeMeasurement
     // enabled)
     ThreadTimer.setThreadCpuTimeMeasurementEnabled(true);
-    DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_3);
+    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_3);
     dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create a V3 data 
table
     // Deserialize data table bytes as V3
     newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes());
@@ -438,7 +438,7 @@ public class DataTableSerDeTest {
     }
 
     DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
-    DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+    DataTableBuilder dataTableBuilder = 
DataTableFactory.getDataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns);
 
     DataTable dataTable = dataTableBuilder.build();
@@ -474,8 +474,8 @@ public class DataTableSerDeTest {
 
     ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
     DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
-    DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_3);
-    DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_3);
+    DataTableBuilder dataTableBuilder = 
DataTableFactory.getDataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns);
 
     DataTable dataTable = dataTableBuilder.build();
@@ -486,7 +486,7 @@ public class DataTableSerDeTest {
 
     ByteBuffer byteBuffer = ByteBuffer.wrap(dataTable.toBytes());
     int version = byteBuffer.getInt();
-    Assert.assertEquals(version, DataTableBuilder.VERSION_3);
+    Assert.assertEquals(version, DataTableFactory.VERSION_3);
     byteBuffer.getInt(); // numOfRows
     byteBuffer.getInt(); // numOfColumns
     byteBuffer.getInt(); // exceptionsStart
@@ -542,7 +542,7 @@ public class DataTableSerDeTest {
       DataSchema.ColumnDataType[] columnDataTypes, int numColumns)
       throws IOException {
     RoaringBitmap[] nullBitmaps = null;
-    if (DataTableBuilder.getCurrentDataTableVersion() >= 
DataTableBuilder.VERSION_4) {
+    if (DataTableFactory.getDataTableVersion() >= DataTableFactory.VERSION_4) {
       nullBitmaps = new RoaringBitmap[numColumns];
       for (int colId = 0; colId < numColumns; colId++) {
         nullBitmaps[colId] = new RoaringBitmap();
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
index 8f2be65532..2db3a4cc01 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -53,7 +54,7 @@ public class BrokerReduceServiceTest {
         CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM 
testTable GROUP BY col1");
     DataSchema dataSchema =
         new DataSchema(new String[]{"col1", "count(*)"}, new 
ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG});
-    DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+    DataTableBuilder dataTableBuilder = 
DataTableFactory.getDataTableBuilder(dataSchema);
     int numGroups = 5000;
     for (int i = 0; i < numGroups; i++) {
       dataTableBuilder.startRow();
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
index bd2a1fb164..927b9cc7f6 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
@@ -43,7 +43,6 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.DataTable.MetadataKey;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.query.executor.QueryExecutor;
@@ -306,7 +305,7 @@ public class PrioritySchedulerTest {
           throw new RuntimeException(e);
         }
       }
-      DataTable result = DataTableBuilder.getEmptyDataTable();
+      DataTable result = DataTableFactory.getEmptyDataTable();
       result.getMetadata().put(MetadataKey.TABLE.getName(), 
queryRequest.getTableNameWithType());
       if (_useBarrier) {
         try {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 7639e50d30..01191ce79d 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -27,7 +27,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.DataTable.MetadataKey;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
@@ -81,7 +81,7 @@ public class QueryRoutingTest {
   public void testValidResponse()
       throws Exception {
     long requestId = 123;
-    DataTable dataTable = DataTableBuilder.getEmptyDataTable();
+    DataTable dataTable = DataTableFactory.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
 
@@ -159,7 +159,7 @@ public class QueryRoutingTest {
   public void testNonMatchingRequestId()
       throws Exception {
     long requestId = 123;
-    DataTable dataTable = DataTableBuilder.getEmptyDataTable();
+    DataTable dataTable = DataTableFactory.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
 
@@ -192,7 +192,7 @@ public class QueryRoutingTest {
     // To avoid flakiness, set timeoutMs to 2000 msec. For some test runs, it 
can take up to
     // 1400 msec to mark request as failed.
     long timeoutMs = 2000L;
-    DataTable dataTable = DataTableBuilder.getEmptyDataTable();
+    DataTable dataTable = DataTableFactory.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 8bbf28e1ee..5d149de4ed 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -24,7 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -82,7 +82,7 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTest
     waitForAllDocsLoaded(600_000L);
 
     // Setting data table version to 4
-    DataTableBuilder.setCurrentDataTableVersion(4);
+    DataTableFactory.setDataTableVersion(4);
   }
 
   @Override
@@ -125,7 +125,7 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTest
   public void tearDown()
       throws Exception {
     // Resetting data table version back to 3
-    DataTableBuilder.setCurrentDataTableVersion(3);
+    DataTableFactory.setDataTableVersion(3);
 
     dropOfflineTable(DEFAULT_TABLE_NAME);
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index f76917b766..4ef6b763f0 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.QueryEnvironmentTestUtils;
@@ -67,6 +68,7 @@ public class QueryRunnerTest {
   @BeforeClass
   public void setUp()
       throws Exception {
+    DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
     QueryServerEnclosure server1 = new 
QueryServerEnclosure(Lists.newArrayList("a", "b", "c"),
         ImmutableMap.of("a", INDEX_DIR_S1_A, "b", INDEX_DIR_S1_B, "c", 
INDEX_DIR_S1_C),
         QueryEnvironmentTestUtils.SERVER1_SEGMENTS);
@@ -95,6 +97,7 @@ public class QueryRunnerTest {
 
   @AfterClass
   public void tearDown() {
+    DataTableFactory.setDataTableVersion(DataTableFactory.DEFAULT_VERSION);
     for (QueryServerEnclosure server : _servers.values()) {
       server.shutDown();
     }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 4fa01c9110..8f00c56bfb 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -61,7 +61,7 @@ import org.apache.pinot.common.utils.TlsUtils;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import 
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager;
 import org.apache.pinot.core.query.request.context.ThreadTimer;
@@ -186,7 +186,7 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
       throw new UnsupportedOperationException("Setting experimental DataTable 
version newer than default via config "
           + "is not allowed. Current default DataTable version: " + 
Server.DEFAULT_CURRENT_DATA_TABLE_VERSION);
     }
-    DataTableBuilder.setCurrentDataTableVersion(dataTableVersion);
+    DataTableFactory.setDataTableVersion(dataTableVersion);
 
     LOGGER.info("Initializing Helix manager with zkAddress: {}, clusterName: 
{}, instanceId: {}", _zkAddress,
         _helixClusterName, _instanceId);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to