This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git

The following commit(s) were added to refs/heads/master by this push:
     new dacc6d0690  Cleanup old DataTable v2 and v3 (#13590)
dacc6d0690 is described below

commit dacc6d06907c44e83721454f1090e5f00c824f15
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Thu Jul 11 23:07:49 2024 -0700

    Cleanup old DataTable v2 and v3 (#13590)
---
 .../pinot/common/datablock/BaseDataBlock.java      |   4 +-
 .../pinot/common/datatable/DataTableFactory.java   |  15 +-
 .../pinot/common/datatable/DataTableImplV2.java    | 272 --------------
 .../pinot/common/datatable/DataTableImplV3.java    | 403 --------------------
 .../pinot/common/datatable/DataTableImplV4.java    |  39 +-
 .../pinot/common/datatable/DataTableUtils.java     |  18 +-
 .../common/datatable/DataTableBuilderFactory.java  |  39 +-
 .../common/datatable/DataTableBuilderV2V3.java     | 121 ------
 .../core/query/request/ServerQueryRequest.java     |  10 +-
 .../core/common/datatable/DataTableSerDeTest.java  | 405 +--------------------
 .../apache/pinot/queries/AllNullQueriesTest.java   |   2 -
 .../pinot/queries/BigDecimalQueriesTest.java       |   2 -
 .../queries/BooleanNullEnabledQueriesTest.java     |   2 -
 .../pinot/queries/NullEnabledQueriesTest.java      |   2 -
 .../tests/NullHandlingIntegrationTest.java         |   2 -
 .../tests/OfflineClusterIntegrationTest.java       |   3 -
 .../query/runtime/queries/QueryRunnerTest.java     |   2 -
 .../runtime/queries/ResourceBasedQueriesTest.java  |   2 -
 18 files changed, 59 insertions(+), 1284 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
index 333ac673b6..9b643b088d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
@@ -28,7 +28,7 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
 import org.apache.pinot.common.CustomObject;
-import org.apache.pinot.common.datatable.DataTableImplV3;
+import org.apache.pinot.common.datatable.DataTableImplV4;
 import org.apache.pinot.common.datatable.DataTableUtils;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
@@ -43,7 +43,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 /**
- * Base data block mostly replicating implementation of {@link DataTableImplV3}.
+ * Base data block mostly replicating implementation of {@link DataTableImplV4}.
* * +-----------------------------------------------+ * | 13 integers of header: | diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableFactory.java index 55374d7cca..ec7762c605 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableFactory.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableFactory.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.common.datatable; +import com.google.common.base.Preconditions; import java.io.IOException; import java.nio.ByteBuffer; @@ -26,23 +27,13 @@ public class DataTableFactory { private DataTableFactory() { } - public static final int VERSION_2 = 2; - public static final int VERSION_3 = 3; public static final int VERSION_4 = 4; public static DataTable getDataTable(ByteBuffer byteBuffer) throws IOException { int version = byteBuffer.getInt(); - switch (version) { - case VERSION_2: - return new DataTableImplV2(byteBuffer); - case VERSION_3: - return new DataTableImplV3(byteBuffer); - case VERSION_4: - return new DataTableImplV4(byteBuffer); - default: - throw new IllegalStateException("Unsupported data table version: " + version); - } + Preconditions.checkState(version == VERSION_4, "Unsupported data table version: %s", version); + return new DataTableImplV4(byteBuffer); } public static DataTable getDataTable(byte[] bytes) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV2.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV2.java deleted file mode 100644 index 261e1edc46..0000000000 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV2.java +++ /dev/null @@ -1,272 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.common.datatable; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import org.apache.pinot.common.response.ProcessingException; -import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.common.utils.HashUtil; - -import static java.nio.charset.StandardCharsets.UTF_8; - - -public class DataTableImplV2 extends BaseDataTable { - // VERSION - // NUM_ROWS - // NUM_COLUMNS - // DICTIONARY_MAP (START|SIZE) - // METADATA (START|SIZE) - // DATA_SCHEMA (START|SIZE) - // FIXED_SIZE_DATA (START|SIZE) - // VARIABLE_SIZE_DATA (START|SIZE) - private static final int HEADER_SIZE = Integer.BYTES * 13; - - /** - * Construct data table with results. 
(Server side) - */ - public DataTableImplV2(int numRows, DataSchema dataSchema, Map<String, Map<Integer, String>> dictionaryMap, - byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) { - super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes, variableSizeDataBytes); - } - - /** - * Construct empty data table. (Server side) - */ - public DataTableImplV2() { - } - - /** - * Construct data table from byte array. (broker side) - */ - public DataTableImplV2(ByteBuffer byteBuffer) - throws IOException { - // Read header. - _numRows = byteBuffer.getInt(); - _numColumns = byteBuffer.getInt(); - int dictionaryMapStart = byteBuffer.getInt(); - int dictionaryMapLength = byteBuffer.getInt(); - int metadataStart = byteBuffer.getInt(); - int metadataLength = byteBuffer.getInt(); - int dataSchemaStart = byteBuffer.getInt(); - int dataSchemaLength = byteBuffer.getInt(); - int fixedSizeDataStart = byteBuffer.getInt(); - int fixedSizeDataLength = byteBuffer.getInt(); - int variableSizeDataStart = byteBuffer.getInt(); - int variableSizeDataLength = byteBuffer.getInt(); - - // Read dictionary. - if (dictionaryMapLength != 0) { - byteBuffer.position(dictionaryMapStart); - _dictionaryMap = deserializeDictionaryMap(byteBuffer); - } else { - _dictionaryMap = null; - } - - // Read metadata. - byteBuffer.position(metadataStart); - _metadata = deserializeMetadata(byteBuffer); - - // Read data schema. - if (dataSchemaLength != 0) { - byteBuffer.position(dataSchemaStart); - _dataSchema = DataSchema.fromBytes(byteBuffer); - _columnOffsets = new int[_dataSchema.size()]; - _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema, _columnOffsets, getVersion()); - } else { - _dataSchema = null; - _columnOffsets = null; - _rowSizeInBytes = 0; - } - - // Read fixed size data. - if (fixedSizeDataLength != 0) { - _fixedSizeDataBytes = new byte[fixedSizeDataLength]; - byteBuffer.position(fixedSizeDataStart); - byteBuffer.get(_fixedSizeDataBytes); - _fixedSizeData = ByteBuffer.wrap(_fixedSizeDataBytes); - } else { - _fixedSizeDataBytes = null; - _fixedSizeData = null; - } - - // Read variable size data. - if (variableSizeDataLength != 0) { - _variableSizeDataBytes = new byte[variableSizeDataLength]; - byteBuffer.position(variableSizeDataStart); - byteBuffer.get(_variableSizeDataBytes); - _variableSizeData = ByteBuffer.wrap(_variableSizeDataBytes); - } else { - _variableSizeDataBytes = null; - _variableSizeData = null; - } - } - - @Override - public int getVersion() { - return DataTableFactory.VERSION_2; - } - - private Map<String, String> deserializeMetadata(ByteBuffer buffer) - throws IOException { - int numEntries = buffer.getInt(); - Map<String, String> metadata = new HashMap<>(HashUtil.getHashMapCapacity(numEntries)); - - for (int i = 0; i < numEntries; i++) { - String key = DataTableUtils.decodeString(buffer); - String value = DataTableUtils.decodeString(buffer); - metadata.put(key, value); - } - - return metadata; - } - - @Override - public void addException(ProcessingException processingException) { - _metadata.put(EXCEPTION_METADATA_KEY + processingException.getErrorCode(), processingException.getMessage()); - } - - @Override - public void addException(int errCode, String errMsg) { - _metadata.put(EXCEPTION_METADATA_KEY + errCode, errMsg); - } - - // getExceptions return a map of errorCode->errMessage of the datatable. 
- @Override - public Map<Integer, String> getExceptions() { - Map<Integer, String> exceptions = new HashMap<>(); - for (String key : _metadata.keySet()) { - if (key.startsWith(EXCEPTION_METADATA_KEY)) { - // In V2, all exceptions are added into metadata, using "Exception"+errCode as key, - // Integer.parseInt(key.substring(9)) can extract the error code from the key. - exceptions.put(Integer.parseInt(key.substring(9)), _metadata.get(key)); - } - } - return exceptions; - } - - @Override - public byte[] toBytes() - throws IOException { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); - dataOutputStream.writeInt(DataTableFactory.VERSION_2); - dataOutputStream.writeInt(_numRows); - dataOutputStream.writeInt(_numColumns); - int dataOffset = HEADER_SIZE; - - // Write dictionary. - dataOutputStream.writeInt(dataOffset); - byte[] dictionaryMapBytes = null; - if (_dictionaryMap != null) { - dictionaryMapBytes = serializeDictionaryMap(); - dataOutputStream.writeInt(dictionaryMapBytes.length); - dataOffset += dictionaryMapBytes.length; - } else { - dataOutputStream.writeInt(0); - } - - // Write metadata. - dataOutputStream.writeInt(dataOffset); - byte[] metadataBytes = serializeMetadata(); - dataOutputStream.writeInt(metadataBytes.length); - dataOffset += metadataBytes.length; - - // Write data schema. - dataOutputStream.writeInt(dataOffset); - byte[] dataSchemaBytes = null; - if (_dataSchema != null) { - dataSchemaBytes = _dataSchema.toBytes(); - dataOutputStream.writeInt(dataSchemaBytes.length); - dataOffset += dataSchemaBytes.length; - } else { - dataOutputStream.writeInt(0); - } - - // Write fixed size data. - dataOutputStream.writeInt(dataOffset); - if (_fixedSizeDataBytes != null) { - dataOutputStream.writeInt(_fixedSizeDataBytes.length); - dataOffset += _fixedSizeDataBytes.length; - } else { - dataOutputStream.writeInt(0); - } - - // Write variable size data. - dataOutputStream.writeInt(dataOffset); - if (_variableSizeDataBytes != null) { - dataOutputStream.writeInt(_variableSizeDataBytes.length); - } else { - dataOutputStream.writeInt(0); - } - - // Write actual data. 
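// --- Illustrative aside, not from the patch: the toBytes() logic above keeps a
// running dataOffset that starts right after the fixed header (HEADER_SIZE =
// Integer.BYTES * 13 = 52 bytes), writes one START/SIZE integer pair per
// section, and only advances the offset for sections that are present. A
// minimal sketch of that bookkeeping, with a hypothetical helper name:
//
//   static int writeSectionHeader(DataOutputStream out, int dataOffset, byte[] section)
//       throws IOException {
//     out.writeInt(dataOffset);                            // START of this section
//     out.writeInt(section == null ? 0 : section.length);  // SIZE; 0 marks an absent section
//     return section == null ? dataOffset : dataOffset + section.length;
//   }
// --- end of aside ---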
- if (dictionaryMapBytes != null) { - dataOutputStream.write(dictionaryMapBytes); - } - dataOutputStream.write(metadataBytes); - if (dataSchemaBytes != null) { - dataOutputStream.write(dataSchemaBytes); - } - if (_fixedSizeDataBytes != null) { - dataOutputStream.write(_fixedSizeDataBytes); - } - if (_variableSizeDataBytes != null) { - dataOutputStream.write(_variableSizeDataBytes); - } - - return byteArrayOutputStream.toByteArray(); - } - - @Override - public DataTableImplV2 toMetadataOnlyDataTable() { - DataTableImplV2 metadataOnlyDataTable = new DataTableImplV2(); - metadataOnlyDataTable._metadata.putAll(_metadata); - return metadataOnlyDataTable; - } - - @Override - public DataTableImplV2 toDataOnlyDataTable() { - return new DataTableImplV2(_numRows, _dataSchema, _dictionaryMap, _fixedSizeDataBytes, _variableSizeDataBytes); - } - - private byte[] serializeMetadata() - throws IOException { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); - - dataOutputStream.writeInt(_metadata.size()); - for (Entry<String, String> entry : _metadata.entrySet()) { - byte[] keyBytes = entry.getKey().getBytes(UTF_8); - dataOutputStream.writeInt(keyBytes.length); - dataOutputStream.write(keyBytes); - - byte[] valueBytes = entry.getValue().getBytes(UTF_8); - dataOutputStream.writeInt(valueBytes.length); - dataOutputStream.write(valueBytes); - } - - return byteArrayOutputStream.toByteArray(); - } -} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV3.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV3.java deleted file mode 100644 index af4b7eff46..0000000000 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV3.java +++ /dev/null @@ -1,403 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pinot.common.datatable; - -import com.google.common.primitives.Ints; -import com.google.common.primitives.Longs; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import org.apache.pinot.common.response.ProcessingException; -import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.common.utils.HashUtil; -import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; - -import static java.nio.charset.StandardCharsets.UTF_8; - - -/** - * Datatable V3 implementation. 
- * The layout of serialized V3 datatable looks like: - * +-----------------------------------------------+ - * | 13 integers of header: | - * | VERSION | - * | NUM_ROWS | - * | NUM_COLUMNS | - * | EXCEPTIONS SECTION START OFFSET | - * | EXCEPTIONS SECTION LENGTH | - * | DICTIONARY_MAP SECTION START OFFSET | - * | DICTIONARY_MAP SECTION LENGTH | - * | DATA_SCHEMA SECTION START OFFSET | - * | DATA_SCHEMA SECTION LENGTH | - * | FIXED_SIZE_DATA SECTION START OFFSET | - * | FIXED_SIZE_DATA SECTION LENGTH | - * | VARIABLE_SIZE_DATA SECTION START OFFSET | - * | VARIABLE_SIZE_DATA SECTION LENGTH | - * +-----------------------------------------------+ - * | EXCEPTIONS SECTION | - * +-----------------------------------------------+ - * | DICTIONARY_MAP SECTION | - * +-----------------------------------------------+ - * | DATA_SCHEMA SECTION | - * +-----------------------------------------------+ - * | FIXED_SIZE_DATA SECTION | - * +-----------------------------------------------+ - * | VARIABLE_SIZE_DATA SECTION | - * +-----------------------------------------------+ - * | METADATA LENGTH | - * | METADATA SECTION | - * +-----------------------------------------------+ - */ -public class DataTableImplV3 extends BaseDataTable { - private static final int HEADER_SIZE = Integer.BYTES * 13; - // _errCodeToExceptionMap stores exceptions as a map of errorCode->errorMessage - private final Map<Integer, String> _errCodeToExceptionMap; - - /** - * Construct data table with results. (Server side) - */ - public DataTableImplV3(int numRows, DataSchema dataSchema, Map<String, Map<Integer, String>> dictionaryMap, - byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) { - super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes, variableSizeDataBytes); - _errCodeToExceptionMap = new HashMap<>(); - } - - /** - * Construct empty data table. (Server side) - */ - public DataTableImplV3() { - _errCodeToExceptionMap = new HashMap<>(); - } - - /** - * Construct data table from byte array. (broker side) - */ - public DataTableImplV3(ByteBuffer byteBuffer) - throws IOException { - // Read header. - _numRows = byteBuffer.getInt(); - _numColumns = byteBuffer.getInt(); - int exceptionsStart = byteBuffer.getInt(); - int exceptionsLength = byteBuffer.getInt(); - int dictionaryMapStart = byteBuffer.getInt(); - int dictionaryMapLength = byteBuffer.getInt(); - int dataSchemaStart = byteBuffer.getInt(); - int dataSchemaLength = byteBuffer.getInt(); - int fixedSizeDataStart = byteBuffer.getInt(); - int fixedSizeDataLength = byteBuffer.getInt(); - int variableSizeDataStart = byteBuffer.getInt(); - int variableSizeDataLength = byteBuffer.getInt(); - - // Read exceptions. - if (exceptionsLength != 0) { - byteBuffer.position(exceptionsStart); - _errCodeToExceptionMap = deserializeExceptions(byteBuffer); - } else { - _errCodeToExceptionMap = new HashMap<>(); - } - - // Read dictionary. - if (dictionaryMapLength != 0) { - byteBuffer.position(dictionaryMapStart); - _dictionaryMap = deserializeDictionaryMap(byteBuffer); - } else { - _dictionaryMap = null; - } - - // Read data schema. - if (dataSchemaLength != 0) { - byteBuffer.position(dataSchemaStart); - _dataSchema = DataSchema.fromBytes(byteBuffer); - _columnOffsets = new int[_dataSchema.size()]; - _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema, _columnOffsets, getVersion()); - } else { - _dataSchema = null; - _columnOffsets = null; - _rowSizeInBytes = 0; - } - - // Read fixed size data. 
- if (fixedSizeDataLength != 0) { - _fixedSizeDataBytes = new byte[fixedSizeDataLength]; - byteBuffer.position(fixedSizeDataStart); - byteBuffer.get(_fixedSizeDataBytes); - _fixedSizeData = ByteBuffer.wrap(_fixedSizeDataBytes); - } else { - _fixedSizeDataBytes = null; - _fixedSizeData = null; - } - - // Read variable size data. - _variableSizeDataBytes = new byte[variableSizeDataLength]; - if (variableSizeDataLength != 0) { - byteBuffer.position(variableSizeDataStart); - byteBuffer.get(_variableSizeDataBytes); - } - _variableSizeData = ByteBuffer.wrap(_variableSizeDataBytes); - - // Read metadata. - int metadataLength = byteBuffer.getInt(); - if (metadataLength != 0) { - _metadata = deserializeMetadata(byteBuffer); - } - } - - @Override - public int getVersion() { - return DataTableFactory.VERSION_3; - } - - @Override - public void addException(ProcessingException processingException) { - _errCodeToExceptionMap.put(processingException.getErrorCode(), processingException.getMessage()); - } - - @Override - public void addException(int errCode, String errMsg) { - _errCodeToExceptionMap.put(errCode, errMsg); - } - - @Override - public Map<Integer, String> getExceptions() { - return _errCodeToExceptionMap; - } - - @Override - public byte[] toBytes() - throws IOException { - ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider(); - - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); - writeLeadingSections(dataOutputStream); - - // Add table serialization time metadata if thread timer is enabled. - if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) { - long responseSerializationCpuTimeNs = threadResourceUsageProvider.getThreadTimeNs(); - getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), String.valueOf(responseSerializationCpuTimeNs)); - } - - // Write metadata: length followed by actual metadata bytes. - // NOTE: We ignore metadata serialization time in "responseSerializationCpuTimeNs" as it's negligible while - // considering it will bring a lot code complexity. - byte[] metadataBytes = serializeMetadata(); - dataOutputStream.writeInt(metadataBytes.length); - dataOutputStream.write(metadataBytes); - - return byteArrayOutputStream.toByteArray(); - } - - @Override - public DataTableImplV3 toMetadataOnlyDataTable() { - DataTableImplV3 metadataOnlyDataTable = new DataTableImplV3(); - metadataOnlyDataTable._metadata.putAll(_metadata); - metadataOnlyDataTable._errCodeToExceptionMap.putAll(_errCodeToExceptionMap); - return metadataOnlyDataTable; - } - - @Override - public DataTableImplV3 toDataOnlyDataTable() { - return new DataTableImplV3(_numRows, _dataSchema, _dictionaryMap, _fixedSizeDataBytes, _variableSizeDataBytes); - } - - private void writeLeadingSections(DataOutputStream dataOutputStream) - throws IOException { - dataOutputStream.writeInt(DataTableFactory.VERSION_3); - dataOutputStream.writeInt(_numRows); - dataOutputStream.writeInt(_numColumns); - int dataOffset = HEADER_SIZE; - - // Write exceptions section offset(START|SIZE). - dataOutputStream.writeInt(dataOffset); - byte[] exceptionsBytes; - exceptionsBytes = serializeExceptions(); - dataOutputStream.writeInt(exceptionsBytes.length); - dataOffset += exceptionsBytes.length; - - // Write dictionary map section offset(START|SIZE). 
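// --- Illustrative aside, not from the patch: the V3 constructor above recovers
// each section the same way -- when the LENGTH read from the header is non-zero,
// it does an absolute seek to the section's START offset and bulk-copies LENGTH
// bytes; a zero LENGTH means the section is absent. A minimal sketch of that
// pattern, with a hypothetical helper name:
//
//   static byte[] readSection(ByteBuffer buffer, int start, int length) {
//     if (length == 0) {
//       return null;              // absent section, mirrors the null handling above
//     }
//     byte[] bytes = new byte[length];
//     buffer.position(start);     // absolute seek to the section start
//     buffer.get(bytes);          // relative bulk read of the whole section
//     return bytes;
//   }
// --- end of aside ---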
- dataOutputStream.writeInt(dataOffset); - byte[] dictionaryMapBytes = null; - if (_dictionaryMap != null) { - dictionaryMapBytes = serializeDictionaryMap(); - dataOutputStream.writeInt(dictionaryMapBytes.length); - dataOffset += dictionaryMapBytes.length; - } else { - dataOutputStream.writeInt(0); - } - - // Write data schema section offset(START|SIZE). - dataOutputStream.writeInt(dataOffset); - byte[] dataSchemaBytes = null; - if (_dataSchema != null) { - dataSchemaBytes = _dataSchema.toBytes(); - dataOutputStream.writeInt(dataSchemaBytes.length); - dataOffset += dataSchemaBytes.length; - } else { - dataOutputStream.writeInt(0); - } - - // Write fixed size data section offset(START|SIZE). - dataOutputStream.writeInt(dataOffset); - if (_fixedSizeDataBytes != null) { - dataOutputStream.writeInt(_fixedSizeDataBytes.length); - dataOffset += _fixedSizeDataBytes.length; - } else { - dataOutputStream.writeInt(0); - } - - // Write variable size data section offset(START|SIZE). - dataOutputStream.writeInt(dataOffset); - if (_variableSizeDataBytes != null) { - dataOutputStream.writeInt(_variableSizeDataBytes.length); - } else { - dataOutputStream.writeInt(0); - } - - // Write actual data. - // Write exceptions bytes. - dataOutputStream.write(exceptionsBytes); - // Write dictionary map bytes. - if (dictionaryMapBytes != null) { - dataOutputStream.write(dictionaryMapBytes); - } - // Write data schema bytes. - if (dataSchemaBytes != null) { - dataOutputStream.write(dataSchemaBytes); - } - // Write fixed size data bytes. - if (_fixedSizeDataBytes != null) { - dataOutputStream.write(_fixedSizeDataBytes); - } - // Write variable size data bytes. - if (_variableSizeDataBytes != null) { - dataOutputStream.write(_variableSizeDataBytes); - } - } - - /** - * Serialize metadata section to bytes. - * Format of the bytes looks like: - * [numEntries, bytesOfKV2, bytesOfKV2, bytesOfKV3] - * For each KV pair: - * - if the value type is String, encode it as: [enumKeyOrdinal, valueLength, Utf8EncodedValue]. - * - if the value type is int, encode it as: [enumKeyOrdinal, bigEndianRepresentationOfIntValue] - * - if the value type is long, encode it as: [enumKeyOrdinal, bigEndianRepresentationOfLongValue] - * - * Unlike V2, where numeric metadata values (int and long) in V3 are encoded in UTF-8 in the wire format, - * in V3 big endian representation is used. - */ - private byte[] serializeMetadata() - throws IOException { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); - - dataOutputStream.writeInt(_metadata.size()); - - for (Map.Entry<String, String> entry : _metadata.entrySet()) { - MetadataKey key = MetadataKey.getByName(entry.getKey()); - // Ignore unknown keys. 
- if (key == null) { - continue; - } - String value = entry.getValue(); - dataOutputStream.writeInt(key.getId()); - if (key.getValueType() == MetadataValueType.INT) { - dataOutputStream.write(Ints.toByteArray(Integer.parseInt(value))); - } else if (key.getValueType() == MetadataValueType.LONG) { - dataOutputStream.write(Longs.toByteArray(Long.parseLong(value))); - } else { - byte[] valueBytes = value.getBytes(UTF_8); - dataOutputStream.writeInt(valueBytes.length); - dataOutputStream.write(valueBytes); - } - } - - return byteArrayOutputStream.toByteArray(); - } - - /** - * Even though the wire format of V3 uses UTF-8 for string/bytes and big-endian for numeric values, - * the in-memory representation is STRING based for processing the metadata before serialization - * (by the server as it adds the statistics in metadata) and after deserialization (by the broker as it receives - * DataTable from each server and aggregates the values). - * This is to make V3 implementation keep the consumers of Map<String, String> getMetadata() API in the code happy - * by internally converting it. - * - * This method use relative operations on the ByteBuffer and expects the buffer's position to be set correctly. - */ - private Map<String, String> deserializeMetadata(ByteBuffer buffer) - throws IOException { - int numEntries = buffer.getInt(); - Map<String, String> metadata = new HashMap<>(); - for (int i = 0; i < numEntries; i++) { - int keyId = buffer.getInt(); - MetadataKey key = MetadataKey.getById(keyId); - // Ignore unknown keys. - if (key == null) { - continue; - } - if (key.getValueType() == MetadataValueType.INT) { - String value = "" + buffer.getInt(); - metadata.put(key.getName(), value); - } else if (key.getValueType() == MetadataValueType.LONG) { - String value = "" + buffer.getLong(); - metadata.put(key.getName(), value); - } else { - String value = DataTableUtils.decodeString(buffer); - metadata.put(key.getName(), value); - } - } - return metadata; - } - - private byte[] serializeExceptions() - throws IOException { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); - - dataOutputStream.writeInt(_errCodeToExceptionMap.size()); - - for (Map.Entry<Integer, String> entry : _errCodeToExceptionMap.entrySet()) { - int key = entry.getKey(); - String value = entry.getValue(); - byte[] valueBytes = value.getBytes(UTF_8); - dataOutputStream.writeInt(key); - dataOutputStream.writeInt(valueBytes.length); - dataOutputStream.write(valueBytes); - } - - return byteArrayOutputStream.toByteArray(); - } - - private Map<Integer, String> deserializeExceptions(ByteBuffer buffer) - throws IOException { - int numExceptions = buffer.getInt(); - Map<Integer, String> exceptions = new HashMap<>(HashUtil.getHashMapCapacity(numExceptions)); - for (int i = 0; i < numExceptions; i++) { - int errCode = buffer.getInt(); - String errMessage = DataTableUtils.decodeString(buffer); - exceptions.put(errCode, errMessage); - } - return exceptions; - } -} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java index b58669fd4b..a39a7d7e9f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java @@ -36,7 +36,6 @@ import org.apache.pinot.common.utils.DataSchema; import 
org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.common.utils.RoaringBitmapUtils; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; -import org.apache.pinot.spi.annotations.InterfaceStability; import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.BigDecimalUtils; import org.apache.pinot.spi.utils.ByteArray; @@ -46,9 +45,39 @@ import static java.nio.charset.StandardCharsets.UTF_8; /** - * Datatable V4 Implementation is a wrapper around the Row-based data block. + * Datatable V4 implementation. + * + * The layout of serialized V4 datatable looks like: + * +-----------------------------------------------+ + * | 13 integers of header: | + * | VERSION | + * | NUM_ROWS | + * | NUM_COLUMNS | + * | EXCEPTIONS SECTION START OFFSET | + * | EXCEPTIONS SECTION LENGTH | + * | DICTIONARY_MAP SECTION START OFFSET | + * | DICTIONARY_MAP SECTION LENGTH | + * | DATA_SCHEMA SECTION START OFFSET | + * | DATA_SCHEMA SECTION LENGTH | + * | FIXED_SIZE_DATA SECTION START OFFSET | + * | FIXED_SIZE_DATA SECTION LENGTH | + * | VARIABLE_SIZE_DATA SECTION START OFFSET | + * | VARIABLE_SIZE_DATA SECTION LENGTH | + * +-----------------------------------------------+ + * | EXCEPTIONS SECTION | + * +-----------------------------------------------+ + * | DICTIONARY_MAP SECTION | + * +-----------------------------------------------+ + * | DATA_SCHEMA SECTION | + * +-----------------------------------------------+ + * | FIXED_SIZE_DATA SECTION | + * +-----------------------------------------------+ + * | VARIABLE_SIZE_DATA SECTION | + * +-----------------------------------------------+ + * | METADATA LENGTH | + * | METADATA SECTION | + * +-----------------------------------------------+ */ -@InterfaceStability.Evolving public class DataTableImplV4 implements DataTable { protected static final int HEADER_SIZE = Integer.BYTES * 13; @@ -84,8 +113,8 @@ public class DataTableImplV4 implements DataTable { _errCodeToExceptionMap = new HashMap<>(); } - public DataTableImplV4(int numRows, DataSchema dataSchema, String[] stringDictionary, - byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) { + public DataTableImplV4(int numRows, DataSchema dataSchema, String[] stringDictionary, byte[] fixedSizeDataBytes, + byte[] variableSizeDataBytes) { _numRows = numRows; _dataSchema = dataSchema; _numColumns = dataSchema == null ? 0 : dataSchema.size(); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java index a1ddd8d104..dc3b40bfd8 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java @@ -39,6 +39,7 @@ public class DataTableUtils { * @return row size in bytes. 
*/ public static int computeColumnOffsets(DataSchema dataSchema, int[] columnOffsets, int dataTableVersion) { + assert dataTableVersion == DataTableFactory.VERSION_4; int numColumns = columnOffsets.length; assert numColumns == dataSchema.size(); @@ -48,26 +49,13 @@ public class DataTableUtils { columnOffsets[i] = rowSizeInBytes; switch (storedColumnDataTypes[i]) { case INT: - rowSizeInBytes += 4; - break; - case LONG: - rowSizeInBytes += 8; - break; case FLOAT: - if (dataTableVersion >= DataTableFactory.VERSION_4) { - rowSizeInBytes += 4; - } else { - rowSizeInBytes += 8; - } - break; - case DOUBLE: - rowSizeInBytes += 8; - break; case STRING: + // For STRING, we store the dictionary id. rowSizeInBytes += 4; break; - // Object and array. (POSITION|LENGTH) default: + // This covers LONG, DOUBLE and variable length data types (POSITION|LENGTH). rowSizeInBytes += 8; break; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java index d4e93e7476..40c438d388 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java @@ -18,64 +18,35 @@ */ package org.apache.pinot.core.common.datatable; +import com.google.common.base.Preconditions; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTableFactory; -import org.apache.pinot.common.datatable.DataTableImplV2; -import org.apache.pinot.common.datatable.DataTableImplV3; import org.apache.pinot.common.datatable.DataTableImplV4; import org.apache.pinot.common.utils.DataSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class DataTableBuilderFactory { private DataTableBuilderFactory() { } - private static final Logger LOGGER = LoggerFactory.getLogger(DataTableBuilderFactory.class); - public static final int DEFAULT_VERSION = DataTableFactory.VERSION_4; - private static int _version = DEFAULT_VERSION; - public static int getDataTableVersion() { - return _version; + return DEFAULT_VERSION; } public static void setDataTableVersion(int version) { - LOGGER.info("Setting DataTable version to: {}", version); - if (version != DataTableFactory.VERSION_2 && version != DataTableFactory.VERSION_3 - && version != DataTableFactory.VERSION_4) { - throw new IllegalArgumentException("Unsupported version: " + version); - } - _version = version; + Preconditions.checkArgument(version == DEFAULT_VERSION, "Unsupported version: " + version); } public static DataTableBuilder getDataTableBuilder(DataSchema dataSchema) { - switch (_version) { - case DataTableFactory.VERSION_2: - case DataTableFactory.VERSION_3: - return new DataTableBuilderV2V3(dataSchema, _version); - case DataTableFactory.VERSION_4: - return new DataTableBuilderV4(dataSchema); - default: - throw new IllegalStateException("Unsupported data table version: " + _version); - } + return new DataTableBuilderV4(dataSchema); } /** * Returns an empty data table without data. 
*/ public static DataTable getEmptyDataTable() { - switch (_version) { - case DataTableFactory.VERSION_2: - return new DataTableImplV2(); - case DataTableFactory.VERSION_3: - return new DataTableImplV3(); - case DataTableFactory.VERSION_4: - return new DataTableImplV4(); - default: - throw new IllegalStateException("Unsupported data table version: " + _version); - } + return new DataTableImplV4(); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java deleted file mode 100644 index 375b1b4ad1..0000000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.common.datatable; - -import com.google.common.base.Preconditions; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import javax.annotation.Nullable; -import org.apache.pinot.common.datatable.DataTable; -import org.apache.pinot.common.datatable.DataTableFactory; -import org.apache.pinot.common.datatable.DataTableImplV2; -import org.apache.pinot.common.datatable.DataTableImplV3; -import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.spi.utils.ByteArray; -import org.roaringbitmap.RoaringBitmap; - - -/** - * Kept for backward compatible. 
Things improved in the newer versions: - * - Float size (should be 4 instead of 8) - * - Store bytes as variable size data instead of String - * - Use one dictionary for all columns (save space) - * - Support setting nullRowIds - */ -public class DataTableBuilderV2V3 extends BaseDataTableBuilder { - private final Map<String, Map<String, Integer>> _dictionaryMap = new HashMap<>(); - private final Map<String, Map<Integer, String>> _reverseDictionaryMap = new HashMap<>(); - - public DataTableBuilderV2V3(DataSchema dataSchema, int version) { - super(dataSchema, version); - Preconditions.checkArgument(version <= DataTableFactory.VERSION_3); - } - - @Override - public void setColumn(int colId, String value) { - String columnName = _dataSchema.getColumnName(colId); - Map<String, Integer> dictionary = _dictionaryMap.get(columnName); - if (dictionary == null) { - dictionary = new HashMap<>(); - _dictionaryMap.put(columnName, dictionary); - _reverseDictionaryMap.put(columnName, new HashMap<>()); - } - - _currentRowDataByteBuffer.position(_columnOffsets[colId]); - Integer dictId = dictionary.get(value); - if (dictId == null) { - dictId = dictionary.size(); - dictionary.put(value, dictId); - _reverseDictionaryMap.get(columnName).put(dictId, value); - } - _currentRowDataByteBuffer.putInt(dictId); - } - - @Override - public void setColumn(int colId, ByteArray value) - throws IOException { - setColumn(colId, value.toHexString()); - } - - @Override - public void setColumn(int colId, String[] values) - throws IOException { - _currentRowDataByteBuffer.position(_columnOffsets[colId]); - _currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size()); - _currentRowDataByteBuffer.putInt(values.length); - - String columnName = _dataSchema.getColumnName(colId); - Map<String, Integer> dictionary = _dictionaryMap.get(columnName); - if (dictionary == null) { - dictionary = new HashMap<>(); - _dictionaryMap.put(columnName, dictionary); - _reverseDictionaryMap.put(columnName, new HashMap<>()); - } - - for (String value : values) { - Integer dictId = dictionary.get(value); - if (dictId == null) { - dictId = dictionary.size(); - dictionary.put(value, dictId); - _reverseDictionaryMap.get(columnName).put(dictId, value); - } - _variableSizeDataOutputStream.writeInt(dictId); - } - } - - @Override - public void setNullRowIds(@Nullable RoaringBitmap nullRowIds) - throws IOException { - throw new UnsupportedOperationException("Not supported before DataTable V4"); - } - - @Override - public DataTable build() { - byte[] fixedSizeDataBytes = _fixedSizeDataByteArrayOutputStream.toByteArray(); - byte[] variableSizeDataBytes = _variableSizeDataByteArrayOutputStream.toByteArray(); - if (_version == DataTableFactory.VERSION_2) { - return new DataTableImplV2(_numRows, _dataSchema, _reverseDictionaryMap, fixedSizeDataBytes, - variableSizeDataBytes); - } else { - return new DataTableImplV3(_numRows, _dataSchema, _reverseDictionaryMap, fixedSizeDataBytes, - variableSizeDataBytes); - } - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java index d4ce7857a5..87586fe154 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java @@ -18,16 +18,13 @@ */ package org.apache.pinot.core.query.request; -import com.google.common.base.Preconditions; import java.util.List; 
import java.util.Map; -import org.apache.pinot.common.datatable.DataTableFactory; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.proto.Server; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.common.request.PinotQuery; -import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.TimerContext; import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; @@ -111,12 +108,7 @@ public class ServerQueryRequest { } private static QueryContext getQueryContext(PinotQuery pinotQuery) { - QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery); - if (queryContext.isNullHandlingEnabled()) { - Preconditions.checkState(DataTableBuilderFactory.getDataTableVersion() >= DataTableFactory.VERSION_4, - "Null handling cannot be enabled for data table version smaller than 4"); - } - return queryContext; + return QueryContextConverterUtils.getQueryContext(pinotQuery); } public long getRequestId() { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java index 91f1df4fa7..a539f8d542 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java @@ -18,16 +18,9 @@ */ package org.apache.pinot.core.common.datatable; -import com.google.common.collect.ImmutableMap; -import com.google.common.primitives.Ints; -import com.google.common.primitives.Longs; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; import java.io.IOException; import java.math.BigDecimal; -import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Map; import java.util.Random; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; @@ -46,8 +39,6 @@ import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import static java.nio.charset.StandardCharsets.UTF_8; - /** * Unit test for {@link DataTable} serialization/de-serialization. 
@@ -77,30 +68,6 @@ public class DataTableSerDeTest { private static final int[][] BOOLEAN_ARRAYS = new int[NUM_ROWS][]; private static final long[][] TIMESTAMP_ARRAYS = new long[NUM_ROWS][]; private static final String[][] STRING_ARRAYS = new String[NUM_ROWS][]; - private static final Map<String, String> EXPECTED_METADATA = - ImmutableMap.<String, String>builder().put(MetadataKey.NUM_DOCS_SCANNED.getName(), String.valueOf(20L)) - .put(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), String.valueOf(5L)) - .put(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), String.valueOf(7L)) - .put(MetadataKey.NUM_SEGMENTS_QUERIED.getName(), String.valueOf(6)) - .put(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), String.valueOf(6)) - .put(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), String.valueOf(1)) - .put(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(), String.valueOf(1)) - .put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), String.valueOf(100L)) - .put(MetadataKey.TOTAL_DOCS.getName(), String.valueOf(200L)) - .put(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true") - .put(MetadataKey.TIME_USED_MS.getName(), String.valueOf(20000L)).put(MetadataKey.TRACE_INFO.getName(), - "StudentException: Error finding students\n" - + " at StudentManager.findStudents(StudentManager.java:13)\n" - + " at StudentProgram.main(StudentProgram.java:9)\n" - + "Caused by: DAOException: Error querying students from database\n" - + " at StudentDAO.list(StudentDAO.java:11)\n" - + " at StudentManager.findStudents(StudentManager.java:11)\n" + " ... 1 more\n" - + "Caused by: java.sql.SQLException: Syntax Error\n" - + " at DatabaseUtils.executeQuery(DatabaseUtils.java:5)\n" - + " at StudentDAO.list(StudentDAO.java:8)\n" + " ... 2 more") - .put(MetadataKey.REQUEST_ID.getName(), String.valueOf(90181881818L)) - .put(MetadataKey.NUM_RESIZES.getName(), String.valueOf(900L)) - .put(MetadataKey.RESIZE_TIME_MS.getName(), String.valueOf(1919199L)).build(); @Test(dataProvider = "versionProvider") public void testException(int dataTableVersion) @@ -119,7 +86,6 @@ public class DataTableSerDeTest { String actual = newDataTable.getExceptions().get(QueryException.QUERY_EXECUTION_ERROR.getErrorCode()); Assert.assertEquals(actual, expected); - DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION); } @Test(dataProvider = "versionProvider") @@ -146,15 +112,13 @@ public class DataTableSerDeTest { new DataSchema(new String[]{"BYTES"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BYTES}), numRows, new Object[]{emptyBytes}); - testEmptyValues( - new DataSchema(new String[]{"BOOL_ARR"}, - new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN_ARRAY}), - numRows, new Object[]{new int[]{}}); + testEmptyValues(new DataSchema(new String[]{"BOOL_ARR"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN_ARRAY}), numRows, + new Object[]{new int[]{}}); - testEmptyValues( - new DataSchema(new String[]{"BOOL_ARR"}, - new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN_ARRAY}), - numRows, new Object[]{new int[]{0}}); + testEmptyValues(new DataSchema(new String[]{"BOOL_ARR"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN_ARRAY}), numRows, + new Object[]{new int[]{0}}); testEmptyValues( new DataSchema(new String[]{"INT_ARR"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT_ARRAY}), @@ -186,7 +150,6 @@ public class DataTableSerDeTest { new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE_ARRAY}), numRows, new 
Object[]{new double[]{0}}); } - DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION); } private void testEmptyValues(DataSchema dataSchema, int numRows, Object[] emptyValues) @@ -278,7 +241,6 @@ public class DataTableSerDeTest { Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); verifyDataIsSame(newDataTable, columnDataTypes, numColumns); - DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION); } @Test(dataProvider = "versionProvider") @@ -301,7 +263,6 @@ public class DataTableSerDeTest { verifyDataIsSame(newDataTable, columnDataType, 1, numRows); } } - DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION); } @Test(dataProvider = "versionProvider") @@ -338,346 +299,6 @@ public class DataTableSerDeTest { Assert.assertNull(newDataTable.getMetadata().get(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName())); Assert.assertTrue( Integer.parseInt(newDataTable.getMetadata().get(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName())) > 0); - DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION); - } - - @Test - public void testV3V4Compatibility() - throws IOException { - DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values(); - int numColumns = columnDataTypes.length; - String[] columnNames = new String[numColumns]; - for (int i = 0; i < numColumns; i++) { - columnNames[i] = columnDataTypes[i].name(); - } - - DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes); - - // TODO: verify data table compatibility across multi-stage and normal query engine. - // TODO: see https://github.com/apache/pinot/pull/8874/files#r894806085 - - // Verify V4 broker can deserialize data table (has data, but has no metadata) send by V3 server - ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false); - DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3); - DataTableBuilder dataTableBuilderV3WithDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema); - fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly, columnDataTypes, numColumns); - - DataTable dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create a V3 data table - DataTable newDataTable = - DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3 - Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); - Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); - verifyDataIsSame(newDataTable, columnDataTypes, numColumns); - Assert.assertEquals(newDataTable.getMetadata().size(), 0); - - // Verify V4 broker can deserialize data table (has data and metadata) send by V3 server - for (String key : EXPECTED_METADATA.keySet()) { - dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key)); - } - newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3 - Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); - Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); - verifyDataIsSame(newDataTable, columnDataTypes, numColumns); - Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA); - - // Verify V4 broker can deserialize data table (only has metadata) send by V3 server - DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = 
DataTableBuilderFactory.getDataTableBuilder(dataSchema); - dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a V3 data table - for (String key : EXPECTED_METADATA.keySet()) { - dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key)); - } - newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3 - Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); - Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0); - Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA); - - // Verify V4 broker can deserialize (has data, but has no metadata) send by V4 server(with ThreadCpuTimeMeasurement - // disabled) - ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false); - DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4); - DataTableBuilder dataTableBuilderV4WithDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema); - fillDataTableWithRandomData(dataTableBuilderV4WithDataOnly, columnDataTypes, numColumns); - DataTable dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create a V4 data table - // Deserialize data table bytes as V4 - newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); - Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); - Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); - verifyDataIsSame(newDataTable, columnDataTypes, numColumns); - Assert.assertEquals(newDataTable.getMetadata().size(), 0); - - // Verify V4 broker can deserialize data table (has data and metadata) send by V4 server(with - // ThreadCpuTimeMeasurement disabled) - for (String key : EXPECTED_METADATA.keySet()) { - dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key)); - } - // Deserialize data table bytes as V4 - newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserialize data table bytes as V4 - Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); - Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); - verifyDataIsSame(newDataTable, columnDataTypes, numColumns); - Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA); - - // Verify V4 broker can deserialize data table (only has metadata) send by V4 server(with - // ThreadCpuTimeMeasurement disabled) - DataTableBuilder dataTableBuilderV4WithMetadataDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema); - dataTableV4 = dataTableBuilderV4WithMetadataDataOnly.build(); // create a V4 data table - for (String key : EXPECTED_METADATA.keySet()) { - dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key)); - } - newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserialize data table bytes as V4 - Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); - Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0); - Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA); - - // Verify V4 broker can deserialize (has data, but has no metadata) send by V4 server(with ThreadCpuTimeMeasurement - // enabled) - ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true); - DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4); - dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create a V4 data table - // Deserialize data table bytes as V4 - newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); - 
Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); - Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); - verifyDataIsSame(newDataTable, columnDataTypes, numColumns); - Assert.assertEquals(newDataTable.getMetadata().size(), 1); - Assert.assertTrue(newDataTable.getMetadata().containsKey(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName())); - - // Verify V4 broker can deserialize data table (has data and metadata) send by V4 server(with - // ThreadCpuTimeMeasurement enabled) - for (String key : EXPECTED_METADATA.keySet()) { - dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key)); - } - // Deserialize data table bytes as V4 - newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserialize data table bytes as V4 - Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); - Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); - verifyDataIsSame(newDataTable, columnDataTypes, numColumns); - if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) { - Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1); - newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()); - } - Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA); - - // Verify V4 broker can deserialize data table (only has metadata) send by V4 server(with - // ThreadCpuTimeMeasurement enabled) - dataTableV4 = dataTableBuilderV4WithMetadataDataOnly.build(); // create a V4 data table - for (String key : EXPECTED_METADATA.keySet()) { - dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key)); - } - newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserialize data table bytes as V4 - Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); - Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0); - if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) { - Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1); - newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()); - } - Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA); - } - - @Test - public void testV2V3Compatibility() - throws IOException { - DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values(); - int numColumns = columnDataTypes.length; - String[] columnNames = new String[numColumns]; - for (int i = 0; i < numColumns; i++) { - columnNames[i] = columnDataTypes[i].name(); - } - - DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes); - - // Verify V3 broker can deserialize data table (has data, but has no metadata) send by V2 server - ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false); - DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_2); - DataTableBuilder dataTableBuilderV2WithDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema); - fillDataTableWithRandomData(dataTableBuilderV2WithDataOnly, columnDataTypes, numColumns); - - DataTable dataTableV2 = dataTableBuilderV2WithDataOnly.build(); // create a V2 data table - DataTable newDataTable = - DataTableFactory.getDataTable(dataTableV2.toBytes()); // Broker deserialize data table bytes as V2 - Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); - Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); - 
verifyDataIsSame(newDataTable, columnDataTypes, numColumns); - Assert.assertEquals(newDataTable.getMetadata().size(), 0); - - // Verify V3 broker can deserialize data table (has data and metadata) send by V2 server - for (String key : EXPECTED_METADATA.keySet()) { - dataTableV2.getMetadata().put(key, EXPECTED_METADATA.get(key)); - } - newDataTable = DataTableFactory.getDataTable(dataTableV2.toBytes()); // Broker deserialize data table bytes as V2 - Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); - Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); - verifyDataIsSame(newDataTable, columnDataTypes, numColumns); - Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA); - - // Verify V3 broker can deserialize data table (only has metadata) send by V2 server - DataTableBuilder dataTableBuilderV2WithMetadataDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema); - dataTableV2 = dataTableBuilderV2WithMetadataDataOnly.build(); // create a V2 data table - for (String key : EXPECTED_METADATA.keySet()) { - dataTableV2.getMetadata().put(key, EXPECTED_METADATA.get(key)); - } - newDataTable = DataTableFactory.getDataTable(dataTableV2.toBytes()); // Broker deserialize data table bytes as V2 - Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); - Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0); - Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA); - - // Verify V3 broker can deserialize (has data, but has no metadata) send by V3 server(with ThreadCpuTimeMeasurement - // disabled) - ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false); - DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3); - DataTableBuilder dataTableBuilderV3WithDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema); - fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly, columnDataTypes, numColumns); - DataTable dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create a V3 data table - // Deserialize data table bytes as V3 - newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); - Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); - Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); - verifyDataIsSame(newDataTable, columnDataTypes, numColumns); - Assert.assertEquals(newDataTable.getMetadata().size(), 0); - - // Verify V3 broker can deserialize data table (has data and metadata) send by V3 server(with - // ThreadCpuTimeMeasurement disabled) - for (String key : EXPECTED_METADATA.keySet()) { - dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key)); - } - // Deserialize data table bytes as V3 - newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3 - Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); - Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); - verifyDataIsSame(newDataTable, columnDataTypes, numColumns); - Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA); - - // Verify V3 broker can deserialize data table (only has metadata) send by V3 server(with - // ThreadCpuTimeMeasurement disabled) - DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema); - dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a V3 data table - for (String key : EXPECTED_METADATA.keySet()) 
-      dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
-    }
-    newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserializes data table bytes as V3
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
-    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
-    // Verify V3 broker can deserialize data table (has data, but no metadata) sent by V3 server (with
-    // ThreadCpuTimeMeasurement enabled)
-    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
-    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
-    dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create a V3 data table
-    // Deserialize data table bytes as V3
-    newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes());
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
-    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
-    Assert.assertEquals(newDataTable.getMetadata().size(), 1);
-    Assert.assertTrue(newDataTable.getMetadata().containsKey(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()));
-
-    // Verify V3 broker can deserialize data table (has data and metadata) sent by V3 server (with
-    // ThreadCpuTimeMeasurement enabled)
-    for (String key : EXPECTED_METADATA.keySet()) {
-      dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
-    }
-    // Deserialize data table bytes as V3
-    newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserializes data table bytes as V3
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
-    verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
-    if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
-      Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1);
-      newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
-    }
-    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-
-    // Verify V3 broker can deserialize data table (only has metadata) sent by V3 server (with
-    // ThreadCpuTimeMeasurement enabled)
-    dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a V3 data table
-    for (String key : EXPECTED_METADATA.keySet()) {
-      dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
-    }
-    newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserializes data table bytes as V3
-    Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
-    Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
-    if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
-      Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1);
-      newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
-    }
-    Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
-  }
-
-  @Test
-  public void testDataTableVer3MetadataBytesLayout()
-      throws IOException {
-    DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values();
-    int numColumns = columnDataTypes.length;
-    String[] columnNames = new String[numColumns];
-    for (int i = 0; i < numColumns; i++) {
-      columnNames[i] = columnDataTypes[i].name();
-    }
-
-    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false);
-    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
-    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
-    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
-    fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns);
-
-    DataTable dataTable = dataTableBuilder.build();
-
-    for (String key : EXPECTED_METADATA.keySet()) {
-      dataTable.getMetadata().put(key, EXPECTED_METADATA.get(key));
-    }
-
-    ByteBuffer byteBuffer = ByteBuffer.wrap(dataTable.toBytes());
-    int version = byteBuffer.getInt();
-    Assert.assertEquals(version, DataTableFactory.VERSION_3);
-    byteBuffer.getInt(); // numOfRows
-    byteBuffer.getInt(); // numOfColumns
-    byteBuffer.getInt(); // exceptionsStart
-    byteBuffer.getInt(); // exceptionsLength
-    byteBuffer.getInt(); // dictionaryMapStart
-    byteBuffer.getInt(); // dictionaryMapLength
-    byteBuffer.getInt(); // dataSchemaStart
-    byteBuffer.getInt(); // dataSchemaLength
-    byteBuffer.getInt(); // fixedSizeDataStart
-    byteBuffer.getInt(); // fixedSizeDataLength
-    int variableSizeDataStart = byteBuffer.getInt();
-    int variableSizeDataLength = byteBuffer.getInt();
-
-    int metadataStart = variableSizeDataStart + variableSizeDataLength;
-    byteBuffer.position(metadataStart);
-    int metadataLength = byteBuffer.getInt();
-    byte[] metadataBytes = new byte[metadataLength];
-    byteBuffer.get(metadataBytes);
-
-    try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(metadataBytes);
-        DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) {
-      int numEntries = dataInputStream.readInt();
-      // DataTable V3 and V4 serialization logic will add an extra RESPONSE_SER_CPU_TIME_NS KV pair into metadata
-      Assert.assertEquals(numEntries, EXPECTED_METADATA.size());
-      for (int i = 0; i < numEntries; i++) {
-        int keyOrdinal = dataInputStream.readInt();
-        DataTable.MetadataKey key = MetadataKey.getById(keyOrdinal);
-        Assert.assertNotEquals(key, null);
-        if (key.getValueType() == DataTable.MetadataValueType.INT) {
-          byte[] actualBytes = new byte[Integer.BYTES];
-          dataInputStream.read(actualBytes);
-          Assert.assertEquals(actualBytes, Ints.toByteArray(Integer.parseInt(EXPECTED_METADATA.get(key.getName()))));
-        } else if (key.getValueType() == DataTable.MetadataValueType.LONG) {
-          byte[] actualBytes = new byte[Long.BYTES];
-          dataInputStream.read(actualBytes);
-          // Ignore the THREAD_CPU_TIME_NS/SYSTEM_ACTIVITIES_CPU_TIME_NS/RESPONSE_SER_CPU_TIME_NS keys since their
-          // values are evaluated during query execution.
-          if (key != MetadataKey.THREAD_CPU_TIME_NS && key != MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS
-              && key != MetadataKey.RESPONSE_SER_CPU_TIME_NS) {
-            Assert.assertEquals(actualBytes, Longs.toByteArray(Long.parseLong(EXPECTED_METADATA.get(key.getName()))));
-          }
-        } else {
-          int valueLength = dataInputStream.readInt();
-          byte[] actualBytes = new byte[valueLength];
-          dataInputStream.read(actualBytes);
-          Assert.assertEquals(actualBytes, EXPECTED_METADATA.get(key.getName()).getBytes(UTF_8));
-        }
-      }
-    }
   }
 
   private void fillDataTableWithRandomData(DataTableBuilder dataTableBuilder,
@@ -689,18 +310,15 @@ public class DataTableSerDeTest {
   private void fillDataTableWithRandomData(DataTableBuilder dataTableBuilder,
       DataSchema.ColumnDataType[] columnDataTypes, int numColumns, int numRows)
       throws IOException {
-    RoaringBitmap[] nullBitmaps = null;
-    if (DataTableBuilderFactory.getDataTableVersion() >= DataTableFactory.VERSION_4) {
-      nullBitmaps = new RoaringBitmap[numColumns];
-      for (int colId = 0; colId < numColumns; colId++) {
-        nullBitmaps[colId] = new RoaringBitmap();
-      }
+    RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
+    for (int colId = 0; colId < numColumns; colId++) {
+      nullBitmaps[colId] = new RoaringBitmap();
     }
     for (int rowId = 0; rowId < numRows; rowId++) {
       dataTableBuilder.startRow();
       for (int colId = 0; colId < numColumns; colId++) {
         // Note: isNull is handled for SV columns only for now.
-        boolean isNull = nullBitmaps != null && RANDOM.nextFloat() < 0.1;
+        boolean isNull = RANDOM.nextFloat() < 0.1;
         if (isNull) {
           nullBitmaps[colId].add(rowId);
         }
@@ -931,8 +549,7 @@ public class DataTableSerDeTest {
   @DataProvider(name = "versionProvider")
   public Object[][] provideVersion() {
     return new Object[][]{
-        new Object[]{DataTableFactory.VERSION_4},
-        new Object[]{DataTableFactory.VERSION_3},
+        new Object[]{DataTableFactory.VERSION_4}
     };
   }
 }
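With the V2/V3 builders and the cross-version compatibility tests removed above, the serde round trip is V4-only: the builder always writes VERSION_4, and DataTableFactory.getDataTable() fails fast on anything else. A minimal sketch of the resulting round trip (the one-column LONG schema is illustrative, not taken from the patch; the class and package names are the ones this commit touches):

import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;

public class DataTableV4RoundTripSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical single-column schema, just to drive the builder.
    DataSchema dataSchema = new DataSchema(new String[]{"col1"},
        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.LONG});
    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
    dataTableBuilder.startRow();
    dataTableBuilder.setColumn(0, 1L);
    dataTableBuilder.finishRow();
    DataTable dataTable = dataTableBuilder.build();

    // The first int of the serialized payload is the version (always 4 now);
    // getDataTable() rejects any other value with IllegalStateException.
    DataTable deserialized = DataTableFactory.getDataTable(dataTable.toBytes());
    System.out.println(deserialized.getNumberOfRows()); // prints 1
  }
}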
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java
index a2a720edfc..234ccfc0bf 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java
@@ -30,7 +30,6 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -658,7 +657,6 @@ public class AllNullQueriesTest extends BaseQueriesTest {
 
     query.verify(columnDataType, brokerResponse);
 
-    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
     _indexSegment.destroy();
     FileUtils.deleteDirectory(indexDir);
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java
index 3d50c89484..572a6d5c0e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java
@@ -32,7 +32,6 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -428,7 +427,6 @@ public class BigDecimalQueriesTest extends BaseQueriesTest {
         i++;
       }
     }
-    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
   }
 
   @AfterClass
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java
index 481634f116..bf9fe6e98f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java
@@ -31,7 +31,6 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -447,7 +446,6 @@ public class BooleanNullEnabledQueriesTest extends BaseQueriesTest {
       assertEquals(thirdRow[0], (long) _nullValuesCount * 4);
       assertNull(thirdRow[1]);
     }
-    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
   }
 
   @AfterClass
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java
index 41a664c5f1..bba3aa4b53 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java
@@ -32,7 +32,6 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -733,7 +732,6 @@ public class NullEnabledQueriesTest extends BaseQueriesTest {
         assertNull(rows.get(rows.size() - 1)[0]);
       }
     }
-    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
   }
 
   @AfterClass
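The query tests above all lose the same two lines: the DataTableBuilderFactory import and the reset to DEFAULT_VERSION at the end of the test body. With a single wire version left there is nothing to pin or restore. Conceptually the builder factory collapses to a V4-only shape like the sketch below; this is an illustration of the post-cleanup behavior under the assumption that the V4 builder keeps its DataTableBuilderV4 name, since the literal DataTableBuilderFactory change is not shown in this excerpt:

package org.apache.pinot.core.common.datatable;

import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.utils.DataSchema;

public class DataTableBuilderFactory {
  // V4 is the only supported version, so it is also the default.
  public static final int DEFAULT_VERSION = DataTableFactory.VERSION_4;

  private DataTableBuilderFactory() {
  }

  // No version switch anymore: DataTableBuilderV2V3 was deleted by this commit.
  public static DataTableBuilder getDataTableBuilder(DataSchema dataSchema) {
    return new DataTableBuilderV4(dataSchema);
  }
}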
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
index 7a838052f9..cdf0af03bd 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
@@ -23,7 +23,6 @@ import java.io.File;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -206,7 +205,6 @@ public class NullHandlingIntegrationTest extends BaseClusterIntegrationTestSet {
     pinotQuery = "SELECT COUNT(1) FROM " + getTableName() + " option(enableNullHandling=true)";
     h2Query = "SELECT COUNT(1) FROM " + getTableName();
     testQuery(pinotQuery, h2Query);
-    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
   }
 
   @Test(dataProvider = "nullLiteralQueries")
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 6fb23d96f6..48c703b653 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -50,7 +50,6 @@ import org.apache.hc.core5.http.message.BasicNameValuePair;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.client.PinotConnection;
 import org.apache.pinot.client.PinotDriver;
-import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -59,7 +58,6 @@ import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.common.utils.http.HttpClient;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
@@ -190,7 +188,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   @BeforeClass
   public void setUp()
       throws Exception {
-    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
     TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
 
     // Start the Pinot cluster
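OfflineClusterIntegrationTest previously forced V3 in setUp() to exercise the old wire format end to end; that pinning is now meaningless, so only the setup line is dropped. If direct coverage of the new fail-fast path were wanted, a test along these lines would exercise it (a hypothetical test, not part of this patch; it hand-crafts a buffer whose leading int is a stale version number):

import java.nio.ByteBuffer;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public class DataTableVersionCheckSketch {

  @Test
  public void testStaleVersionRejected() {
    // The first int of a serialized data table is its version; 3 is no longer supported.
    ByteBuffer staleVersionBuffer = ByteBuffer.allocate(Integer.BYTES);
    staleVersionBuffer.putInt(3);
    staleVersionBuffer.flip();
    try {
      DataTableFactory.getDataTable(staleVersionBuffer);
      Assert.fail("Expected IllegalStateException for a non-V4 payload");
    } catch (Exception e) {
      // The Preconditions check in DataTableFactory produces this message.
      Assert.assertTrue(e instanceof IllegalStateException);
      Assert.assertTrue(e.getMessage().contains("Unsupported data table version"));
    }
  }
}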
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
index 67e22a66d0..65f6906b68 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.response.broker.ResultTable;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.query.QueryEnvironmentTestBase;
 import org.apache.pinot.query.QueryServerEnclosure;
 import org.apache.pinot.query.mailbox.MailboxService;
@@ -143,7 +142,6 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
 
   @AfterClass
   public void tearDown() {
-    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
     for (QueryServerEnclosure server : _servers.values()) {
       server.shutDown();
     }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 13e35af7ec..9ffce6810d 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -41,7 +41,6 @@ import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.query.QueryEnvironmentTestBase;
 import org.apache.pinot.query.QueryServerEnclosure;
 import org.apache.pinot.query.mailbox.MailboxService;
@@ -249,7 +248,6 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
   public void tearDown() {
     // Restore the original default timezone
     TimeZone.setDefault(_currentSystemTimeZone);
-    DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
     for (QueryServerEnclosure server : _servers.values()) {
       server.shutDown();
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org