This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new dacc6d0690 Cleanup old DataTable v2 and v3 (#13590)
dacc6d0690 is described below

commit dacc6d06907c44e83721454f1090e5f00c824f15
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Thu Jul 11 23:07:49 2024 -0700

    Cleanup old DataTable v2 and v3 (#13590)
---
 .../pinot/common/datablock/BaseDataBlock.java      |   4 +-
 .../pinot/common/datatable/DataTableFactory.java   |  15 +-
 .../pinot/common/datatable/DataTableImplV2.java    | 272 --------------
 .../pinot/common/datatable/DataTableImplV3.java    | 403 --------------------
 .../pinot/common/datatable/DataTableImplV4.java    |  39 +-
 .../pinot/common/datatable/DataTableUtils.java     |  18 +-
 .../common/datatable/DataTableBuilderFactory.java  |  39 +-
 .../common/datatable/DataTableBuilderV2V3.java     | 121 ------
 .../core/query/request/ServerQueryRequest.java     |  10 +-
 .../core/common/datatable/DataTableSerDeTest.java  | 405 +--------------------
 .../apache/pinot/queries/AllNullQueriesTest.java   |   2 -
 .../pinot/queries/BigDecimalQueriesTest.java       |   2 -
 .../queries/BooleanNullEnabledQueriesTest.java     |   2 -
 .../pinot/queries/NullEnabledQueriesTest.java      |   2 -
 .../tests/NullHandlingIntegrationTest.java         |   2 -
 .../tests/OfflineClusterIntegrationTest.java       |   3 -
 .../query/runtime/queries/QueryRunnerTest.java     |   2 -
 .../runtime/queries/ResourceBasedQueriesTest.java  |   2 -
 18 files changed, 59 insertions(+), 1284 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
index 333ac673b6..9b643b088d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
@@ -28,7 +28,7 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
 import org.apache.pinot.common.CustomObject;
-import org.apache.pinot.common.datatable.DataTableImplV3;
+import org.apache.pinot.common.datatable.DataTableImplV4;
 import org.apache.pinot.common.datatable.DataTableUtils;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
@@ -43,7 +43,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 
 /**
- * Base data block mostly replicating implementation of {@link DataTableImplV3}.
+ * Base data block mostly replicating implementation of {@link DataTableImplV4}.
  *
  * +-----------------------------------------------+
  * | 13 integers of header:                        |
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableFactory.java
index 55374d7cca..ec7762c605 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableFactory.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.datatable;
 
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -26,23 +27,13 @@ public class DataTableFactory {
   private DataTableFactory() {
   }
 
-  public static final int VERSION_2 = 2;
-  public static final int VERSION_3 = 3;
   public static final int VERSION_4 = 4;
 
   public static DataTable getDataTable(ByteBuffer byteBuffer)
       throws IOException {
     int version = byteBuffer.getInt();
-    switch (version) {
-      case VERSION_2:
-        return new DataTableImplV2(byteBuffer);
-      case VERSION_3:
-        return new DataTableImplV3(byteBuffer);
-      case VERSION_4:
-        return new DataTableImplV4(byteBuffer);
-      default:
-        throw new IllegalStateException("Unsupported data table version: " + version);
-    }
+    Preconditions.checkState(version == VERSION_4, "Unsupported data table version: %s", version);
+    return new DataTableImplV4(byteBuffer);
   }
 
   public static DataTable getDataTable(byte[] bytes)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV2.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV2.java
deleted file mode 100644
index 261e1edc46..0000000000
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV2.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.common.datatable;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.pinot.common.response.ProcessingException;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.HashUtil;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-
-public class DataTableImplV2 extends BaseDataTable {
-  // VERSION
-  // NUM_ROWS
-  // NUM_COLUMNS
-  // DICTIONARY_MAP (START|SIZE)
-  // METADATA (START|SIZE)
-  // DATA_SCHEMA (START|SIZE)
-  // FIXED_SIZE_DATA (START|SIZE)
-  // VARIABLE_SIZE_DATA (START|SIZE)
-  private static final int HEADER_SIZE = Integer.BYTES * 13;
-
-  /**
-   * Construct data table with results. (Server side)
-   */
-  public DataTableImplV2(int numRows, DataSchema dataSchema, Map<String, 
Map<Integer, String>> dictionaryMap,
-      byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
-    super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes, 
variableSizeDataBytes);
-  }
-
-  /**
-   * Construct empty data table. (Server side)
-   */
-  public DataTableImplV2() {
-  }
-
-  /**
-   * Construct data table from byte array. (broker side)
-   */
-  public DataTableImplV2(ByteBuffer byteBuffer)
-      throws IOException {
-    // Read header.
-    _numRows = byteBuffer.getInt();
-    _numColumns = byteBuffer.getInt();
-    int dictionaryMapStart = byteBuffer.getInt();
-    int dictionaryMapLength = byteBuffer.getInt();
-    int metadataStart = byteBuffer.getInt();
-    int metadataLength = byteBuffer.getInt();
-    int dataSchemaStart = byteBuffer.getInt();
-    int dataSchemaLength = byteBuffer.getInt();
-    int fixedSizeDataStart = byteBuffer.getInt();
-    int fixedSizeDataLength = byteBuffer.getInt();
-    int variableSizeDataStart = byteBuffer.getInt();
-    int variableSizeDataLength = byteBuffer.getInt();
-
-    // Read dictionary.
-    if (dictionaryMapLength != 0) {
-      byteBuffer.position(dictionaryMapStart);
-      _dictionaryMap = deserializeDictionaryMap(byteBuffer);
-    } else {
-      _dictionaryMap = null;
-    }
-
-    // Read metadata.
-    byteBuffer.position(metadataStart);
-    _metadata = deserializeMetadata(byteBuffer);
-
-    // Read data schema.
-    if (dataSchemaLength != 0) {
-      byteBuffer.position(dataSchemaStart);
-      _dataSchema = DataSchema.fromBytes(byteBuffer);
-      _columnOffsets = new int[_dataSchema.size()];
-      _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema, 
_columnOffsets, getVersion());
-    } else {
-      _dataSchema = null;
-      _columnOffsets = null;
-      _rowSizeInBytes = 0;
-    }
-
-    // Read fixed size data.
-    if (fixedSizeDataLength != 0) {
-      _fixedSizeDataBytes = new byte[fixedSizeDataLength];
-      byteBuffer.position(fixedSizeDataStart);
-      byteBuffer.get(_fixedSizeDataBytes);
-      _fixedSizeData = ByteBuffer.wrap(_fixedSizeDataBytes);
-    } else {
-      _fixedSizeDataBytes = null;
-      _fixedSizeData = null;
-    }
-
-    // Read variable size data.
-    if (variableSizeDataLength != 0) {
-      _variableSizeDataBytes = new byte[variableSizeDataLength];
-      byteBuffer.position(variableSizeDataStart);
-      byteBuffer.get(_variableSizeDataBytes);
-      _variableSizeData = ByteBuffer.wrap(_variableSizeDataBytes);
-    } else {
-      _variableSizeDataBytes = null;
-      _variableSizeData = null;
-    }
-  }
-
-  @Override
-  public int getVersion() {
-    return DataTableFactory.VERSION_2;
-  }
-
-  private Map<String, String> deserializeMetadata(ByteBuffer buffer)
-      throws IOException {
-    int numEntries = buffer.getInt();
-    Map<String, String> metadata = new 
HashMap<>(HashUtil.getHashMapCapacity(numEntries));
-
-    for (int i = 0; i < numEntries; i++) {
-      String key = DataTableUtils.decodeString(buffer);
-      String value = DataTableUtils.decodeString(buffer);
-      metadata.put(key, value);
-    }
-
-    return metadata;
-  }
-
-  @Override
-  public void addException(ProcessingException processingException) {
-    _metadata.put(EXCEPTION_METADATA_KEY + processingException.getErrorCode(), 
processingException.getMessage());
-  }
-
-  @Override
-  public void addException(int errCode, String errMsg) {
-    _metadata.put(EXCEPTION_METADATA_KEY + errCode, errMsg);
-  }
-
-  // getExceptions return a map of errorCode->errMessage of the datatable.
-  @Override
-  public Map<Integer, String> getExceptions() {
-    Map<Integer, String> exceptions = new HashMap<>();
-    for (String key : _metadata.keySet()) {
-      if (key.startsWith(EXCEPTION_METADATA_KEY)) {
-        // In V2, all exceptions are added into metadata, using 
"Exception"+errCode as key,
-        // Integer.parseInt(key.substring(9)) can extract the error code from 
the key.
-        exceptions.put(Integer.parseInt(key.substring(9)), _metadata.get(key));
-      }
-    }
-    return exceptions;
-  }
-
-  @Override
-  public byte[] toBytes()
-      throws IOException {
-    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-    DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
-    dataOutputStream.writeInt(DataTableFactory.VERSION_2);
-    dataOutputStream.writeInt(_numRows);
-    dataOutputStream.writeInt(_numColumns);
-    int dataOffset = HEADER_SIZE;
-
-    // Write dictionary.
-    dataOutputStream.writeInt(dataOffset);
-    byte[] dictionaryMapBytes = null;
-    if (_dictionaryMap != null) {
-      dictionaryMapBytes = serializeDictionaryMap();
-      dataOutputStream.writeInt(dictionaryMapBytes.length);
-      dataOffset += dictionaryMapBytes.length;
-    } else {
-      dataOutputStream.writeInt(0);
-    }
-
-    // Write metadata.
-    dataOutputStream.writeInt(dataOffset);
-    byte[] metadataBytes = serializeMetadata();
-    dataOutputStream.writeInt(metadataBytes.length);
-    dataOffset += metadataBytes.length;
-
-    // Write data schema.
-    dataOutputStream.writeInt(dataOffset);
-    byte[] dataSchemaBytes = null;
-    if (_dataSchema != null) {
-      dataSchemaBytes = _dataSchema.toBytes();
-      dataOutputStream.writeInt(dataSchemaBytes.length);
-      dataOffset += dataSchemaBytes.length;
-    } else {
-      dataOutputStream.writeInt(0);
-    }
-
-    // Write fixed size data.
-    dataOutputStream.writeInt(dataOffset);
-    if (_fixedSizeDataBytes != null) {
-      dataOutputStream.writeInt(_fixedSizeDataBytes.length);
-      dataOffset += _fixedSizeDataBytes.length;
-    } else {
-      dataOutputStream.writeInt(0);
-    }
-
-    // Write variable size data.
-    dataOutputStream.writeInt(dataOffset);
-    if (_variableSizeDataBytes != null) {
-      dataOutputStream.writeInt(_variableSizeDataBytes.length);
-    } else {
-      dataOutputStream.writeInt(0);
-    }
-
-    // Write actual data.
-    if (dictionaryMapBytes != null) {
-      dataOutputStream.write(dictionaryMapBytes);
-    }
-    dataOutputStream.write(metadataBytes);
-    if (dataSchemaBytes != null) {
-      dataOutputStream.write(dataSchemaBytes);
-    }
-    if (_fixedSizeDataBytes != null) {
-      dataOutputStream.write(_fixedSizeDataBytes);
-    }
-    if (_variableSizeDataBytes != null) {
-      dataOutputStream.write(_variableSizeDataBytes);
-    }
-
-    return byteArrayOutputStream.toByteArray();
-  }
-
-  @Override
-  public DataTableImplV2 toMetadataOnlyDataTable() {
-    DataTableImplV2 metadataOnlyDataTable = new DataTableImplV2();
-    metadataOnlyDataTable._metadata.putAll(_metadata);
-    return metadataOnlyDataTable;
-  }
-
-  @Override
-  public DataTableImplV2 toDataOnlyDataTable() {
-    return new DataTableImplV2(_numRows, _dataSchema, _dictionaryMap, 
_fixedSizeDataBytes, _variableSizeDataBytes);
-  }
-
-  private byte[] serializeMetadata()
-      throws IOException {
-    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-    DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
-
-    dataOutputStream.writeInt(_metadata.size());
-    for (Entry<String, String> entry : _metadata.entrySet()) {
-      byte[] keyBytes = entry.getKey().getBytes(UTF_8);
-      dataOutputStream.writeInt(keyBytes.length);
-      dataOutputStream.write(keyBytes);
-
-      byte[] valueBytes = entry.getValue().getBytes(UTF_8);
-      dataOutputStream.writeInt(valueBytes.length);
-      dataOutputStream.write(valueBytes);
-    }
-
-    return byteArrayOutputStream.toByteArray();
-  }
-}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV3.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV3.java
deleted file mode 100644
index af4b7eff46..0000000000
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV3.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pinot.common.datatable;
-
-import com.google.common.primitives.Ints;
-import com.google.common.primitives.Longs;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.pinot.common.response.ProcessingException;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.HashUtil;
-import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-
-/**
- * Datatable V3 implementation.
- * The layout of serialized V3 datatable looks like:
- * +-----------------------------------------------+
- * | 13 integers of header:                        |
- * | VERSION                                       |
- * | NUM_ROWS                                      |
- * | NUM_COLUMNS                                   |
- * | EXCEPTIONS SECTION START OFFSET               |
- * | EXCEPTIONS SECTION LENGTH                     |
- * | DICTIONARY_MAP SECTION START OFFSET           |
- * | DICTIONARY_MAP SECTION LENGTH                 |
- * | DATA_SCHEMA SECTION START OFFSET              |
- * | DATA_SCHEMA SECTION LENGTH                    |
- * | FIXED_SIZE_DATA SECTION START OFFSET          |
- * | FIXED_SIZE_DATA SECTION LENGTH                |
- * | VARIABLE_SIZE_DATA SECTION START OFFSET       |
- * | VARIABLE_SIZE_DATA SECTION LENGTH             |
- * +-----------------------------------------------+
- * | EXCEPTIONS SECTION                            |
- * +-----------------------------------------------+
- * | DICTIONARY_MAP SECTION                        |
- * +-----------------------------------------------+
- * | DATA_SCHEMA SECTION                           |
- * +-----------------------------------------------+
- * | FIXED_SIZE_DATA SECTION                       |
- * +-----------------------------------------------+
- * | VARIABLE_SIZE_DATA SECTION                    |
- * +-----------------------------------------------+
- * | METADATA LENGTH                               |
- * | METADATA SECTION                              |
- * +-----------------------------------------------+
- */
-public class DataTableImplV3 extends BaseDataTable {
-  private static final int HEADER_SIZE = Integer.BYTES * 13;
-  // _errCodeToExceptionMap stores exceptions as a map of 
errorCode->errorMessage
-  private final Map<Integer, String> _errCodeToExceptionMap;
-
-  /**
-   * Construct data table with results. (Server side)
-   */
-  public DataTableImplV3(int numRows, DataSchema dataSchema, Map<String, 
Map<Integer, String>> dictionaryMap,
-      byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
-    super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes, 
variableSizeDataBytes);
-    _errCodeToExceptionMap = new HashMap<>();
-  }
-
-  /**
-   * Construct empty data table. (Server side)
-   */
-  public DataTableImplV3() {
-    _errCodeToExceptionMap = new HashMap<>();
-  }
-
-  /**
-   * Construct data table from byte array. (broker side)
-   */
-  public DataTableImplV3(ByteBuffer byteBuffer)
-      throws IOException {
-    // Read header.
-    _numRows = byteBuffer.getInt();
-    _numColumns = byteBuffer.getInt();
-    int exceptionsStart = byteBuffer.getInt();
-    int exceptionsLength = byteBuffer.getInt();
-    int dictionaryMapStart = byteBuffer.getInt();
-    int dictionaryMapLength = byteBuffer.getInt();
-    int dataSchemaStart = byteBuffer.getInt();
-    int dataSchemaLength = byteBuffer.getInt();
-    int fixedSizeDataStart = byteBuffer.getInt();
-    int fixedSizeDataLength = byteBuffer.getInt();
-    int variableSizeDataStart = byteBuffer.getInt();
-    int variableSizeDataLength = byteBuffer.getInt();
-
-    // Read exceptions.
-    if (exceptionsLength != 0) {
-      byteBuffer.position(exceptionsStart);
-      _errCodeToExceptionMap = deserializeExceptions(byteBuffer);
-    } else {
-      _errCodeToExceptionMap = new HashMap<>();
-    }
-
-    // Read dictionary.
-    if (dictionaryMapLength != 0) {
-      byteBuffer.position(dictionaryMapStart);
-      _dictionaryMap = deserializeDictionaryMap(byteBuffer);
-    } else {
-      _dictionaryMap = null;
-    }
-
-    // Read data schema.
-    if (dataSchemaLength != 0) {
-      byteBuffer.position(dataSchemaStart);
-      _dataSchema = DataSchema.fromBytes(byteBuffer);
-      _columnOffsets = new int[_dataSchema.size()];
-      _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema, 
_columnOffsets, getVersion());
-    } else {
-      _dataSchema = null;
-      _columnOffsets = null;
-      _rowSizeInBytes = 0;
-    }
-
-    // Read fixed size data.
-    if (fixedSizeDataLength != 0) {
-      _fixedSizeDataBytes = new byte[fixedSizeDataLength];
-      byteBuffer.position(fixedSizeDataStart);
-      byteBuffer.get(_fixedSizeDataBytes);
-      _fixedSizeData = ByteBuffer.wrap(_fixedSizeDataBytes);
-    } else {
-      _fixedSizeDataBytes = null;
-      _fixedSizeData = null;
-    }
-
-    // Read variable size data.
-    _variableSizeDataBytes = new byte[variableSizeDataLength];
-    if (variableSizeDataLength != 0) {
-      byteBuffer.position(variableSizeDataStart);
-      byteBuffer.get(_variableSizeDataBytes);
-    }
-    _variableSizeData = ByteBuffer.wrap(_variableSizeDataBytes);
-
-    // Read metadata.
-    int metadataLength = byteBuffer.getInt();
-    if (metadataLength != 0) {
-      _metadata = deserializeMetadata(byteBuffer);
-    }
-  }
-
-  @Override
-  public int getVersion() {
-    return DataTableFactory.VERSION_3;
-  }
-
-  @Override
-  public void addException(ProcessingException processingException) {
-    _errCodeToExceptionMap.put(processingException.getErrorCode(), 
processingException.getMessage());
-  }
-
-  @Override
-  public void addException(int errCode, String errMsg) {
-    _errCodeToExceptionMap.put(errCode, errMsg);
-  }
-
-  @Override
-  public Map<Integer, String> getExceptions() {
-    return _errCodeToExceptionMap;
-  }
-
-  @Override
-  public byte[] toBytes()
-      throws IOException {
-    ThreadResourceUsageProvider threadResourceUsageProvider = new 
ThreadResourceUsageProvider();
-
-    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-    DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
-    writeLeadingSections(dataOutputStream);
-
-    // Add table serialization time metadata if thread timer is enabled.
-    if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
-      long responseSerializationCpuTimeNs = 
threadResourceUsageProvider.getThreadTimeNs();
-      getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), 
String.valueOf(responseSerializationCpuTimeNs));
-    }
-
-    // Write metadata: length followed by actual metadata bytes.
-    // NOTE: We ignore metadata serialization time in 
"responseSerializationCpuTimeNs" as it's negligible while
-    // considering it will bring a lot code complexity.
-    byte[] metadataBytes = serializeMetadata();
-    dataOutputStream.writeInt(metadataBytes.length);
-    dataOutputStream.write(metadataBytes);
-
-    return byteArrayOutputStream.toByteArray();
-  }
-
-  @Override
-  public DataTableImplV3 toMetadataOnlyDataTable() {
-    DataTableImplV3 metadataOnlyDataTable = new DataTableImplV3();
-    metadataOnlyDataTable._metadata.putAll(_metadata);
-    
metadataOnlyDataTable._errCodeToExceptionMap.putAll(_errCodeToExceptionMap);
-    return metadataOnlyDataTable;
-  }
-
-  @Override
-  public DataTableImplV3 toDataOnlyDataTable() {
-    return new DataTableImplV3(_numRows, _dataSchema, _dictionaryMap, 
_fixedSizeDataBytes, _variableSizeDataBytes);
-  }
-
-  private void writeLeadingSections(DataOutputStream dataOutputStream)
-      throws IOException {
-    dataOutputStream.writeInt(DataTableFactory.VERSION_3);
-    dataOutputStream.writeInt(_numRows);
-    dataOutputStream.writeInt(_numColumns);
-    int dataOffset = HEADER_SIZE;
-
-    // Write exceptions section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] exceptionsBytes;
-    exceptionsBytes = serializeExceptions();
-    dataOutputStream.writeInt(exceptionsBytes.length);
-    dataOffset += exceptionsBytes.length;
-
-    // Write dictionary map section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] dictionaryMapBytes = null;
-    if (_dictionaryMap != null) {
-      dictionaryMapBytes = serializeDictionaryMap();
-      dataOutputStream.writeInt(dictionaryMapBytes.length);
-      dataOffset += dictionaryMapBytes.length;
-    } else {
-      dataOutputStream.writeInt(0);
-    }
-
-    // Write data schema section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] dataSchemaBytes = null;
-    if (_dataSchema != null) {
-      dataSchemaBytes = _dataSchema.toBytes();
-      dataOutputStream.writeInt(dataSchemaBytes.length);
-      dataOffset += dataSchemaBytes.length;
-    } else {
-      dataOutputStream.writeInt(0);
-    }
-
-    // Write fixed size data section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    if (_fixedSizeDataBytes != null) {
-      dataOutputStream.writeInt(_fixedSizeDataBytes.length);
-      dataOffset += _fixedSizeDataBytes.length;
-    } else {
-      dataOutputStream.writeInt(0);
-    }
-
-    // Write variable size data section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    if (_variableSizeDataBytes != null) {
-      dataOutputStream.writeInt(_variableSizeDataBytes.length);
-    } else {
-      dataOutputStream.writeInt(0);
-    }
-
-    // Write actual data.
-    // Write exceptions bytes.
-    dataOutputStream.write(exceptionsBytes);
-    // Write dictionary map bytes.
-    if (dictionaryMapBytes != null) {
-      dataOutputStream.write(dictionaryMapBytes);
-    }
-    // Write data schema bytes.
-    if (dataSchemaBytes != null) {
-      dataOutputStream.write(dataSchemaBytes);
-    }
-    // Write fixed size data bytes.
-    if (_fixedSizeDataBytes != null) {
-      dataOutputStream.write(_fixedSizeDataBytes);
-    }
-    // Write variable size data bytes.
-    if (_variableSizeDataBytes != null) {
-      dataOutputStream.write(_variableSizeDataBytes);
-    }
-  }
-
-  /**
-   * Serialize metadata section to bytes.
-   * Format of the bytes looks like:
-   * [numEntries, bytesOfKV2, bytesOfKV2, bytesOfKV3]
-   * For each KV pair:
-   * - if the value type is String, encode it as: [enumKeyOrdinal, 
valueLength, Utf8EncodedValue].
-   * - if the value type is int, encode it as: [enumKeyOrdinal, 
bigEndianRepresentationOfIntValue]
-   * - if the value type is long, encode it as: [enumKeyOrdinal, 
bigEndianRepresentationOfLongValue]
-   *
-   * Unlike V2, where numeric metadata values (int and long) in V3 are encoded 
in UTF-8 in the wire format,
-   * in V3 big endian representation is used.
-   */
-  private byte[] serializeMetadata()
-      throws IOException {
-    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-    DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
-
-    dataOutputStream.writeInt(_metadata.size());
-
-    for (Map.Entry<String, String> entry : _metadata.entrySet()) {
-      MetadataKey key = MetadataKey.getByName(entry.getKey());
-      // Ignore unknown keys.
-      if (key == null) {
-        continue;
-      }
-      String value = entry.getValue();
-      dataOutputStream.writeInt(key.getId());
-      if (key.getValueType() == MetadataValueType.INT) {
-        dataOutputStream.write(Ints.toByteArray(Integer.parseInt(value)));
-      } else if (key.getValueType() == MetadataValueType.LONG) {
-        dataOutputStream.write(Longs.toByteArray(Long.parseLong(value)));
-      } else {
-        byte[] valueBytes = value.getBytes(UTF_8);
-        dataOutputStream.writeInt(valueBytes.length);
-        dataOutputStream.write(valueBytes);
-      }
-    }
-
-    return byteArrayOutputStream.toByteArray();
-  }
-
-  /**
-   * Even though the wire format of V3 uses UTF-8 for string/bytes and 
big-endian for numeric values,
-   * the in-memory representation is STRING based for processing the metadata 
before serialization
-   * (by the server as it adds the statistics in metadata) and after 
deserialization (by the broker as it receives
-   * DataTable from each server and aggregates the values).
-   * This is to make V3 implementation keep the consumers of Map<String, 
String> getMetadata() API in the code happy
-   * by internally converting it.
-   *
-   * This method use relative operations on the ByteBuffer and expects the 
buffer's position to be set correctly.
-   */
-  private Map<String, String> deserializeMetadata(ByteBuffer buffer)
-      throws IOException {
-    int numEntries = buffer.getInt();
-    Map<String, String> metadata = new HashMap<>();
-    for (int i = 0; i < numEntries; i++) {
-      int keyId = buffer.getInt();
-      MetadataKey key = MetadataKey.getById(keyId);
-      // Ignore unknown keys.
-      if (key == null) {
-        continue;
-      }
-      if (key.getValueType() == MetadataValueType.INT) {
-        String value = "" + buffer.getInt();
-        metadata.put(key.getName(), value);
-      } else if (key.getValueType() == MetadataValueType.LONG) {
-        String value = "" + buffer.getLong();
-        metadata.put(key.getName(), value);
-      } else {
-        String value = DataTableUtils.decodeString(buffer);
-        metadata.put(key.getName(), value);
-      }
-    }
-    return metadata;
-  }
-
-  private byte[] serializeExceptions()
-      throws IOException {
-    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-    DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
-
-    dataOutputStream.writeInt(_errCodeToExceptionMap.size());
-
-    for (Map.Entry<Integer, String> entry : _errCodeToExceptionMap.entrySet()) 
{
-      int key = entry.getKey();
-      String value = entry.getValue();
-      byte[] valueBytes = value.getBytes(UTF_8);
-      dataOutputStream.writeInt(key);
-      dataOutputStream.writeInt(valueBytes.length);
-      dataOutputStream.write(valueBytes);
-    }
-
-    return byteArrayOutputStream.toByteArray();
-  }
-
-  private Map<Integer, String> deserializeExceptions(ByteBuffer buffer)
-      throws IOException {
-    int numExceptions = buffer.getInt();
-    Map<Integer, String> exceptions = new 
HashMap<>(HashUtil.getHashMapCapacity(numExceptions));
-    for (int i = 0; i < numExceptions; i++) {
-      int errCode = buffer.getInt();
-      String errMessage = DataTableUtils.decodeString(buffer);
-      exceptions.put(errCode, errMessage);
-    }
-    return exceptions;
-  }
-}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
index b58669fd4b..a39a7d7e9f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
@@ -36,7 +36,6 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
-import org.apache.pinot.spi.annotations.InterfaceStability;
 import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
@@ -46,9 +45,39 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 
 /**
- * Datatable V4 Implementation is a wrapper around the Row-based data block.
+ * Datatable V4 implementation.
+ *
+ * The layout of serialized V4 datatable looks like:
+ * +-----------------------------------------------+
+ * | 13 integers of header:                        |
+ * | VERSION                                       |
+ * | NUM_ROWS                                      |
+ * | NUM_COLUMNS                                   |
+ * | EXCEPTIONS SECTION START OFFSET               |
+ * | EXCEPTIONS SECTION LENGTH                     |
+ * | DICTIONARY_MAP SECTION START OFFSET           |
+ * | DICTIONARY_MAP SECTION LENGTH                 |
+ * | DATA_SCHEMA SECTION START OFFSET              |
+ * | DATA_SCHEMA SECTION LENGTH                    |
+ * | FIXED_SIZE_DATA SECTION START OFFSET          |
+ * | FIXED_SIZE_DATA SECTION LENGTH                |
+ * | VARIABLE_SIZE_DATA SECTION START OFFSET       |
+ * | VARIABLE_SIZE_DATA SECTION LENGTH             |
+ * +-----------------------------------------------+
+ * | EXCEPTIONS SECTION                            |
+ * +-----------------------------------------------+
+ * | DICTIONARY_MAP SECTION                        |
+ * +-----------------------------------------------+
+ * | DATA_SCHEMA SECTION                           |
+ * +-----------------------------------------------+
+ * | FIXED_SIZE_DATA SECTION                       |
+ * +-----------------------------------------------+
+ * | VARIABLE_SIZE_DATA SECTION                    |
+ * +-----------------------------------------------+
+ * | METADATA LENGTH                               |
+ * | METADATA SECTION                              |
+ * +-----------------------------------------------+
  */
-@InterfaceStability.Evolving
 public class DataTableImplV4 implements DataTable {
 
   protected static final int HEADER_SIZE = Integer.BYTES * 13;
@@ -84,8 +113,8 @@ public class DataTableImplV4 implements DataTable {
     _errCodeToExceptionMap = new HashMap<>();
   }
 
-  public DataTableImplV4(int numRows, DataSchema dataSchema, String[] stringDictionary,
-      byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
+  public DataTableImplV4(int numRows, DataSchema dataSchema, String[] stringDictionary, byte[] fixedSizeDataBytes,
+      byte[] variableSizeDataBytes) {
     _numRows = numRows;
     _dataSchema = dataSchema;
     _numColumns = dataSchema == null ? 0 : dataSchema.size();
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java
index a1ddd8d104..dc3b40bfd8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java
@@ -39,6 +39,7 @@ public class DataTableUtils {
    * @return row size in bytes.
    */
   public static int computeColumnOffsets(DataSchema dataSchema, int[] columnOffsets, int dataTableVersion) {
+    assert dataTableVersion == DataTableFactory.VERSION_4;
     int numColumns = columnOffsets.length;
     assert numColumns == dataSchema.size();
 
@@ -48,26 +49,13 @@ public class DataTableUtils {
       columnOffsets[i] = rowSizeInBytes;
       switch (storedColumnDataTypes[i]) {
         case INT:
-          rowSizeInBytes += 4;
-          break;
-        case LONG:
-          rowSizeInBytes += 8;
-          break;
         case FLOAT:
-          if (dataTableVersion >= DataTableFactory.VERSION_4) {
-            rowSizeInBytes += 4;
-          } else {
-            rowSizeInBytes += 8;
-          }
-          break;
-        case DOUBLE:
-          rowSizeInBytes += 8;
-          break;
         case STRING:
+          // For STRING, we store the dictionary id.
           rowSizeInBytes += 4;
           break;
-        // Object and array. (POSITION|LENGTH)
         default:
+          // This covers LONG, DOUBLE and variable length data types (POSITION|LENGTH).
           rowSizeInBytes += 8;
           break;
       }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java
index d4e93e7476..40c438d388 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java
@@ -18,64 +18,35 @@
  */
 package org.apache.pinot.core.common.datatable;
 
+import com.google.common.base.Preconditions;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.datatable.DataTableFactory;
-import org.apache.pinot.common.datatable.DataTableImplV2;
-import org.apache.pinot.common.datatable.DataTableImplV3;
 import org.apache.pinot.common.datatable.DataTableImplV4;
 import org.apache.pinot.common.utils.DataSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 public class DataTableBuilderFactory {
   private DataTableBuilderFactory() {
   }
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(DataTableBuilderFactory.class);
-
   public static final int DEFAULT_VERSION = DataTableFactory.VERSION_4;
 
-  private static int _version = DEFAULT_VERSION;
-
   public static int getDataTableVersion() {
-    return _version;
+    return DEFAULT_VERSION;
   }
 
   public static void setDataTableVersion(int version) {
-    LOGGER.info("Setting DataTable version to: {}", version);
-    if (version != DataTableFactory.VERSION_2 && version != DataTableFactory.VERSION_3
-        && version != DataTableFactory.VERSION_4) {
-      throw new IllegalArgumentException("Unsupported version: " + version);
-    }
-    _version = version;
+    Preconditions.checkArgument(version == DEFAULT_VERSION, "Unsupported version: " + version);
   }
 
   public static DataTableBuilder getDataTableBuilder(DataSchema dataSchema) {
-    switch (_version) {
-      case DataTableFactory.VERSION_2:
-      case DataTableFactory.VERSION_3:
-        return new DataTableBuilderV2V3(dataSchema, _version);
-      case DataTableFactory.VERSION_4:
-        return new DataTableBuilderV4(dataSchema);
-      default:
-        throw new IllegalStateException("Unsupported data table version: " + _version);
-    }
+    return new DataTableBuilderV4(dataSchema);
   }
 
   /**
    * Returns an empty data table without data.
    */
   public static DataTable getEmptyDataTable() {
-    switch (_version) {
-      case DataTableFactory.VERSION_2:
-        return new DataTableImplV2();
-      case DataTableFactory.VERSION_3:
-        return new DataTableImplV3();
-      case DataTableFactory.VERSION_4:
-        return new DataTableImplV4();
-      default:
-        throw new IllegalStateException("Unsupported data table version: " + _version);
-    }
+    return new DataTableImplV4();
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java
deleted file mode 100644
index 375b1b4ad1..0000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.common.datatable;
-
-import com.google.common.base.Preconditions;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.pinot.common.datatable.DataTable;
-import org.apache.pinot.common.datatable.DataTableFactory;
-import org.apache.pinot.common.datatable.DataTableImplV2;
-import org.apache.pinot.common.datatable.DataTableImplV3;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.spi.utils.ByteArray;
-import org.roaringbitmap.RoaringBitmap;
-
-
-/**
- * Kept for backward compatible. Things improved in the newer versions:
- * - Float size (should be 4 instead of 8)
- * - Store bytes as variable size data instead of String
- * - Use one dictionary for all columns (save space)
- * - Support setting nullRowIds
- */
-public class DataTableBuilderV2V3 extends BaseDataTableBuilder {
-  private final Map<String, Map<String, Integer>> _dictionaryMap = new HashMap<>();
-  private final Map<String, Map<Integer, String>> _reverseDictionaryMap = new HashMap<>();
-
-  public DataTableBuilderV2V3(DataSchema dataSchema, int version) {
-    super(dataSchema, version);
-    Preconditions.checkArgument(version <= DataTableFactory.VERSION_3);
-  }
-
-  @Override
-  public void setColumn(int colId, String value) {
-    String columnName = _dataSchema.getColumnName(colId);
-    Map<String, Integer> dictionary = _dictionaryMap.get(columnName);
-    if (dictionary == null) {
-      dictionary = new HashMap<>();
-      _dictionaryMap.put(columnName, dictionary);
-      _reverseDictionaryMap.put(columnName, new HashMap<>());
-    }
-
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    Integer dictId = dictionary.get(value);
-    if (dictId == null) {
-      dictId = dictionary.size();
-      dictionary.put(value, dictId);
-      _reverseDictionaryMap.get(columnName).put(dictId, value);
-    }
-    _currentRowDataByteBuffer.putInt(dictId);
-  }
-
-  @Override
-  public void setColumn(int colId, ByteArray value)
-      throws IOException {
-    setColumn(colId, value.toHexString());
-  }
-
-  @Override
-  public void setColumn(int colId, String[] values)
-      throws IOException {
-    _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-    _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
-    _currentRowDataByteBuffer.putInt(values.length);
-
-    String columnName = _dataSchema.getColumnName(colId);
-    Map<String, Integer> dictionary = _dictionaryMap.get(columnName);
-    if (dictionary == null) {
-      dictionary = new HashMap<>();
-      _dictionaryMap.put(columnName, dictionary);
-      _reverseDictionaryMap.put(columnName, new HashMap<>());
-    }
-
-    for (String value : values) {
-      Integer dictId = dictionary.get(value);
-      if (dictId == null) {
-        dictId = dictionary.size();
-        dictionary.put(value, dictId);
-        _reverseDictionaryMap.get(columnName).put(dictId, value);
-      }
-      _variableSizeDataOutputStream.writeInt(dictId);
-    }
-  }
-
-  @Override
-  public void setNullRowIds(@Nullable RoaringBitmap nullRowIds)
-      throws IOException {
-    throw new UnsupportedOperationException("Not supported before DataTable V4");
-  }
-
-  @Override
-  public DataTable build() {
-    byte[] fixedSizeDataBytes = _fixedSizeDataByteArrayOutputStream.toByteArray();
-    byte[] variableSizeDataBytes = _variableSizeDataByteArrayOutputStream.toByteArray();
-    if (_version == DataTableFactory.VERSION_2) {
-      return new DataTableImplV2(_numRows, _dataSchema, _reverseDictionaryMap, fixedSizeDataBytes,
-          variableSizeDataBytes);
-    } else {
-      return new DataTableImplV3(_numRows, _dataSchema, _reverseDictionaryMap, fixedSizeDataBytes,
-          variableSizeDataBytes);
-    }
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
index d4ce7857a5..87586fe154 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
@@ -18,16 +18,13 @@
  */
 package org.apache.pinot.core.query.request;
 
-import com.google.common.base.Preconditions;
 import java.util.List;
 import java.util.Map;
-import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.common.request.PinotQuery;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.TimerContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
@@ -111,12 +108,7 @@ public class ServerQueryRequest {
   }
 
   private static QueryContext getQueryContext(PinotQuery pinotQuery) {
-    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery);
-    if (queryContext.isNullHandlingEnabled()) {
-      Preconditions.checkState(DataTableBuilderFactory.getDataTableVersion() >= DataTableFactory.VERSION_4,
-          "Null handling cannot be enabled for data table version smaller than 4");
-    }
-    return queryContext;
+    return QueryContextConverterUtils.getQueryContext(pinotQuery);
   }
 
   public long getRequestId() {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index 91f1df4fa7..a539f8d542 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -18,16 +18,9 @@
  */
 package org.apache.pinot.core.common.datatable;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.primitives.Ints;
-import com.google.common.primitives.Longs;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Map;
 import java.util.Random;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -46,8 +39,6 @@ import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 
 /**
  * Unit test for {@link DataTable} serialization/de-serialization.
@@ -77,30 +68,6 @@ public class DataTableSerDeTest {
   private static final int[][] BOOLEAN_ARRAYS = new int[NUM_ROWS][];
   private static final long[][] TIMESTAMP_ARRAYS = new long[NUM_ROWS][];
   private static final String[][] STRING_ARRAYS = new String[NUM_ROWS][];
-  private static final Map<String, String> EXPECTED_METADATA =
-      ImmutableMap.<String, 
String>builder().put(MetadataKey.NUM_DOCS_SCANNED.getName(), 
String.valueOf(20L))
-          .put(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), 
String.valueOf(5L))
-          .put(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), 
String.valueOf(7L))
-          .put(MetadataKey.NUM_SEGMENTS_QUERIED.getName(), String.valueOf(6))
-          .put(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), String.valueOf(6))
-          .put(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), String.valueOf(1))
-          .put(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(), 
String.valueOf(1))
-          .put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), 
String.valueOf(100L))
-          .put(MetadataKey.TOTAL_DOCS.getName(), String.valueOf(200L))
-          .put(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true")
-          .put(MetadataKey.TIME_USED_MS.getName(), 
String.valueOf(20000L)).put(MetadataKey.TRACE_INFO.getName(),
-              "StudentException: Error finding students\n"
-                  + "        at 
StudentManager.findStudents(StudentManager.java:13)\n"
-                  + "        at StudentProgram.main(StudentProgram.java:9)\n"
-                  + "Caused by: DAOException: Error querying students from 
database\n"
-                  + "        at StudentDAO.list(StudentDAO.java:11)\n"
-                  + "        at 
StudentManager.findStudents(StudentManager.java:11)\n" + "        ... 1 more\n"
-                  + "Caused by: java.sql.SQLException: Syntax Error\n"
-                  + "        at 
DatabaseUtils.executeQuery(DatabaseUtils.java:5)\n"
-                  + "        at StudentDAO.list(StudentDAO.java:8)\n" + "      
  ... 2 more")
-          .put(MetadataKey.REQUEST_ID.getName(), String.valueOf(90181881818L))
-          .put(MetadataKey.NUM_RESIZES.getName(), String.valueOf(900L))
-          .put(MetadataKey.RESIZE_TIME_MS.getName(), 
String.valueOf(1919199L)).build();
 
   @Test(dataProvider = "versionProvider")
   public void testException(int dataTableVersion)
@@ -119,7 +86,6 @@ public class DataTableSerDeTest {
 
     String actual = 
newDataTable.getExceptions().get(QueryException.QUERY_EXECUTION_ERROR.getErrorCode());
     Assert.assertEquals(actual, expected);
-    
DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
   }
 
   @Test(dataProvider = "versionProvider")
@@ -146,15 +112,13 @@ public class DataTableSerDeTest {
           new DataSchema(new String[]{"BYTES"}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BYTES}),
           numRows, new Object[]{emptyBytes});
 
-      testEmptyValues(
-          new DataSchema(new String[]{"BOOL_ARR"},
-              new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN_ARRAY}),
-          numRows, new Object[]{new int[]{}});
+      testEmptyValues(new DataSchema(new String[]{"BOOL_ARR"},
+              new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN_ARRAY}), numRows,
+          new Object[]{new int[]{}});
 
-      testEmptyValues(
-          new DataSchema(new String[]{"BOOL_ARR"},
-              new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN_ARRAY}),
-          numRows, new Object[]{new int[]{0}});
+      testEmptyValues(new DataSchema(new String[]{"BOOL_ARR"},
+              new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN_ARRAY}), numRows,
+          new Object[]{new int[]{0}});
 
       testEmptyValues(
           new DataSchema(new String[]{"INT_ARR"}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT_ARRAY}),
@@ -186,7 +150,6 @@ public class DataTableSerDeTest {
               new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE_ARRAY}), numRows,
           new Object[]{new double[]{0}});
     }
-    
DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
   }
 
   private void testEmptyValues(DataSchema dataSchema, int numRows, Object[] 
emptyValues)
@@ -278,7 +241,6 @@ public class DataTableSerDeTest {
     Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, 
ERROR_MESSAGE);
     Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, 
ERROR_MESSAGE);
     verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
-    
DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
   }
 
   @Test(dataProvider = "versionProvider")
@@ -301,7 +263,6 @@ public class DataTableSerDeTest {
         verifyDataIsSame(newDataTable, columnDataType, 1, numRows);
       }
     }
-    
DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
   }
 
   @Test(dataProvider = "versionProvider")
@@ -338,346 +299,6 @@ public class DataTableSerDeTest {
     
Assert.assertNull(newDataTable.getMetadata().get(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName()));
     Assert.assertTrue(
         
Integer.parseInt(newDataTable.getMetadata().get(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()))
 > 0);
-    
DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
-  }
-
-  @Test
-  public void testV3V4Compatibility()
-      throws IOException {
-    DataSchema.ColumnDataType[] columnDataTypes = 
DataSchema.ColumnDataType.values();
-    int numColumns = columnDataTypes.length;
-    String[] columnNames = new String[numColumns];
-    for (int i = 0; i < numColumns; i++) {
-      columnNames[i] = columnDataTypes[i].name();
-    }
-
-    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
-
-    // TODO: verify data table compatibility across multi-stage and normal 
query engine.
-    // TODO: see https://github.com/apache/pinot/pull/8874/files#r894806085
-
-    // Verify V4 broker can deserialize data table (has data, but has no 
metadata) send by V3 server
-    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false);
-    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
-    DataTableBuilder dataTableBuilderV3WithDataOnly = 
DataTableBuilderFactory.getDataTableBuilder(dataSchema);
-    fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly, 
columnDataTypes, numColumns);
-
-    DataTable dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create 
a V3 data table
-    DataTable newDataTable =
-        DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker 
deserialize data table bytes as V3
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, 
ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, 
ERROR_MESSAGE);
-    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
-    Assert.assertEquals(newDataTable.getMetadata().size(), 0);
-
-    // Verify V4 broker can deserialize data table (has data and metadata) 
send by V3 server
-    for (String key : EXPECTED_METADATA.keySet()) {
-      dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
-    }
-    newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // 
Broker deserialize data table bytes as V3
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, 
ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, 
ERROR_MESSAGE);
-    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
-    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
-    // Verify V4 broker can deserialize data table (only has metadata) send by 
V3 server
-    DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = 
DataTableBuilderFactory.getDataTableBuilder(dataSchema);
-    dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a 
V3 data table
-    for (String key : EXPECTED_METADATA.keySet()) {
-      dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
-    }
-    newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // 
Broker deserialize data table bytes as V3
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, 
ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
-    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
-    // Verify V4 broker can deserialize (has data, but has no metadata) send 
by V4 server(with ThreadCpuTimeMeasurement
-    // disabled)
-    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false);
-    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
-    DataTableBuilder dataTableBuilderV4WithDataOnly = 
DataTableBuilderFactory.getDataTableBuilder(dataSchema);
-    fillDataTableWithRandomData(dataTableBuilderV4WithDataOnly, 
columnDataTypes, numColumns);
-    DataTable dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create 
a V4 data table
-    // Deserialize data table bytes as V4
-    newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes());
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, 
ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, 
ERROR_MESSAGE);
-    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
-    Assert.assertEquals(newDataTable.getMetadata().size(), 0);
-
-    // Verify V4 broker can deserialize data table (has data and metadata) 
send by V4 server(with
-    // ThreadCpuTimeMeasurement disabled)
-    for (String key : EXPECTED_METADATA.keySet()) {
-      dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
-    }
-    // Deserialize data table bytes as V4
-    newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // 
Broker deserialize data table bytes as V4
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, 
ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, 
ERROR_MESSAGE);
-    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
-    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
-    // Verify V4 broker can deserialize data table (only has metadata) send by 
V4 server(with
-    // ThreadCpuTimeMeasurement disabled)
-    DataTableBuilder dataTableBuilderV4WithMetadataDataOnly = 
DataTableBuilderFactory.getDataTableBuilder(dataSchema);
-    dataTableV4 = dataTableBuilderV4WithMetadataDataOnly.build(); // create a 
V4 data table
-    for (String key : EXPECTED_METADATA.keySet()) {
-      dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
-    }
-    newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // 
Broker deserialize data table bytes as V4
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, 
ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
-    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
-    // Verify V4 broker can deserialize (has data, but has no metadata) send 
by V4 server(with ThreadCpuTimeMeasurement
-    // enabled)
-    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
-    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
-    dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create a V4 data 
table
-    // Deserialize data table bytes as V4
-    newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes());
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, 
ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, 
ERROR_MESSAGE);
-    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
-    Assert.assertEquals(newDataTable.getMetadata().size(), 1);
-    
Assert.assertTrue(newDataTable.getMetadata().containsKey(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()));
-
-    // Verify V4 broker can deserialize data table (has data and metadata) 
send by V4 server(with
-    // ThreadCpuTimeMeasurement enabled)
-    for (String key : EXPECTED_METADATA.keySet()) {
-      dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
-    }
-    // Deserialize data table bytes as V4
-    newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // 
Broker deserialize data table bytes as V4
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, 
ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, 
ERROR_MESSAGE);
-    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
-    if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
-      Assert.assertEquals(newDataTable.getMetadata().size(), 
EXPECTED_METADATA.keySet().size() + 1);
-      
newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
-    }
-    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
-    // Verify V4 broker can deserialize data table (only has metadata) send by 
V4 server(with
-    // ThreadCpuTimeMeasurement enabled)
-    dataTableV4 = dataTableBuilderV4WithMetadataDataOnly.build(); // create a 
V4 data table
-    for (String key : EXPECTED_METADATA.keySet()) {
-      dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
-    }
-    newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // 
Broker deserialize data table bytes as V4
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, 
ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
-    if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
-      Assert.assertEquals(newDataTable.getMetadata().size(), 
EXPECTED_METADATA.keySet().size() + 1);
-      
newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
-    }
-    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-  }
-
-  @Test
-  public void testV2V3Compatibility()
-      throws IOException {
-    DataSchema.ColumnDataType[] columnDataTypes = 
DataSchema.ColumnDataType.values();
-    int numColumns = columnDataTypes.length;
-    String[] columnNames = new String[numColumns];
-    for (int i = 0; i < numColumns; i++) {
-      columnNames[i] = columnDataTypes[i].name();
-    }
-
-    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
-
-    // Verify V3 broker can deserialize data table (has data, but has no 
metadata) send by V2 server
-    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false);
-    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_2);
-    DataTableBuilder dataTableBuilderV2WithDataOnly = 
DataTableBuilderFactory.getDataTableBuilder(dataSchema);
-    fillDataTableWithRandomData(dataTableBuilderV2WithDataOnly, 
columnDataTypes, numColumns);
-
-    DataTable dataTableV2 = dataTableBuilderV2WithDataOnly.build(); // create 
a V2 data table
-    DataTable newDataTable =
-        DataTableFactory.getDataTable(dataTableV2.toBytes()); // Broker 
deserialize data table bytes as V2
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, 
ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, 
ERROR_MESSAGE);
-    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
-    Assert.assertEquals(newDataTable.getMetadata().size(), 0);
-
-    // Verify V3 broker can deserialize data table (has data and metadata) 
send by V2 server
-    for (String key : EXPECTED_METADATA.keySet()) {
-      dataTableV2.getMetadata().put(key, EXPECTED_METADATA.get(key));
-    }
-    newDataTable = DataTableFactory.getDataTable(dataTableV2.toBytes()); // 
Broker deserialize data table bytes as V2
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, 
ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, 
ERROR_MESSAGE);
-    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
-    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
-    // Verify V3 broker can deserialize data table (only has metadata) send by V2 server
-    DataTableBuilder dataTableBuilderV2WithMetadataDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
-    dataTableV2 = dataTableBuilderV2WithMetadataDataOnly.build(); // create a V2 data table
-    for (String key : EXPECTED_METADATA.keySet()) {
-      dataTableV2.getMetadata().put(key, EXPECTED_METADATA.get(key));
-    }
-    newDataTable = DataTableFactory.getDataTable(dataTableV2.toBytes()); // Broker deserialize data table bytes as V2
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
-    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
-    // Verify V3 broker can deserialize (has data, but has no metadata) send by V3 server(with ThreadCpuTimeMeasurement
-    // disabled)
-    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false);
-    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
-    DataTableBuilder dataTableBuilderV3WithDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
-    fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly, columnDataTypes, numColumns);
-    DataTable dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create a V3 data table
-    // Deserialize data table bytes as V3
-    newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes());
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
-    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
-    Assert.assertEquals(newDataTable.getMetadata().size(), 0);
-
-    // Verify V3 broker can deserialize data table (has data and metadata) send by V3 server(with
-    // ThreadCpuTimeMeasurement disabled)
-    for (String key : EXPECTED_METADATA.keySet()) {
-      dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
-    }
-    // Deserialize data table bytes as V3
-    newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
-    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
-    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
-    // Verify V3 broker can deserialize data table (only has metadata) send by V3 server(with
-    // ThreadCpuTimeMeasurement disabled)
-    DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
-    dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a V3 data table
-    for (String key : EXPECTED_METADATA.keySet()) {
-      dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
-    }
-    newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
-    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
-    // Verify V3 broker can deserialize (has data, but has no metadata) send by V3 server(with ThreadCpuTimeMeasurement
-    // enabled)
-    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
-    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
-    dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create a V3 data table
-    // Deserialize data table bytes as V3
-    newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes());
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
-    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
-    Assert.assertEquals(newDataTable.getMetadata().size(), 1);
-    Assert.assertTrue(newDataTable.getMetadata().containsKey(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()));
-
-    // Verify V3 broker can deserialize data table (has data and metadata) send by V3 server(with
-    // ThreadCpuTimeMeasurement enabled)
-    for (String key : EXPECTED_METADATA.keySet()) {
-      dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
-    }
-    // Deserialize data table bytes as V3
-    newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
-    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
-    if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
-      Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1);
-      newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
-    }
-    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
-    // Verify V3 broker can deserialize data table (only has metadata) send by V3 server(with
-    // ThreadCpuTimeMeasurement enabled)
-    dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a V3 data table
-    for (String key : EXPECTED_METADATA.keySet()) {
-      dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
-    }
-    newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
-    if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
-      Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1);
-      newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
-    }
-    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-  }
-
-  @Test
-  public void testDataTableVer3MetadataBytesLayout()
-      throws IOException {
-    DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values();
-    int numColumns = columnDataTypes.length;
-    String[] columnNames = new String[numColumns];
-    for (int i = 0; i < numColumns; i++) {
-      columnNames[i] = columnDataTypes[i].name();
-    }
-
-    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false);
-    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
-    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
-    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
-    fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns);
-
-    DataTable dataTable = dataTableBuilder.build();
-
-    for (String key : EXPECTED_METADATA.keySet()) {
-      dataTable.getMetadata().put(key, EXPECTED_METADATA.get(key));
-    }
-
-    ByteBuffer byteBuffer = ByteBuffer.wrap(dataTable.toBytes());
-    int version = byteBuffer.getInt();
-    Assert.assertEquals(version, DataTableFactory.VERSION_3);
-    byteBuffer.getInt(); // numOfRows
-    byteBuffer.getInt(); // numOfColumns
-    byteBuffer.getInt(); // exceptionsStart
-    byteBuffer.getInt(); // exceptionsLength
-    byteBuffer.getInt(); // dictionaryMapStart
-    byteBuffer.getInt(); // dictionaryMapLength
-    byteBuffer.getInt(); // dataSchemaStart
-    byteBuffer.getInt(); // dataSchemaLength
-    byteBuffer.getInt(); // fixedSizeDataStart
-    byteBuffer.getInt(); // fixedSizeDataLength
-    int variableSizeDataStart = byteBuffer.getInt();
-    int variableSizeDataLength = byteBuffer.getInt();
-
-    int metadataStart = variableSizeDataStart + variableSizeDataLength;
-    byteBuffer.position(metadataStart);
-    int metadataLength = byteBuffer.getInt();
-    byte[] metadataBytes = new byte[metadataLength];
-    byteBuffer.get(metadataBytes);
-
-    try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(metadataBytes);
-        DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) {
-      int numEntries = dataInputStream.readInt();
-      // DataTable V3 and V4 serialization logic will add an extra RESPONSE_SER_CPU_TIME_NS KV pair into metadata
-      Assert.assertEquals(numEntries, EXPECTED_METADATA.size());
-      for (int i = 0; i < numEntries; i++) {
-        int keyOrdinal = dataInputStream.readInt();
-        DataTable.MetadataKey key = MetadataKey.getById(keyOrdinal);
-        Assert.assertNotEquals(key, null);
-        if (key.getValueType() == DataTable.MetadataValueType.INT) {
-          byte[] actualBytes = new byte[Integer.BYTES];
-          dataInputStream.read(actualBytes);
-          Assert.assertEquals(actualBytes, Ints.toByteArray(Integer.parseInt(EXPECTED_METADATA.get(key.getName()))));
-        } else if (key.getValueType() == DataTable.MetadataValueType.LONG) {
-          byte[] actualBytes = new byte[Long.BYTES];
-          dataInputStream.read(actualBytes);
-          // Ignore the THREAD_CPU_TIME_NS/SYSTEM_ACTIVITIES_CPU_TIME_NS/RESPONSE_SER_CPU_TIME_NS key since their value
-          // are evaluated during query execution.
-          if (key != MetadataKey.THREAD_CPU_TIME_NS && key != MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS
-              && key != MetadataKey.RESPONSE_SER_CPU_TIME_NS) {
-            Assert.assertEquals(actualBytes, Longs.toByteArray(Long.parseLong(EXPECTED_METADATA.get(key.getName()))));
-          }
-        } else {
-          int valueLength = dataInputStream.readInt();
-          byte[] actualBytes = new byte[valueLength];
-          dataInputStream.read(actualBytes);
-          Assert.assertEquals(actualBytes, EXPECTED_METADATA.get(key.getName()).getBytes(UTF_8));
-        }
-      }
-    }
   }
 
   private void fillDataTableWithRandomData(DataTableBuilder dataTableBuilder,
@@ -689,18 +310,15 @@ public class DataTableSerDeTest {
   private void fillDataTableWithRandomData(DataTableBuilder dataTableBuilder,
       DataSchema.ColumnDataType[] columnDataTypes, int numColumns, int numRows)
       throws IOException {
-    RoaringBitmap[] nullBitmaps = null;
-    if (DataTableBuilderFactory.getDataTableVersion() >= DataTableFactory.VERSION_4) {
-      nullBitmaps = new RoaringBitmap[numColumns];
-      for (int colId = 0; colId < numColumns; colId++) {
-        nullBitmaps[colId] = new RoaringBitmap();
-      }
+    RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
+    for (int colId = 0; colId < numColumns; colId++) {
+      nullBitmaps[colId] = new RoaringBitmap();
     }
     for (int rowId = 0; rowId < numRows; rowId++) {
       dataTableBuilder.startRow();
       for (int colId = 0; colId < numColumns; colId++) {
         // Note: isNull is handled for SV columns only for now.
-        boolean isNull = nullBitmaps != null && RANDOM.nextFloat() < 0.1;
+        boolean isNull = RANDOM.nextFloat() < 0.1;
         if (isNull) {
           nullBitmaps[colId].add(rowId);
         }
@@ -931,8 +549,7 @@ public class DataTableSerDeTest {
   @DataProvider(name = "versionProvider")
   public Object[][] provideVersion() {
     return new Object[][]{
-        new Object[]{DataTableFactory.VERSION_4},
-        new Object[]{DataTableFactory.VERSION_3},
+        new Object[]{DataTableFactory.VERSION_4}
     };
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java
index a2a720edfc..234ccfc0bf 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java
@@ -30,7 +30,6 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -658,7 +657,6 @@ public class AllNullQueriesTest extends BaseQueriesTest {
 
     query.verify(columnDataType, brokerResponse);
 
-    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
     _indexSegment.destroy();
     FileUtils.deleteDirectory(indexDir);
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java
index 3d50c89484..572a6d5c0e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java
@@ -32,7 +32,6 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -428,7 +427,6 @@ public class BigDecimalQueriesTest extends BaseQueriesTest {
         i++;
       }
     }
-    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
   }
 
   @AfterClass
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java
index 481634f116..bf9fe6e98f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java
@@ -31,7 +31,6 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -447,7 +446,6 @@ public class BooleanNullEnabledQueriesTest extends BaseQueriesTest {
       assertEquals(thirdRow[0], (long) _nullValuesCount * 4);
       assertNull(thirdRow[1]);
     }
-    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
   }
 
   @AfterClass
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java
index 41a664c5f1..bba3aa4b53 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java
@@ -32,7 +32,6 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -733,7 +732,6 @@ public class NullEnabledQueriesTest extends BaseQueriesTest {
         assertNull(rows.get(rows.size() - 1)[0]);
       }
     }
-    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
   }
 
   @AfterClass
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
index 7a838052f9..cdf0af03bd 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
@@ -23,7 +23,6 @@ import java.io.File;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -206,7 +205,6 @@ public class NullHandlingIntegrationTest extends BaseClusterIntegrationTestSet {
     pinotQuery = "SELECT COUNT(1) FROM " + getTableName() + " option(enableNullHandling=true)";
     h2Query = "SELECT COUNT(1) FROM " + getTableName();
     testQuery(pinotQuery, h2Query);
-    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
   }
 
   @Test(dataProvider = "nullLiteralQueries")
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 6fb23d96f6..48c703b653 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -50,7 +50,6 @@ import org.apache.hc.core5.http.message.BasicNameValuePair;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.client.PinotConnection;
 import org.apache.pinot.client.PinotDriver;
-import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -59,7 +58,6 @@ import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.common.utils.http.HttpClient;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
@@ -190,7 +188,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   @BeforeClass
   public void setUp()
       throws Exception {
-    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
     TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
 
     // Start the Pinot cluster
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
index 67e22a66d0..65f6906b68 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.response.broker.ResultTable;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.query.QueryEnvironmentTestBase;
 import org.apache.pinot.query.QueryServerEnclosure;
 import org.apache.pinot.query.mailbox.MailboxService;
@@ -143,7 +142,6 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
 
   @AfterClass
   public void tearDown() {
-    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
     for (QueryServerEnclosure server : _servers.values()) {
       server.shutDown();
     }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 13e35af7ec..9ffce6810d 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -41,7 +41,6 @@ import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.query.QueryEnvironmentTestBase;
 import org.apache.pinot.query.QueryServerEnclosure;
 import org.apache.pinot.query.mailbox.MailboxService;
@@ -249,7 +248,6 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
   public void tearDown() {
     // Restore the original default timezone
     TimeZone.setDefault(_currentSystemTimeZone);
-    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
     for (QueryServerEnclosure server : _servers.values()) {
       server.shutDown();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to