This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fb7ceb0 DataTable V3 implementation and measure data table
serialization cost on server (#6710)
fb7ceb0 is described below
commit fb7ceb0cb67c32ff2f7086b4bb687bbae6cd2fec
Author: Liang Mingqiang <[email protected]>
AuthorDate: Thu Apr 1 23:21:39 2021 -0700
DataTable V3 implementation and measure data table serialization cost on
server (#6710)
DataTable V3 moves the metadata section to the end of the serialized bytes
and uses enum values (instead of Strings, as in V2) as metadata keys.
This change will be backward incompatible if servers are upgraded first,
so brokers must be upgraded before servers. The compatibility of
the protocols will not be retained beyond 0.8.0 (or the next
released version).
---
.../apache/pinot/common/utils/CommonConstants.java | 3 +
.../org/apache/pinot/common/utils/DataTable.java | 78 +++-
.../{DataTableImplV2.java => BaseDataTable.java} | 308 +++-------------
.../core/common/datatable/DataTableBuilder.java | 20 +-
.../core/common/datatable/DataTableFactory.java | 7 +-
.../core/common/datatable/DataTableImplV2.java | 281 +--------------
.../core/common/datatable/DataTableImplV3.java | 398 +++++++++++++++++++++
.../core/common/datatable/DataTableUtils.java | 45 +++
.../core/operator/InstanceResponseOperator.java | 4 +-
.../operator/blocks/IntermediateResultsBlock.java | 67 ++--
.../query/executor/ServerQueryExecutorV1Impl.java | 11 +-
.../core/query/reduce/BrokerReduceService.java | 7 +-
.../pinot/core/query/scheduler/QueryScheduler.java | 73 ++--
.../core/transport/InstanceRequestHandler.java | 11 +-
.../core/common/datatable/DataTableSerDeTest.java | 302 +++++++++++++++-
.../query/scheduler/PrioritySchedulerTest.java | 33 +-
.../pinot/core/transport/QueryRoutingTest.java | 8 +-
.../tests/OfflineClusterIntegrationTest.java | 5 +-
.../server/starter/helix/HelixServerStarter.java | 6 +
19 files changed, 1017 insertions(+), 650 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 8ed12fe..6a7f97f 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -339,6 +339,9 @@ public class CommonConstants {
public static final String CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT =
"pinot.server.instance.enableThreadCpuTimeMeasurement";
public static final boolean DEFAULT_ENABLE_THREAD_CPU_TIME_MEASUREMENT =
false;
+
+ public static final String CONFIG_OF_CURRENT_DATA_TABLE_VERSION =
"pinot.server.instance.currentDataTableVersion";
+ public static final int DEFAULT_CURRENT_DATA_TABLE_VERSION = 3;
}
public static class Controller {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
index 7e82e56..b699b02 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
@@ -19,7 +19,9 @@
package org.apache.pinot.common.utils;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.spi.utils.ByteArray;
@@ -44,10 +46,11 @@ public interface DataTable {
String REQUEST_ID_METADATA_KEY = "requestId";
String NUM_RESIZES_METADATA_KEY = "numResizes";
String RESIZE_TIME_MS_METADATA_KEY = "resizeTimeMs";
- String EXECUTION_THREAD_CPU_TIME_NS_METADATA_KEY =
"executionThreadCpuTimeNs";
void addException(ProcessingException processingException);
+ Map<Integer, String> getExceptions();
+
byte[] toBytes()
throws IOException;
@@ -80,4 +83,77 @@ public interface DataTable {
double[] getDoubleArray(int rowId, int colId);
String[] getStringArray(int rowId, int colId);
+
+ enum MetadataValueType {
+ INT, LONG, STRING
+ }
+
+ /* The MetadataKey is used in V3, where we present metadata as
Map<MetadataKey, String>
+ * ATTENTION:
+ * - Don't change existing keys.
+ * - Don't remove existing keys.
+ * - Always add new keys to the end.
+ * Otherwise, backward compatibility will be broken.
+ */
+ enum MetadataKey {
+ UNKNOWN("unknown", MetadataValueType.STRING),
+ TABLE("table", MetadataValueType.STRING), // NOTE: this key is only used
in PrioritySchedulerTest
+ NUM_DOCS_SCANNED("numDocsScanned", MetadataValueType.LONG),
+ NUM_ENTRIES_SCANNED_IN_FILTER("numEntriesScannedInFilter",
MetadataValueType.LONG),
+ NUM_ENTRIES_SCANNED_POST_FILTER("numEntriesScannedPostFilter",
MetadataValueType.LONG),
+ NUM_SEGMENTS_QUERIED("numSegmentsQueried", MetadataValueType.INT),
+ NUM_SEGMENTS_PROCESSED("numSegmentsProcessed", MetadataValueType.INT),
+ NUM_SEGMENTS_MATCHED("numSegmentsMatched", MetadataValueType.INT),
+ NUM_CONSUMING_SEGMENTS_PROCESSED("numConsumingSegmentsProcessed",
MetadataValueType.INT),
+ MIN_CONSUMING_FRESHNESS_TIME_MS("minConsumingFreshnessTimeMs",
MetadataValueType.LONG),
+ TOTAL_DOCS("totalDocs", MetadataValueType.LONG),
+ NUM_GROUPS_LIMIT_REACHED("numGroupsLimitReached",
MetadataValueType.STRING),
+ TIME_USED_MS("timeUsedMs", MetadataValueType.LONG),
+ TRACE_INFO("traceInfo", MetadataValueType.STRING),
+ REQUEST_ID("requestId", MetadataValueType.LONG),
+ NUM_RESIZES("numResizes", MetadataValueType.INT),
+ RESIZE_TIME_MS("resizeTimeMs", MetadataValueType.LONG),
+ THREAD_CPU_TIME_NS("threadCpuTimeNs", MetadataValueType.LONG);
+
+ private static final Map<String, MetadataKey> _nameToEnumKeyMap = new
HashMap<>();
+ private final String _name;
+ private final MetadataValueType _valueType;
+
+ MetadataKey(String name, MetadataValueType valueType) {
+ _name = name;
+ _valueType = valueType;
+ }
+
+ // getByOrdinal returns an enum key for a given ordinal or null if the key
does not exist.
+ @Nullable
+ public static MetadataKey getByOrdinal(int ordinal) {
+ if (ordinal >= MetadataKey.values().length) {
+ return null;
+ }
+ return MetadataKey.values()[ordinal];
+ }
+
+ // getByName returns an enum key for a given name or null if the key does
not exist.
+ public static MetadataKey getByName(String name) {
+ return _nameToEnumKeyMap.getOrDefault(name, null);
+ }
+
+ // getName returns the associated name(string) of the enum key.
+ public String getName() {
+ return _name;
+ }
+
+ // getValueType returns the value type(int/long/String) of the enum key.
+ public MetadataValueType getValueType() {
+ return _valueType;
+ }
+
+ static {
+ for (MetadataKey key : MetadataKey.values()) {
+ if (_nameToEnumKeyMap.put(key.getName(), key) != null) {
+ throw new IllegalArgumentException("Duplicate name defined in the
MetadataKey definition: " + key.getName());
+ }
+ }
+ }
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
similarity index 50%
copy from
pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
copy to
pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
index ac3d060..8b4dfbc 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
@@ -26,9 +26,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.StringUtil;
@@ -36,36 +33,26 @@ import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;
+import static
org.apache.pinot.core.common.datatable.DataTableUtils.decodeString;
-public class DataTableImplV2 implements DataTable {
- private static final int VERSION = 2;
-
- // VERSION
- // NUM_ROWS
- // NUM_COLUMNS
- // DICTIONARY_MAP (START|SIZE)
- // METADATA (START|SIZE)
- // DATA_SCHEMA (START|SIZE)
- // FIXED_SIZE_DATA (START|SIZE)
- // VARIABLE_SIZE_DATA (START|SIZE)
- private static final int HEADER_SIZE = Integer.BYTES * 13;
-
- private final int _numRows;
- private final int _numColumns;
- private final DataSchema _dataSchema;
- private final int[] _columnOffsets;
- private final int _rowSizeInBytes;
- private final Map<String, Map<Integer, String>> _dictionaryMap;
- private final byte[] _fixedSizeDataBytes;
- private final ByteBuffer _fixedSizeData;
- private final byte[] _variableSizeDataBytes;
- private final ByteBuffer _variableSizeData;
- private final Map<String, String> _metadata;
- /**
- * Construct data table with results. (Server side)
- */
- public DataTableImplV2(int numRows, DataSchema dataSchema, Map<String,
Map<Integer, String>> dictionaryMap,
+/**
+ * Base implementation of the DataTable interface.
+ */
+public abstract class BaseDataTable implements DataTable {
+ protected int _numRows;
+ protected int _numColumns;
+ protected DataSchema _dataSchema;
+ protected int[] _columnOffsets;
+ protected int _rowSizeInBytes;
+ protected Map<String, Map<Integer, String>> _dictionaryMap;
+ protected byte[] _fixedSizeDataBytes;
+ protected ByteBuffer _fixedSizeData;
+ protected byte[] _variableSizeDataBytes;
+ protected ByteBuffer _variableSizeData;
+ protected Map<String, String> _metadata;
+
+ public BaseDataTable(int numRows, DataSchema dataSchema, Map<String,
Map<Integer, String>> dictionaryMap,
byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
_numRows = numRows;
_numColumns = dataSchema.size();
@@ -83,7 +70,7 @@ public class DataTableImplV2 implements DataTable {
/**
* Construct empty data table. (Server side)
*/
- public DataTableImplV2() {
+ public BaseDataTable() {
_numRows = 0;
_numColumns = 0;
_dataSchema = null;
@@ -98,78 +85,37 @@ public class DataTableImplV2 implements DataTable {
}
/**
- * Construct data table from byte array. (broker side)
+ * Helper method to serialize dictionary map.
*/
- public DataTableImplV2(ByteBuffer byteBuffer)
+ protected byte[] serializeDictionaryMap()
throws IOException {
- // Read header.
- _numRows = byteBuffer.getInt();
- _numColumns = byteBuffer.getInt();
- int dictionaryMapStart = byteBuffer.getInt();
- int dictionaryMapLength = byteBuffer.getInt();
- int metadataStart = byteBuffer.getInt();
- int metadataLength = byteBuffer.getInt();
- int dataSchemaStart = byteBuffer.getInt();
- int dataSchemaLength = byteBuffer.getInt();
- int fixedSizeDataStart = byteBuffer.getInt();
- int fixedSizeDataLength = byteBuffer.getInt();
- int variableSizeDataStart = byteBuffer.getInt();
- int variableSizeDataLength = byteBuffer.getInt();
-
- // Read dictionary.
- if (dictionaryMapLength != 0) {
- byte[] dictionaryMapBytes = new byte[dictionaryMapLength];
- byteBuffer.position(dictionaryMapStart);
- byteBuffer.get(dictionaryMapBytes);
- _dictionaryMap = deserializeDictionaryMap(dictionaryMapBytes);
- } else {
- _dictionaryMap = null;
- }
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
- // Read metadata.
- byte[] metadataBytes = new byte[metadataLength];
- byteBuffer.position(metadataStart);
- byteBuffer.get(metadataBytes);
- _metadata = deserializeMetadata(metadataBytes);
-
- // Read data schema.
- if (dataSchemaLength != 0) {
- byte[] schemaBytes = new byte[dataSchemaLength];
- byteBuffer.position(dataSchemaStart);
- byteBuffer.get(schemaBytes);
- _dataSchema = DataSchema.fromBytes(schemaBytes);
- _columnOffsets = new int[_dataSchema.size()];
- _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema,
_columnOffsets);
- } else {
- _dataSchema = null;
- _columnOffsets = null;
- _rowSizeInBytes = 0;
- }
+ dataOutputStream.writeInt(_dictionaryMap.size());
+ for (Map.Entry<String, Map<Integer, String>> dictionaryMapEntry :
_dictionaryMap.entrySet()) {
+ String columnName = dictionaryMapEntry.getKey();
+ Map<Integer, String> dictionary = dictionaryMapEntry.getValue();
+ byte[] bytes = StringUtil.encodeUtf8(columnName);
+ dataOutputStream.writeInt(bytes.length);
+ dataOutputStream.write(bytes);
+ dataOutputStream.writeInt(dictionary.size());
- // Read fixed size data.
- if (fixedSizeDataLength != 0) {
- _fixedSizeDataBytes = new byte[fixedSizeDataLength];
- byteBuffer.position(fixedSizeDataStart);
- byteBuffer.get(_fixedSizeDataBytes);
- _fixedSizeData = ByteBuffer.wrap(_fixedSizeDataBytes);
- } else {
- _fixedSizeDataBytes = null;
- _fixedSizeData = null;
+ for (Map.Entry<Integer, String> dictionaryEntry : dictionary.entrySet())
{
+ dataOutputStream.writeInt(dictionaryEntry.getKey());
+ byte[] valueBytes = StringUtil.encodeUtf8(dictionaryEntry.getValue());
+ dataOutputStream.writeInt(valueBytes.length);
+ dataOutputStream.write(valueBytes);
+ }
}
- // Read variable size data.
- if (variableSizeDataLength != 0) {
- _variableSizeDataBytes = new byte[variableSizeDataLength];
- byteBuffer.position(variableSizeDataStart);
- byteBuffer.get(_variableSizeDataBytes);
- _variableSizeData = ByteBuffer.wrap(_variableSizeDataBytes);
- } else {
- _variableSizeDataBytes = null;
- _variableSizeData = null;
- }
+ return byteArrayOutputStream.toByteArray();
}
- private Map<String, Map<Integer, String>> deserializeDictionaryMap(byte[]
bytes)
+ /**
+ * Helper method to deserialize dictionary map.
+ */
+ protected Map<String, Map<Integer, String>> deserializeDictionaryMap(byte[]
bytes)
throws IOException {
try (ByteArrayInputStream byteArrayInputStream = new
ByteArrayInputStream(bytes);
DataInputStream dataInputStream = new
DataInputStream(byteArrayInputStream)) {
@@ -192,211 +138,49 @@ public class DataTableImplV2 implements DataTable {
}
}
- private Map<String, String> deserializeMetadata(byte[] bytes)
- throws IOException {
- try (ByteArrayInputStream byteArrayInputStream = new
ByteArrayInputStream(bytes);
- DataInputStream dataInputStream = new
DataInputStream(byteArrayInputStream)) {
- int numEntries = dataInputStream.readInt();
- Map<String, String> metadata = new HashMap<>(numEntries);
-
- for (int i = 0; i < numEntries; i++) {
- String key = decodeString(dataInputStream);
- String value = decodeString(dataInputStream);
- metadata.put(key, value);
- }
-
- return metadata;
- }
- }
-
- private static String decodeString(DataInputStream dataInputStream)
- throws IOException {
- int length = dataInputStream.readInt();
- if (length == 0) {
- return StringUtils.EMPTY;
- } else {
- byte[] buffer = new byte[length];
- int numBytesRead = dataInputStream.read(buffer);
- assert numBytesRead == length;
- return StringUtil.decodeUtf8(buffer);
- }
- }
-
- @Override
- public void addException(ProcessingException processingException) {
- _metadata.put(EXCEPTION_METADATA_KEY + processingException.getErrorCode(),
processingException.getMessage());
- }
-
- @Override
- public byte[] toBytes()
- throws IOException {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
- dataOutputStream.writeInt(VERSION);
- dataOutputStream.writeInt(_numRows);
- dataOutputStream.writeInt(_numColumns);
- int dataOffset = HEADER_SIZE;
-
- // Write dictionary.
- dataOutputStream.writeInt(dataOffset);
- byte[] dictionaryMapBytes = null;
- if (_dictionaryMap != null) {
- dictionaryMapBytes = serializeDictionaryMap();
- dataOutputStream.writeInt(dictionaryMapBytes.length);
- dataOffset += dictionaryMapBytes.length;
- } else {
- dataOutputStream.writeInt(0);
- }
-
- // Write metadata.
- dataOutputStream.writeInt(dataOffset);
- byte[] metadataBytes = serializeMetadata();
- dataOutputStream.writeInt(metadataBytes.length);
- dataOffset += metadataBytes.length;
-
- // Write data schema.
- dataOutputStream.writeInt(dataOffset);
- byte[] dataSchemaBytes = null;
- if (_dataSchema != null) {
- dataSchemaBytes = _dataSchema.toBytes();
- dataOutputStream.writeInt(dataSchemaBytes.length);
- dataOffset += dataSchemaBytes.length;
- } else {
- dataOutputStream.writeInt(0);
- }
-
- // Write fixed size data.
- dataOutputStream.writeInt(dataOffset);
- if (_fixedSizeDataBytes != null) {
- dataOutputStream.writeInt(_fixedSizeDataBytes.length);
- dataOffset += _fixedSizeDataBytes.length;
- } else {
- dataOutputStream.writeInt(0);
- }
-
- // Write variable size data.
- dataOutputStream.writeInt(dataOffset);
- if (_variableSizeDataBytes != null) {
- dataOutputStream.writeInt(_variableSizeDataBytes.length);
- } else {
- dataOutputStream.writeInt(0);
- }
-
- // Write actual data.
- if (dictionaryMapBytes != null) {
- dataOutputStream.write(dictionaryMapBytes);
- }
- dataOutputStream.write(metadataBytes);
- if (dataSchemaBytes != null) {
- dataOutputStream.write(dataSchemaBytes);
- }
- if (_fixedSizeDataBytes != null) {
- dataOutputStream.write(_fixedSizeDataBytes);
- }
- if (_variableSizeDataBytes != null) {
- dataOutputStream.write(_variableSizeDataBytes);
- }
-
- return byteArrayOutputStream.toByteArray();
- }
-
- private byte[] serializeDictionaryMap()
- throws IOException {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
-
- dataOutputStream.writeInt(_dictionaryMap.size());
- for (Entry<String, Map<Integer, String>> dictionaryMapEntry :
_dictionaryMap.entrySet()) {
- String columnName = dictionaryMapEntry.getKey();
- Map<Integer, String> dictionary = dictionaryMapEntry.getValue();
- byte[] bytes = StringUtil.encodeUtf8(columnName);
- dataOutputStream.writeInt(bytes.length);
- dataOutputStream.write(bytes);
- dataOutputStream.writeInt(dictionary.size());
-
- for (Entry<Integer, String> dictionaryEntry : dictionary.entrySet()) {
- dataOutputStream.writeInt(dictionaryEntry.getKey());
- byte[] valueBytes = StringUtil.encodeUtf8(dictionaryEntry.getValue());
- dataOutputStream.writeInt(valueBytes.length);
- dataOutputStream.write(valueBytes);
- }
- }
-
- return byteArrayOutputStream.toByteArray();
- }
-
- private byte[] serializeMetadata()
- throws IOException {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
-
- dataOutputStream.writeInt(_metadata.size());
- for (Entry<String, String> entry : _metadata.entrySet()) {
- byte[] keyBytes = StringUtil.encodeUtf8(entry.getKey());
- dataOutputStream.writeInt(keyBytes.length);
- dataOutputStream.write(keyBytes);
-
- byte[] valueBytes = StringUtil.encodeUtf8(entry.getValue());
- dataOutputStream.writeInt(valueBytes.length);
- dataOutputStream.write(valueBytes);
- }
-
- return byteArrayOutputStream.toByteArray();
- }
-
- @Override
public Map<String, String> getMetadata() {
return _metadata;
}
- @Override
public DataSchema getDataSchema() {
return _dataSchema;
}
- @Override
public int getNumberOfRows() {
return _numRows;
}
- @Override
public int getInt(int rowId, int colId) {
_fixedSizeData.position(rowId * _rowSizeInBytes + _columnOffsets[colId]);
return _fixedSizeData.getInt();
}
- @Override
public long getLong(int rowId, int colId) {
_fixedSizeData.position(rowId * _rowSizeInBytes + _columnOffsets[colId]);
return _fixedSizeData.getLong();
}
- @Override
public float getFloat(int rowId, int colId) {
_fixedSizeData.position(rowId * _rowSizeInBytes + _columnOffsets[colId]);
return _fixedSizeData.getFloat();
}
- @Override
public double getDouble(int rowId, int colId) {
_fixedSizeData.position(rowId * _rowSizeInBytes + _columnOffsets[colId]);
return _fixedSizeData.getDouble();
}
- @Override
public String getString(int rowId, int colId) {
_fixedSizeData.position(rowId * _rowSizeInBytes + _columnOffsets[colId]);
int dictId = _fixedSizeData.getInt();
return _dictionaryMap.get(_dataSchema.getColumnName(colId)).get(dictId);
}
- @Override
public ByteArray getBytes(int rowId, int colId) {
- // NOTE: DataTable V2 uses String to store BYTES value
+ // NOTE: DataTable V2/V3 uses String to store BYTES value
return BytesUtils.toByteArray(getString(rowId, colId));
}
- @Override
public <T> T getObject(int rowId, int colId) {
int size = positionCursorInVariableBuffer(rowId, colId);
int objectTypeValue = _variableSizeData.getInt();
@@ -405,7 +189,6 @@ public class DataTableImplV2 implements DataTable {
return ObjectSerDeUtils.deserialize(byteBuffer, objectTypeValue);
}
- @Override
public int[] getIntArray(int rowId, int colId) {
int length = positionCursorInVariableBuffer(rowId, colId);
int[] ints = new int[length];
@@ -415,7 +198,6 @@ public class DataTableImplV2 implements DataTable {
return ints;
}
- @Override
public long[] getLongArray(int rowId, int colId) {
int length = positionCursorInVariableBuffer(rowId, colId);
long[] longs = new long[length];
@@ -425,7 +207,6 @@ public class DataTableImplV2 implements DataTable {
return longs;
}
- @Override
public float[] getFloatArray(int rowId, int colId) {
int length = positionCursorInVariableBuffer(rowId, colId);
float[] floats = new float[length];
@@ -435,7 +216,6 @@ public class DataTableImplV2 implements DataTable {
return floats;
}
- @Override
public double[] getDoubleArray(int rowId, int colId) {
int length = positionCursorInVariableBuffer(rowId, colId);
double[] doubles = new double[length];
@@ -445,7 +225,6 @@ public class DataTableImplV2 implements DataTable {
return doubles;
}
- @Override
public String[] getStringArray(int rowId, int colId) {
int length = positionCursorInVariableBuffer(rowId, colId);
String[] strings = new String[length];
@@ -462,7 +241,6 @@ public class DataTableImplV2 implements DataTable {
return _fixedSizeData.getInt();
}
- @Override
public String toString() {
if (_dataSchema == null) {
return _metadata.toString();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
index cafafff..e0e6c4e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
@@ -77,6 +77,9 @@ import org.apache.pinot.spi.utils.ByteArray;
// TODO: 3. Given a data schema, write all values one by one instead of
using rowId and colId to position (save time).
// TODO: 4. Store bytes as variable size data instead of String
public class DataTableBuilder {
+ public static final int VERSION_2 = 2;
+ public static final int VERSION_3 = 3;
+ private static int _version = VERSION_3;
private final DataSchema _dataSchema;
private final int[] _columnOffsets;
private final int _rowSizeInBytes;
@@ -96,6 +99,17 @@ public class DataTableBuilder {
_rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema,
_columnOffsets);
}
+ public static DataTable getEmptyDataTable() {
+ return _version == VERSION_2 ? new DataTableImplV2() : new
DataTableImplV3();
+ }
+
+ public static void setCurrentDataTableVersion(int version) {
+ if (version != VERSION_2 && version != VERSION_3) {
+ throw new IllegalArgumentException("Unsupported version: " + version);
+ }
+ _version = version;
+ }
+
public void startRow() {
_numRows++;
_currentRowDataByteBuffer = ByteBuffer.allocate(_rowSizeInBytes);
@@ -263,7 +277,9 @@ public class DataTableBuilder {
}
public DataTable build() {
- return new DataTableImplV2(_numRows, _dataSchema, _reverseDictionaryMap,
- _fixedSizeDataByteArrayOutputStream.toByteArray(),
_variableSizeDataByteArrayOutputStream.toByteArray());
+ return _version == VERSION_2 ? new DataTableImplV2(_numRows, _dataSchema,
_reverseDictionaryMap,
+ _fixedSizeDataByteArrayOutputStream.toByteArray(),
_variableSizeDataByteArrayOutputStream.toByteArray())
+ : new DataTableImplV3(_numRows, _dataSchema, _reverseDictionaryMap,
+ _fixedSizeDataByteArrayOutputStream.toByteArray(),
_variableSizeDataByteArrayOutputStream.toByteArray());
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
index 8e212fa..e2a3385 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
@@ -22,6 +22,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.pinot.common.utils.DataTable;
+import static
org.apache.pinot.core.common.datatable.DataTableBuilder.VERSION_2;
+import static
org.apache.pinot.core.common.datatable.DataTableBuilder.VERSION_3;
+
public class DataTableFactory {
private DataTableFactory() {
@@ -31,8 +34,10 @@ public class DataTableFactory {
throws IOException {
int version = byteBuffer.getInt();
switch (version) {
- case 2:
+ case VERSION_2:
return new DataTableImplV2(byteBuffer);
+ case VERSION_3:
+ return new DataTableImplV3(byteBuffer);
default:
throw new UnsupportedOperationException("Unsupported data table
version: " + version);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
index ac3d060..1764025 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
@@ -27,19 +27,15 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.StringUtil;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.spi.utils.ByteArray;
-import org.apache.pinot.spi.utils.BytesUtils;
+import static
org.apache.pinot.core.common.datatable.DataTableBuilder.VERSION_2;
+import static
org.apache.pinot.core.common.datatable.DataTableUtils.decodeString;
-public class DataTableImplV2 implements DataTable {
- private static final int VERSION = 2;
+public class DataTableImplV2 extends BaseDataTable {
// VERSION
// NUM_ROWS
// NUM_COLUMNS
@@ -50,51 +46,18 @@ public class DataTableImplV2 implements DataTable {
// VARIABLE_SIZE_DATA (START|SIZE)
private static final int HEADER_SIZE = Integer.BYTES * 13;
- private final int _numRows;
- private final int _numColumns;
- private final DataSchema _dataSchema;
- private final int[] _columnOffsets;
- private final int _rowSizeInBytes;
- private final Map<String, Map<Integer, String>> _dictionaryMap;
- private final byte[] _fixedSizeDataBytes;
- private final ByteBuffer _fixedSizeData;
- private final byte[] _variableSizeDataBytes;
- private final ByteBuffer _variableSizeData;
- private final Map<String, String> _metadata;
-
/**
* Construct data table with results. (Server side)
*/
public DataTableImplV2(int numRows, DataSchema dataSchema, Map<String,
Map<Integer, String>> dictionaryMap,
byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
- _numRows = numRows;
- _numColumns = dataSchema.size();
- _dataSchema = dataSchema;
- _columnOffsets = new int[_numColumns];
- _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema,
_columnOffsets);
- _dictionaryMap = dictionaryMap;
- _fixedSizeDataBytes = fixedSizeDataBytes;
- _fixedSizeData = ByteBuffer.wrap(fixedSizeDataBytes);
- _variableSizeDataBytes = variableSizeDataBytes;
- _variableSizeData = ByteBuffer.wrap(variableSizeDataBytes);
- _metadata = new HashMap<>();
+ super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes,
variableSizeDataBytes);
}
/**
* Construct empty data table. (Server side)
*/
public DataTableImplV2() {
- _numRows = 0;
- _numColumns = 0;
- _dataSchema = null;
- _columnOffsets = null;
- _rowSizeInBytes = 0;
- _dictionaryMap = null;
- _fixedSizeDataBytes = null;
- _fixedSizeData = null;
- _variableSizeDataBytes = null;
- _variableSizeData = null;
- _metadata = new HashMap<>();
}
/**
@@ -169,29 +132,6 @@ public class DataTableImplV2 implements DataTable {
}
}
- private Map<String, Map<Integer, String>> deserializeDictionaryMap(byte[]
bytes)
- throws IOException {
- try (ByteArrayInputStream byteArrayInputStream = new
ByteArrayInputStream(bytes);
- DataInputStream dataInputStream = new
DataInputStream(byteArrayInputStream)) {
- int numDictionaries = dataInputStream.readInt();
- Map<String, Map<Integer, String>> dictionaryMap = new
HashMap<>(numDictionaries);
-
- for (int i = 0; i < numDictionaries; i++) {
- String column = decodeString(dataInputStream);
- int dictionarySize = dataInputStream.readInt();
- Map<Integer, String> dictionary = new HashMap<>(dictionarySize);
- for (int j = 0; j < dictionarySize; j++) {
- int key = dataInputStream.readInt();
- String value = decodeString(dataInputStream);
- dictionary.put(key, value);
- }
- dictionaryMap.put(column, dictionary);
- }
-
- return dictionaryMap;
- }
- }
-
private Map<String, String> deserializeMetadata(byte[] bytes)
throws IOException {
try (ByteArrayInputStream byteArrayInputStream = new
ByteArrayInputStream(bytes);
@@ -209,30 +149,31 @@ public class DataTableImplV2 implements DataTable {
}
}
- private static String decodeString(DataInputStream dataInputStream)
- throws IOException {
- int length = dataInputStream.readInt();
- if (length == 0) {
- return StringUtils.EMPTY;
- } else {
- byte[] buffer = new byte[length];
- int numBytesRead = dataInputStream.read(buffer);
- assert numBytesRead == length;
- return StringUtil.decodeUtf8(buffer);
- }
- }
-
@Override
public void addException(ProcessingException processingException) {
_metadata.put(EXCEPTION_METADATA_KEY + processingException.getErrorCode(),
processingException.getMessage());
}
+ // getExceptions return a map of errorCode->errMessage of the datatable.
+ @Override
+ public Map<Integer, String> getExceptions() {
+ Map<Integer, String> exceptions = new HashMap<>();
+ for (String key : _metadata.keySet()) {
+ if (key.startsWith(EXCEPTION_METADATA_KEY)) {
+ // In V2, all exceptions are added into metadata, using
"Exception"+errCode as key,
+ // Integer.parseInt(key.substring(9)) can extract the error code from
the key.
+ exceptions.put(Integer.parseInt(key.substring(9)), _metadata.get(key));
+ }
+ }
+ return exceptions;
+ }
+
@Override
public byte[] toBytes()
throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
- dataOutputStream.writeInt(VERSION);
+ dataOutputStream.writeInt(VERSION_2);
dataOutputStream.writeInt(_numRows);
dataOutputStream.writeInt(_numColumns);
int dataOffset = HEADER_SIZE;
@@ -300,31 +241,6 @@ public class DataTableImplV2 implements DataTable {
return byteArrayOutputStream.toByteArray();
}
- private byte[] serializeDictionaryMap()
- throws IOException {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
-
- dataOutputStream.writeInt(_dictionaryMap.size());
- for (Entry<String, Map<Integer, String>> dictionaryMapEntry :
_dictionaryMap.entrySet()) {
- String columnName = dictionaryMapEntry.getKey();
- Map<Integer, String> dictionary = dictionaryMapEntry.getValue();
- byte[] bytes = StringUtil.encodeUtf8(columnName);
- dataOutputStream.writeInt(bytes.length);
- dataOutputStream.write(bytes);
- dataOutputStream.writeInt(dictionary.size());
-
- for (Entry<Integer, String> dictionaryEntry : dictionary.entrySet()) {
- dataOutputStream.writeInt(dictionaryEntry.getKey());
- byte[] valueBytes = StringUtil.encodeUtf8(dictionaryEntry.getValue());
- dataOutputStream.writeInt(valueBytes.length);
- dataOutputStream.write(valueBytes);
- }
- }
-
- return byteArrayOutputStream.toByteArray();
- }
-
private byte[] serializeMetadata()
throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
@@ -343,163 +259,4 @@ public class DataTableImplV2 implements DataTable {
return byteArrayOutputStream.toByteArray();
}
-
- @Override
- public Map<String, String> getMetadata() {
- return _metadata;
- }
-
- @Override
- public DataSchema getDataSchema() {
- return _dataSchema;
- }
-
- @Override
- public int getNumberOfRows() {
- return _numRows;
- }
-
- @Override
- public int getInt(int rowId, int colId) {
- _fixedSizeData.position(rowId * _rowSizeInBytes + _columnOffsets[colId]);
- return _fixedSizeData.getInt();
- }
-
- @Override
- public long getLong(int rowId, int colId) {
- _fixedSizeData.position(rowId * _rowSizeInBytes + _columnOffsets[colId]);
- return _fixedSizeData.getLong();
- }
-
- @Override
- public float getFloat(int rowId, int colId) {
- _fixedSizeData.position(rowId * _rowSizeInBytes + _columnOffsets[colId]);
- return _fixedSizeData.getFloat();
- }
-
- @Override
- public double getDouble(int rowId, int colId) {
- _fixedSizeData.position(rowId * _rowSizeInBytes + _columnOffsets[colId]);
- return _fixedSizeData.getDouble();
- }
-
- @Override
- public String getString(int rowId, int colId) {
- _fixedSizeData.position(rowId * _rowSizeInBytes + _columnOffsets[colId]);
- int dictId = _fixedSizeData.getInt();
- return _dictionaryMap.get(_dataSchema.getColumnName(colId)).get(dictId);
- }
-
- @Override
- public ByteArray getBytes(int rowId, int colId) {
- // NOTE: DataTable V2 uses String to store BYTES value
- return BytesUtils.toByteArray(getString(rowId, colId));
- }
-
- @Override
- public <T> T getObject(int rowId, int colId) {
- int size = positionCursorInVariableBuffer(rowId, colId);
- int objectTypeValue = _variableSizeData.getInt();
- ByteBuffer byteBuffer = _variableSizeData.slice();
- byteBuffer.limit(size);
- return ObjectSerDeUtils.deserialize(byteBuffer, objectTypeValue);
- }
-
- @Override
- public int[] getIntArray(int rowId, int colId) {
- int length = positionCursorInVariableBuffer(rowId, colId);
- int[] ints = new int[length];
- for (int i = 0; i < length; i++) {
- ints[i] = _variableSizeData.getInt();
- }
- return ints;
- }
-
- @Override
- public long[] getLongArray(int rowId, int colId) {
- int length = positionCursorInVariableBuffer(rowId, colId);
- long[] longs = new long[length];
- for (int i = 0; i < length; i++) {
- longs[i] = _variableSizeData.getLong();
- }
- return longs;
- }
-
- @Override
- public float[] getFloatArray(int rowId, int colId) {
- int length = positionCursorInVariableBuffer(rowId, colId);
- float[] floats = new float[length];
- for (int i = 0; i < length; i++) {
- floats[i] = _variableSizeData.getFloat();
- }
- return floats;
- }
-
- @Override
- public double[] getDoubleArray(int rowId, int colId) {
- int length = positionCursorInVariableBuffer(rowId, colId);
- double[] doubles = new double[length];
- for (int i = 0; i < length; i++) {
- doubles[i] = _variableSizeData.getDouble();
- }
- return doubles;
- }
-
- @Override
- public String[] getStringArray(int rowId, int colId) {
- int length = positionCursorInVariableBuffer(rowId, colId);
- String[] strings = new String[length];
- Map<Integer, String> dictionary =
_dictionaryMap.get(_dataSchema.getColumnName(colId));
- for (int i = 0; i < length; i++) {
- strings[i] = dictionary.get(_variableSizeData.getInt());
- }
- return strings;
- }
-
- private int positionCursorInVariableBuffer(int rowId, int colId) {
- _fixedSizeData.position(rowId * _rowSizeInBytes + _columnOffsets[colId]);
- _variableSizeData.position(_fixedSizeData.getInt());
- return _fixedSizeData.getInt();
- }
-
- @Override
- public String toString() {
- if (_dataSchema == null) {
- return _metadata.toString();
- }
-
- StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append(_dataSchema.toString()).append('\n');
- stringBuilder.append("numRows: ").append(_numRows).append('\n');
-
- _fixedSizeData.position(0);
- for (int rowId = 0; rowId < _numRows; rowId++) {
- for (int colId = 0; colId < _numColumns; colId++) {
- switch (_dataSchema.getColumnDataType(colId)) {
- case INT:
- stringBuilder.append(_fixedSizeData.getInt());
- break;
- case LONG:
- stringBuilder.append(_fixedSizeData.getLong());
- break;
- case FLOAT:
- stringBuilder.append(_fixedSizeData.getFloat());
- break;
- case DOUBLE:
- stringBuilder.append(_fixedSizeData.getDouble());
- break;
- case STRING:
- stringBuilder.append(_fixedSizeData.getInt());
- break;
- // Object and array.
- default:
- stringBuilder.append(String.format("(%s:%s)",
_fixedSizeData.getInt(), _fixedSizeData.getInt()));
- break;
- }
- stringBuilder.append("\t");
- }
- stringBuilder.append("\n");
- }
- return stringBuilder.toString();
- }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
new file mode 100644
index 0000000..68b2134
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
@@ -0,0 +1,398 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.common.datatable;
+
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.core.query.request.context.ThreadTimer;
+
+import static
org.apache.pinot.common.utils.DataTable.MetadataKey.THREAD_CPU_TIME_NS;
+import static
org.apache.pinot.core.common.datatable.DataTableBuilder.VERSION_3;
+import static org.apache.pinot.core.common.datatable.DataTableUtils.decodeInt;
+import static org.apache.pinot.core.common.datatable.DataTableUtils.decodeLong;
+import static
org.apache.pinot.core.common.datatable.DataTableUtils.decodeString;
+
+
+/**
+ * Datatable V3 implementation.
+ * The layout of serialized V3 datatable looks like:
+ * +-----------------------------------------------+
+ * | 13 integers of header: |
+ * | VERSION |
+ * | NUM_ROWS |
+ * | NUM_COLUMNS |
+ * | EXCEPTIONS SECTION START OFFSET |
+ * | EXCEPTIONS SECTION LENGTH |
+ * | DICTIONARY_MAP SECTION START OFFSET |
+ * | DICTIONARY_MAP SECTION LENGTH |
+ * | DATA_SCHEMA SECTION START OFFSET |
+ * | DATA_SCHEMA SECTION LENGTH |
+ * | FIXED_SIZE_DATA SECTION START OFFSET |
+ * | FIXED_SIZE_DATA SECTION LENGTH |
+ * | VARIABLE_SIZE_DATA SECTION START OFFSET |
+ * | VARIABLE_SIZE_DATA SECTION LENGTH |
+ * +-----------------------------------------------+
+ * | EXCEPTIONS SECTION |
+ * +-----------------------------------------------+
+ * | DICTIONARY_MAP SECTION |
+ * +-----------------------------------------------+
+ * | DATA_SCHEMA SECTION |
+ * +-----------------------------------------------+
+ * | FIXED_SIZE_DATA SECTION |
+ * +-----------------------------------------------+
+ * | VARIABLE_SIZE_DATA SECTION |
+ * +-----------------------------------------------+
+ * | METADATA LENGTH |
+ * | METADATA SECTION |
+ * +-----------------------------------------------+
+ */
+public class DataTableImplV3 extends BaseDataTable {
+ private static final int HEADER_SIZE = Integer.BYTES * 13;
+ // _errCodeToExceptionMap stores exceptions as a map of
errorCode->errorMessage
+ private final Map<Integer, String> _errCodeToExceptionMap;
+
+ /**
+ * Construct data table with results. (Server side)
+ */
+ public DataTableImplV3(int numRows, DataSchema dataSchema, Map<String,
Map<Integer, String>> dictionaryMap,
+ byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
+ super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes,
variableSizeDataBytes);
+ _errCodeToExceptionMap = new HashMap<>();
+ }
+
+ /**
+ * Construct empty data table. (Server side)
+ */
+ public DataTableImplV3() {
+ _errCodeToExceptionMap = new HashMap<>();
+ }
+
+ /**
+ * Construct data table from byte array. (broker side)
+ */
+ public DataTableImplV3(ByteBuffer byteBuffer)
+ throws IOException {
+ // Read header.
+ _numRows = byteBuffer.getInt();
+ _numColumns = byteBuffer.getInt();
+ int exceptionsStart = byteBuffer.getInt();
+ int exceptionsLength = byteBuffer.getInt();
+ int dictionaryMapStart = byteBuffer.getInt();
+ int dictionaryMapLength = byteBuffer.getInt();
+ int dataSchemaStart = byteBuffer.getInt();
+ int dataSchemaLength = byteBuffer.getInt();
+ int fixedSizeDataStart = byteBuffer.getInt();
+ int fixedSizeDataLength = byteBuffer.getInt();
+ int variableSizeDataStart = byteBuffer.getInt();
+ int variableSizeDataLength = byteBuffer.getInt();
+
+ // Read exceptions.
+ if (exceptionsLength != 0) {
+ byte[] exceptionsBytes = new byte[exceptionsLength];
+ byteBuffer.position(exceptionsStart);
+ byteBuffer.get(exceptionsBytes);
+ _errCodeToExceptionMap = deserializeExceptions(exceptionsBytes);
+ } else {
+ _errCodeToExceptionMap = new HashMap<>();
+ }
+
+ // Read dictionary.
+ if (dictionaryMapLength != 0) {
+ byte[] dictionaryMapBytes = new byte[dictionaryMapLength];
+ byteBuffer.position(dictionaryMapStart);
+ byteBuffer.get(dictionaryMapBytes);
+ _dictionaryMap = deserializeDictionaryMap(dictionaryMapBytes);
+ } else {
+ _dictionaryMap = null;
+ }
+
+ // Read data schema.
+ if (dataSchemaLength != 0) {
+ byte[] schemaBytes = new byte[dataSchemaLength];
+ byteBuffer.position(dataSchemaStart);
+ byteBuffer.get(schemaBytes);
+ _dataSchema = DataSchema.fromBytes(schemaBytes);
+ _columnOffsets = new int[_dataSchema.size()];
+ _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema,
_columnOffsets);
+ } else {
+ _dataSchema = null;
+ _columnOffsets = null;
+ _rowSizeInBytes = 0;
+ }
+
+ // Read fixed size data.
+ if (fixedSizeDataLength != 0) {
+ _fixedSizeDataBytes = new byte[fixedSizeDataLength];
+ byteBuffer.position(fixedSizeDataStart);
+ byteBuffer.get(_fixedSizeDataBytes);
+ _fixedSizeData = ByteBuffer.wrap(_fixedSizeDataBytes);
+ } else {
+ _fixedSizeDataBytes = null;
+ _fixedSizeData = null;
+ }
+
+ // Read variable size data.
+ if (variableSizeDataLength != 0) {
+ _variableSizeDataBytes = new byte[variableSizeDataLength];
+ byteBuffer.position(variableSizeDataStart);
+ byteBuffer.get(_variableSizeDataBytes);
+ _variableSizeData = ByteBuffer.wrap(_variableSizeDataBytes);
+ } else {
+ _variableSizeDataBytes = null;
+ _variableSizeData = null;
+ }
+
+ // Read metadata.
+ int metadataLength = byteBuffer.getInt();
+ if (metadataLength != 0) {
+ byte[] metadataBytes = new byte[metadataLength];
+ byteBuffer.get(metadataBytes);
+ _metadata = deserializeMetadata(metadataBytes);
+ }
+ }
+
+ @Override
+ public void addException(ProcessingException processingException) {
+ _errCodeToExceptionMap.put(processingException.getErrorCode(),
processingException.getMessage());
+ }
+
+ @Override
+ public Map<Integer, String> getExceptions() {
+ return _errCodeToExceptionMap;
+ }
+
+ @Override
+ public byte[] toBytes()
+ throws IOException {
+ ThreadTimer threadTimer = new ThreadTimer();
+ threadTimer.start();
+
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
+ dataOutputStream.writeInt(VERSION_3);
+ dataOutputStream.writeInt(_numRows);
+ dataOutputStream.writeInt(_numColumns);
+ int dataOffset = HEADER_SIZE;
+
+ // Write exceptions section offset(START|SIZE).
+ dataOutputStream.writeInt(dataOffset);
+ byte[] exceptionsBytes;
+ exceptionsBytes = serializeExceptions();
+ dataOutputStream.writeInt(exceptionsBytes.length);
+ dataOffset += exceptionsBytes.length;
+
+ // Write dictionary map section offset(START|SIZE).
+ dataOutputStream.writeInt(dataOffset);
+ byte[] dictionaryMapBytes = null;
+ if (_dictionaryMap != null) {
+ dictionaryMapBytes = serializeDictionaryMap();
+ dataOutputStream.writeInt(dictionaryMapBytes.length);
+ dataOffset += dictionaryMapBytes.length;
+ } else {
+ dataOutputStream.writeInt(0);
+ }
+
+ // Write data schema section offset(START|SIZE).
+ dataOutputStream.writeInt(dataOffset);
+ byte[] dataSchemaBytes = null;
+ if (_dataSchema != null) {
+ dataSchemaBytes = _dataSchema.toBytes();
+ dataOutputStream.writeInt(dataSchemaBytes.length);
+ dataOffset += dataSchemaBytes.length;
+ } else {
+ dataOutputStream.writeInt(0);
+ }
+
+ // Write fixed size data section offset(START|SIZE).
+ dataOutputStream.writeInt(dataOffset);
+ if (_fixedSizeDataBytes != null) {
+ dataOutputStream.writeInt(_fixedSizeDataBytes.length);
+ dataOffset += _fixedSizeDataBytes.length;
+ } else {
+ dataOutputStream.writeInt(0);
+ }
+
+ // Write variable size data section offset(START|SIZE).
+ dataOutputStream.writeInt(dataOffset);
+ if (_variableSizeDataBytes != null) {
+ dataOutputStream.writeInt(_variableSizeDataBytes.length);
+ } else {
+ dataOutputStream.writeInt(0);
+ }
+
+ // Write actual data.
+ // Write exceptions bytes.
+ dataOutputStream.write(exceptionsBytes);
+ // Write dictionary map bytes.
+ if (dictionaryMapBytes != null) {
+ dataOutputStream.write(dictionaryMapBytes);
+ }
+ // Write data schema bytes.
+ if (dataSchemaBytes != null) {
+ dataOutputStream.write(dataSchemaBytes);
+ }
+ // Write fixed size data bytes.
+ if (_fixedSizeDataBytes != null) {
+ dataOutputStream.write(_fixedSizeDataBytes);
+ }
+ // Write variable size data bytes.
+ if (_variableSizeDataBytes != null) {
+ dataOutputStream.write(_variableSizeDataBytes);
+ }
+
+ // Update the value of "threadCpuTimeNs" to account data table
serialization time.
+ long responseSerializationCpuTimeNs = threadTimer.stopAndGetThreadTimeNs();
+ // TODO: currently log/emit a total thread cpu time for query execution
time and data table serialization time.
+ // Figure out a way to log/emit separately. Probably via providing an API
on the DataTable to get/set query
+ // context, which is supposed to be used at server side only.
+ long threadCpuTimeNs =
+
Long.parseLong(getMetadata().getOrDefault(THREAD_CPU_TIME_NS.getName(), "0")) +
responseSerializationCpuTimeNs;
+ getMetadata().put(THREAD_CPU_TIME_NS.getName(),
String.valueOf(threadCpuTimeNs));
+
+ // Write metadata: length followed by actual metadata bytes.
+ byte[] metadataBytes = serializeMetadata();
+ dataOutputStream.writeInt(metadataBytes.length);
+ dataOutputStream.write(metadataBytes);
+
+ return byteArrayOutputStream.toByteArray();
+ }
+
+ /**
+ * Serialize metadata section to bytes.
+ * Format of the bytes looks like:
+ * [numEntries, bytesOfKV2, bytesOfKV2, bytesOfKV3]
+ * For each KV pair:
+ * - if the value type is String, encode it as: [enumKeyOrdinal,
valueLength, Utf8EncodedValue].
+ * - if the value type is int, encode it as: [enumKeyOrdinal,
bigEndianRepresentationOfIntValue]
+ * - if the value type is long, encode it as: [enumKeyOrdinal,
bigEndianRepresentationOfLongValue]
+ *
+ * Unlike V2, where numeric metadata values (int and long) in V3 are encoded
in UTF-8 in the wire format,
+ * in V3 big endian representation is used.
+ */
+ private byte[] serializeMetadata()
+ throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
+
+ dataOutputStream.writeInt(_metadata.size());
+
+ for (Map.Entry<String, String> entry : _metadata.entrySet()) {
+ MetadataKey key = MetadataKey.getByName(entry.getKey());
+ // Ignore unknown keys.
+ if (key == null) {
+ continue;
+ }
+ String value = entry.getValue();
+ dataOutputStream.writeInt(key.ordinal());
+ if (key.getValueType() == MetadataValueType.INT) {
+ dataOutputStream.write(Ints.toByteArray(Integer.parseInt(value)));
+ } else if (key.getValueType() == MetadataValueType.LONG) {
+ dataOutputStream.write(Longs.toByteArray(Long.parseLong(value)));
+ } else {
+ byte[] valueBytes = StringUtil.encodeUtf8(value);
+ dataOutputStream.writeInt(valueBytes.length);
+ dataOutputStream.write(valueBytes);
+ }
+ }
+
+ return byteArrayOutputStream.toByteArray();
+ }
+
+ /**
+ * Even though the wire format of V3 uses UTF-8 for string/bytes and
big-endian for numeric values,
+ * the in-memory representation is STRING based for processing the metadata
before serialization
+ * (by the server as it adds the statistics in metadata) and after
deserialization (by the broker as it receives
+ * DataTable from each server and aggregates the values).
+ * This is to make V3 implementation keep the consumers of Map<String,
String> getMetadata() API in the code happy
+ * by internally converting it.
+ */
+ private Map<String, String> deserializeMetadata(byte[] bytes)
+ throws IOException {
+ try (ByteArrayInputStream byteArrayInputStream = new
ByteArrayInputStream(bytes);
+ DataInputStream dataInputStream = new
DataInputStream(byteArrayInputStream)) {
+ int numEntries = dataInputStream.readInt();
+ Map<String, String> metadata = new HashMap<>();
+ for (int i = 0; i < numEntries; i++) {
+ int keyId = dataInputStream.readInt();
+ MetadataKey key = MetadataKey.getByOrdinal(keyId);
+ // Ignore unknown keys.
+ if (key == null) {
+ continue;
+ }
+ if (key.getValueType() == MetadataValueType.INT) {
+ String value = String.valueOf(decodeInt(dataInputStream));
+ metadata.put(key.getName(), value);
+ } else if (key.getValueType() == MetadataValueType.LONG) {
+ String value = String.valueOf(decodeLong(dataInputStream));
+ metadata.put(key.getName(), value);
+ } else {
+ String value = String.valueOf(decodeString(dataInputStream));
+ metadata.put(key.getName(), value);
+ }
+ }
+ return metadata;
+ }
+ }
+
+ private byte[] serializeExceptions()
+ throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
+
+ dataOutputStream.writeInt(_errCodeToExceptionMap.size());
+
+ for (Map.Entry<Integer, String> entry : _errCodeToExceptionMap.entrySet())
{
+ int key = entry.getKey();
+ String value = entry.getValue();
+ byte[] valueBytes = StringUtil.encodeUtf8(value);
+ dataOutputStream.writeInt(key);
+ dataOutputStream.writeInt(valueBytes.length);
+ dataOutputStream.write(valueBytes);
+ }
+
+ return byteArrayOutputStream.toByteArray();
+ }
+
+ private Map<Integer, String> deserializeExceptions(byte[] bytes)
+ throws IOException {
+ try (ByteArrayInputStream byteArrayInputStream = new
ByteArrayInputStream(bytes);
+ DataInputStream dataInputStream = new
DataInputStream(byteArrayInputStream)) {
+ int numExceptions = dataInputStream.readInt();
+ Map<Integer, String> exceptions = new HashMap<>(numExceptions);
+ for (int i = 0; i < numExceptions; i++) {
+ int errCode = dataInputStream.readInt();
+ String errMessage = decodeString(dataInputStream);
+ exceptions.put(errCode, errMessage);
+ }
+ return exceptions;
+ }
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
index 859a65c..f59dc0c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
@@ -18,13 +18,18 @@
*/
package org.apache.pinot.core.common.datatable;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
import org.apache.pinot.core.query.distinct.DistinctTable;
@@ -233,4 +238,44 @@ public class DataTableUtils {
dataTableBuilder.finishRow();
return dataTableBuilder.build();
}
+
+ /**
+ * Helper method to decode string.
+ */
+ public static String decodeString(DataInputStream dataInputStream)
+ throws IOException {
+ int length = dataInputStream.readInt();
+ if (length == 0) {
+ return StringUtils.EMPTY;
+ } else {
+ byte[] buffer = new byte[length];
+ int numBytesRead = dataInputStream.read(buffer);
+ assert numBytesRead == length;
+ return StringUtil.decodeUtf8(buffer);
+ }
+ }
+
+ /**
+ * Helper method to decode int.
+ */
+ public static int decodeInt(DataInputStream dataInputStream)
+ throws IOException {
+ int length = Integer.BYTES;
+ byte[] buffer = new byte[length];
+ int numBytesRead = dataInputStream.read(buffer);
+ assert numBytesRead == length;
+ return Ints.fromByteArray(buffer);
+ }
+
+ /**
+ * Helper method to decode long.
+ */
+ public static long decodeLong(DataInputStream dataInputStream)
+ throws IOException {
+ int length = Long.BYTES;
+ byte[] buffer = new byte[length];
+ int numBytesRead = dataInputStream.read(buffer);
+ assert numBytesRead == length;
+ return Longs.fromByteArray(buffer);
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
index a73088e..e006bed 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
@@ -24,6 +24,8 @@ import
org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.query.request.context.ThreadTimer;
+import static
org.apache.pinot.common.utils.DataTable.MetadataKey.THREAD_CPU_TIME_NS;
+
public class InstanceResponseOperator extends
BaseOperator<InstanceResponseBlock> {
private static final String OPERATOR_NAME = "InstanceResponseOperator";
@@ -45,7 +47,7 @@ public class InstanceResponseOperator extends
BaseOperator<InstanceResponseBlock
mainThreadTimer.stop();
long totalThreadCpuTimeNs =
intermediateResultsBlock.getExecutionThreadCpuTimeNs() +
mainThreadTimer.getThreadTimeNs();
-
dataTable.getMetadata().put(DataTable.EXECUTION_THREAD_CPU_TIME_NS_METADATA_KEY,
String.valueOf(totalThreadCpuTimeNs));
+ dataTable.getMetadata().put(THREAD_CPU_TIME_NS.getName(),
String.valueOf(totalThreadCpuTimeNs));
return instanceResponseBlock;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
index 5fd8e92..3886e27 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
@@ -37,7 +37,6 @@ import org.apache.pinot.core.common.BlockDocIdValueSet;
import org.apache.pinot.core.common.BlockMetadata;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
-import org.apache.pinot.core.common.datatable.DataTableImplV2;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.data.table.Table;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
@@ -193,34 +192,6 @@ public class IntermediateResultsBlock implements Block {
_processingExceptions.add(processingException);
}
- public void setNumDocsScanned(long numDocsScanned) {
- _numDocsScanned = numDocsScanned;
- }
-
- public void setNumEntriesScannedInFilter(long numEntriesScannedInFilter) {
- _numEntriesScannedInFilter = numEntriesScannedInFilter;
- }
-
- public void setNumEntriesScannedPostFilter(long numEntriesScannedPostFilter)
{
- _numEntriesScannedPostFilter = numEntriesScannedPostFilter;
- }
-
- public void setNumSegmentsProcessed(int numSegmentsProcessed) {
- _numSegmentsProcessed = numSegmentsProcessed;
- }
-
- public void setNumSegmentsMatched(int numSegmentsMatched) {
- _numSegmentsMatched = numSegmentsMatched;
- }
-
- public void setNumTotalDocs(long numTotalDocs) {
- _numTotalDocs = numTotalDocs;
- }
-
- public void setNumGroupsLimitReached(boolean numGroupsLimitReached) {
- _numGroupsLimitReached = numGroupsLimitReached;
- }
-
public void setNumResizes(int numResizes) {
_numResizes = numResizes;
}
@@ -229,49 +200,77 @@ public class IntermediateResultsBlock implements Block {
_resizeTimeMs = resizeTimeMs;
}
- public void setExecutionThreadCpuTimeNs(long executionThreadCpuTimeNs) {
- _executionThreadCpuTimeNs = executionThreadCpuTimeNs;
- }
-
public long getExecutionThreadCpuTimeNs() {
return _executionThreadCpuTimeNs;
}
+ public void setExecutionThreadCpuTimeNs(long executionThreadCpuTimeNs) {
+ _executionThreadCpuTimeNs = executionThreadCpuTimeNs;
+ }
+
@VisibleForTesting
public long getNumDocsScanned() {
return _numDocsScanned;
}
+ public void setNumDocsScanned(long numDocsScanned) {
+ _numDocsScanned = numDocsScanned;
+ }
+
@VisibleForTesting
public long getNumEntriesScannedInFilter() {
return _numEntriesScannedInFilter;
}
+ public void setNumEntriesScannedInFilter(long numEntriesScannedInFilter) {
+ _numEntriesScannedInFilter = numEntriesScannedInFilter;
+ }
+
@VisibleForTesting
public long getNumEntriesScannedPostFilter() {
return _numEntriesScannedPostFilter;
}
+ public void setNumEntriesScannedPostFilter(long numEntriesScannedPostFilter)
{
+ _numEntriesScannedPostFilter = numEntriesScannedPostFilter;
+ }
+
@VisibleForTesting
public int getNumSegmentsProcessed() {
return _numSegmentsProcessed;
}
+ public void setNumSegmentsProcessed(int numSegmentsProcessed) {
+ _numSegmentsProcessed = numSegmentsProcessed;
+ }
+
@VisibleForTesting
public int getNumSegmentsMatched() {
return _numSegmentsMatched;
}
+ public void setNumSegmentsMatched(int numSegmentsMatched) {
+ _numSegmentsMatched = numSegmentsMatched;
+ }
+
@VisibleForTesting
public long getNumTotalDocs() {
return _numTotalDocs;
}
+ public void setNumTotalDocs(long numTotalDocs) {
+ _numTotalDocs = numTotalDocs;
+ }
+
@VisibleForTesting
public boolean isNumGroupsLimitReached() {
return _numGroupsLimitReached;
}
+ public void setNumGroupsLimitReached(boolean numGroupsLimitReached) {
+ _numGroupsLimitReached = numGroupsLimitReached;
+ }
+
public DataTable getDataTable()
throws Exception {
@@ -423,7 +422,7 @@ public class IntermediateResultsBlock implements Block {
}
private DataTable getMetadataDataTable() {
- return attachMetadataToDataTable(new DataTableImplV2());
+ return attachMetadataToDataTable(DataTableBuilder.getEmptyDataTable());
}
private DataTable attachMetadataToDataTable(DataTable dataTable) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 194e57f..ea9c3de 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -37,7 +37,7 @@ import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableImplV2;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.SegmentDataManager;
@@ -138,7 +138,7 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
String errorMessage = String
.format("Query scheduling took %dms (longer than query timeout of
%dms)", querySchedulingTimeMs,
queryTimeoutMs);
- DataTable dataTable = new DataTableImplV2();
+ DataTable dataTable = DataTableBuilder.getEmptyDataTable();
dataTable.addException(QueryException.getException(QueryException.QUERY_SCHEDULING_TIMEOUT_ERROR,
errorMessage));
LOGGER.error("{} while processing requestId: {}", errorMessage,
requestId);
return dataTable;
@@ -147,7 +147,7 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
TableDataManager tableDataManager =
_instanceDataManager.getTableDataManager(tableNameWithType);
if (tableDataManager == null) {
String errorMessage = "Failed to find table: " + tableNameWithType;
- DataTable dataTable = new DataTableImplV2();
+ DataTable dataTable = DataTableBuilder.getEmptyDataTable();
dataTable.addException(QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR,
errorMessage));
LOGGER.error("{} while processing requestId: {}", errorMessage,
requestId);
return dataTable;
@@ -224,7 +224,7 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
LOGGER.error("Exception processing requestId {}", requestId, e);
}
- dataTable = new DataTableImplV2();
+ dataTable = DataTableBuilder.getEmptyDataTable();
dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
} finally {
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
@@ -274,7 +274,8 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
LOGGER.debug("Matched {} segments after pruning", numSelectedSegments);
if (numSelectedSegments == 0) {
// Only return metadata for streaming query
- DataTable dataTable = enableStreaming ? new DataTableImplV2() :
DataTableUtils.buildEmptyDataTable(queryContext);
+ DataTable dataTable =
+ enableStreaming ? DataTableBuilder.getEmptyDataTable() :
DataTableUtils.buildEmptyDataTable(queryContext);
Map<String, String> metadata = dataTable.getMetadata();
metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY,
String.valueOf(numTotalDocs));
metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0");
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 7d61015..1a3299b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -123,10 +123,9 @@ public class BrokerReduceService {
}
// Reduce on exceptions.
- for (String key : metadata.keySet()) {
- if (key.startsWith(DataTable.EXCEPTION_METADATA_KEY)) {
- processingExceptions.add(new
QueryProcessingException(Integer.parseInt(key.substring(9)),
metadata.get(key)));
- }
+ Map<Integer, String> exceptions = dataTable.getExceptions();
+ for (int key : exceptions.keySet()) {
+ processingExceptions.add(new QueryProcessingException(key,
exceptions.get(key)));
}
// Reduce on execution statistics.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index bda3fc9..610fe15 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -18,15 +18,18 @@
*/
package org.apache.pinot.core.query.scheduler;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.RateLimiter;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAccumulator;
-
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
@@ -35,7 +38,7 @@ import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableImplV2;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.request.context.TimerContext;
@@ -44,11 +47,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
-import com.google.common.util.concurrent.RateLimiter;
+import static
org.apache.pinot.common.utils.DataTable.MetadataKey.THREAD_CPU_TIME_NS;
/**
@@ -65,15 +64,13 @@ public abstract class QueryScheduler {
private static final String INVALID_RESIZE_TIME_MS = "-1";
private static final String QUERY_LOG_MAX_RATE_KEY =
"query.log.maxRatePerSecond";
private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
-
- private final RateLimiter queryLogRateLimiter;
- private final RateLimiter numDroppedLogRateLimiter;
- private final AtomicInteger numDroppedLogCounter;
-
protected final ServerMetrics serverMetrics;
protected final QueryExecutor queryExecutor;
protected final ResourceManager resourceManager;
protected final LongAccumulator latestQueryTime;
+ private final RateLimiter queryLogRateLimiter;
+ private final RateLimiter numDroppedLogRateLimiter;
+ private final AtomicInteger numDroppedLogCounter;
protected volatile boolean isRunning = false;
/**
@@ -94,7 +91,8 @@ public abstract class QueryScheduler {
this.resourceManager = resourceManager;
this.queryExecutor = queryExecutor;
this.latestQueryTime = latestQueryTime;
- this.queryLogRateLimiter =
RateLimiter.create(config.getProperty(QUERY_LOG_MAX_RATE_KEY,
DEFAULT_QUERY_LOG_MAX_RATE));
+ this.queryLogRateLimiter =
+ RateLimiter.create(config.getProperty(QUERY_LOG_MAX_RATE_KEY,
DEFAULT_QUERY_LOG_MAX_RATE));
this.numDroppedLogRateLimiter = RateLimiter.create(1.0d);
this.numDroppedLogCounter = new AtomicInteger(0);
@@ -161,32 +159,36 @@ public abstract class QueryScheduler {
queryRequest.getBrokerId(), e);
// For not handled exceptions
serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
- dataTable = new DataTableImplV2();
+ dataTable = DataTableBuilder.getEmptyDataTable();
dataTable.addException(QueryException.getException(QueryException.INTERNAL_ERROR,
e));
}
long requestId = queryRequest.getRequestId();
Map<String, String> dataTableMetadata = dataTable.getMetadata();
dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY,
Long.toString(requestId));
+ byte[] responseBytes = serializeDataTable(queryRequest, dataTable);
+
// Log the statistics
String tableNameWithType = queryRequest.getTableNameWithType();
long numDocsScanned =
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_DOCS_SCANNED_METADATA_KEY,
INVALID_NUM_SCANNED));
long numEntriesScannedInFilter = Long.parseLong(
dataTableMetadata.getOrDefault(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY,
INVALID_NUM_SCANNED));
- long numEntriesScannedPostFilter =
-
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY,
INVALID_NUM_SCANNED));
+ long numEntriesScannedPostFilter = Long.parseLong(
+
dataTableMetadata.getOrDefault(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY,
INVALID_NUM_SCANNED));
long numSegmentsProcessed =
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_PROCESSED,
INVALID_SEGMENTS_COUNT));
long numSegmentsMatched =
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_MATCHED,
INVALID_SEGMENTS_COUNT));
- long numSegmentsConsuming =
-
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_CONSUMING_SEGMENTS_PROCESSED,
INVALID_SEGMENTS_COUNT));
+ long numSegmentsConsuming = Long.parseLong(
+
dataTableMetadata.getOrDefault(DataTable.NUM_CONSUMING_SEGMENTS_PROCESSED,
INVALID_SEGMENTS_COUNT));
long minConsumingFreshnessMs =
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS,
INVALID_FRESHNESS_MS));
- int numResizes =
Integer.parseInt(dataTableMetadata.getOrDefault(DataTable.NUM_RESIZES_METADATA_KEY,
INVALID_NUM_RESIZES));
- long resizeTimeMs =
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.RESIZE_TIME_MS_METADATA_KEY,
INVALID_RESIZE_TIME_MS));
- long executionThreadCpuTimeNs =
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.EXECUTION_THREAD_CPU_TIME_NS_METADATA_KEY,
"0"));
+ int numResizes =
+
Integer.parseInt(dataTableMetadata.getOrDefault(DataTable.NUM_RESIZES_METADATA_KEY,
INVALID_NUM_RESIZES));
+ long resizeTimeMs =
+
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.RESIZE_TIME_MS_METADATA_KEY,
INVALID_RESIZE_TIME_MS));
+ long threadCpuTimeNs =
Long.parseLong(dataTableMetadata.getOrDefault(THREAD_CPU_TIME_NS.getName(),
"0"));
if (numDocsScanned > 0) {
serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.NUM_DOCS_SCANNED, numDocsScanned);
@@ -205,8 +207,8 @@ public abstract class QueryScheduler {
if (resizeTimeMs > 0) {
serverMetrics.addValueToTableGauge(tableNameWithType,
ServerGauge.RESIZE_TIME_MS, resizeTimeMs);
}
- if (executionThreadCpuTimeNs > 0) {
- serverMetrics.addValueToTableGauge(tableNameWithType,
ServerGauge.EXECUTION_THREAD_CPU_TIME_NS, executionThreadCpuTimeNs);
+ if (threadCpuTimeNs > 0) {
+ serverMetrics.addValueToTableGauge(tableNameWithType,
ServerGauge.EXECUTION_THREAD_CPU_TIME_NS, threadCpuTimeNs);
}
TimerContext timerContext = queryRequest.getTimerContext();
@@ -218,14 +220,14 @@ public abstract class QueryScheduler {
if (queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs,
numDocsScanned)) {
LOGGER.info("Processed
requestId={},table={},segments(queried/processed/matched/consuming)={}/{}/{}/{},"
+
"schedulerWaitMs={},reqDeserMs={},totalExecMs={},resSerMs={},totalTimeMs={},minConsumingFreshnessMs={},broker={},"
- +
"numDocsScanned={},scanInFilter={},scanPostFilter={},sched={},executionThreadCpuTimeNs={}",
requestId, tableNameWithType,
- numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched,
numSegmentsConsuming, schedulerWaitMs,
-
timerContext.getPhaseDurationMs(ServerQueryPhase.REQUEST_DESERIALIZATION),
+ +
"numDocsScanned={},scanInFilter={},scanPostFilter={},sched={},threadCpuTimeNs={}",
requestId,
+ tableNameWithType, numSegmentsQueried, numSegmentsProcessed,
numSegmentsMatched, numSegmentsConsuming,
+ schedulerWaitMs,
timerContext.getPhaseDurationMs(ServerQueryPhase.REQUEST_DESERIALIZATION),
timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
timerContext.getPhaseDurationMs(ServerQueryPhase.RESPONSE_SERIALIZATION),
timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME),
minConsumingFreshnessMs,
queryRequest.getBrokerId(), numDocsScanned,
numEntriesScannedInFilter, numEntriesScannedPostFilter, name(),
- executionThreadCpuTimeNs);
+ threadCpuTimeNs);
// Limit the dropping log message at most once per second.
if (numDroppedLogRateLimiter.tryAcquire()) {
@@ -250,13 +252,7 @@ public abstract class QueryScheduler {
serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.NUM_SEGMENTS_PROCESSED, numSegmentsProcessed);
serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.NUM_SEGMENTS_MATCHED, numSegmentsMatched);
- /**
- * TODO: Currently not send "executionThreadCpuTimeNs" as part of metadata
to broker. Revisit this when follow-up
- * work of data table serialization cost measurement is done.
- */
-
dataTableMetadata.remove(DataTable.EXECUTION_THREAD_CPU_TIME_NS_METADATA_KEY);
- byte[] responseData = serializeDataTable(queryRequest, dataTable);
- return responseData;
+ return responseBytes;
}
/**
@@ -272,10 +268,7 @@ public abstract class QueryScheduler {
}
// If the number of document scanned is larger than 1 million rows, force
the log
- if (numDocsScanned > 1_000_000L) {
- return true;
- }
- return false;
+ return numDocsScanned > 1_000_000L;
}
/**
@@ -315,7 +308,7 @@ public abstract class QueryScheduler {
*/
protected ListenableFuture<byte[]> immediateErrorResponse(ServerQueryRequest
queryRequest,
ProcessingException error) {
- DataTable result = new DataTableImplV2();
+ DataTable result = DataTableBuilder.getEmptyDataTable();
Map<String, String> dataTableMetadata = result.getMetadata();
dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY,
Long.toString(queryRequest.getRequestId()));
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index bdcac02..2ae6567 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -35,7 +35,7 @@ import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableImplV2;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.spi.utils.BytesUtils;
@@ -107,7 +107,7 @@ public class InstanceRequestHandler extends
SimpleChannelInboundHandler<ByteBuf>
String hexString = requestBytes != null ?
BytesUtils.toHexString(requestBytes) : "";
long reqestId = instanceRequest != null ? instanceRequest.getRequestId()
: 0;
LOGGER.error("Exception while processing instance request: {}",
hexString, e);
- sendErrorResponse(ctx, reqestId, queryArrivalTimeMs, new
DataTableImplV2(), e);
+ sendErrorResponse(ctx, reqestId, queryArrivalTimeMs,
DataTableBuilder.getEmptyDataTable(), e);
}
}
@@ -121,7 +121,7 @@ public class InstanceRequestHandler extends
SimpleChannelInboundHandler<ByteBuf>
sendResponse(ctx, queryArrivalTimeMs, responseBytes);
} else {
// Send exception response.
- sendErrorResponse(ctx, queryRequest.getRequestId(),
queryArrivalTimeMs, new DataTableImplV2(),
+ sendErrorResponse(ctx, queryRequest.getRequestId(),
queryArrivalTimeMs, DataTableBuilder.getEmptyDataTable(),
new Exception("Null query response."));
}
}
@@ -130,7 +130,7 @@ public class InstanceRequestHandler extends
SimpleChannelInboundHandler<ByteBuf>
public void onFailure(Throwable t) {
// Send exception response.
LOGGER.error("Exception while processing instance request", t);
- sendErrorResponse(ctx, instanceRequest.getRequestId(),
queryArrivalTimeMs, new DataTableImplV2(),
+ sendErrorResponse(ctx, instanceRequest.getRequestId(),
queryArrivalTimeMs, DataTableBuilder.getEmptyDataTable(),
new Exception(t));
}
};
@@ -142,7 +142,8 @@ public class InstanceRequestHandler extends
SimpleChannelInboundHandler<ByteBuf>
// will only be called if for some remote reason we are unable to handle
exceptions in channelRead0.
String message = "Unhandled Exception in " + getClass().getCanonicalName();
LOGGER.error(message, cause);
- sendErrorResponse(ctx, 0, System.currentTimeMillis(), new
DataTableImplV2(), new Exception(message, cause));
+ sendErrorResponse(ctx, 0, System.currentTimeMillis(),
DataTableBuilder.getEmptyDataTable(),
+ new Exception(message, cause));
}
/**
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index 78432b8..5b8f516 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -18,8 +18,15 @@
*/
package org.apache.pinot.core.common.datatable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Map;
import java.util.Random;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
@@ -27,9 +34,15 @@ import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.core.query.request.context.ThreadTimer;
+import org.apache.pinot.spi.utils.ByteArray;
import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.apache.pinot.common.utils.DataTable.MetadataKey.*;
+import static
org.apache.pinot.core.common.datatable.DataTableBuilder.VERSION_3;
+
/**
* Unit test for {@link DataTable} serialization/de-serialization.
@@ -41,6 +54,29 @@ public class DataTableSerDeTest {
private static final int NUM_ROWS = 100;
+ private static final Map<String, String> EXPECTED_METADATA =
+ ImmutableMap.<String, String>builder().put(NUM_DOCS_SCANNED.getName(),
String.valueOf(20L))
+ .put(NUM_ENTRIES_SCANNED_IN_FILTER.getName(), String.valueOf(5L))
+ .put(NUM_ENTRIES_SCANNED_POST_FILTER.getName(), String.valueOf(7L))
+ .put(NUM_SEGMENTS_QUERIED.getName(), String.valueOf(6))
+ .put(NUM_SEGMENTS_PROCESSED.getName(), String.valueOf(6))
+ .put(NUM_SEGMENTS_MATCHED.getName(), String.valueOf(1))
+ .put(NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), String.valueOf(1))
+ .put(MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), String.valueOf(100L))
+ .put(TOTAL_DOCS.getName(),
String.valueOf(200L)).put(NUM_GROUPS_LIMIT_REACHED.getName(), "true")
+ .put(TIME_USED_MS.getName(),
String.valueOf(20000L)).put(TRACE_INFO.getName(),
+ "StudentException: Error finding students\n"
+ + " at
StudentManager.findStudents(StudentManager.java:13)\n"
+ + " at StudentProgram.main(StudentProgram.java:9)\n"
+ + "Caused by: DAOException: Error querying students from
database\n"
+ + " at StudentDAO.list(StudentDAO.java:11)\n"
+ + " at
StudentManager.findStudents(StudentManager.java:11)\n" + " ... 1 more\n"
+ + "Caused by: java.sql.SQLException: Syntax Error\n"
+ + " at DatabaseUtils.executeQuery(DatabaseUtils.java:5)\n"
+ + " at StudentDAO.list(StudentDAO.java:8)\n" + "
... 2 more")
+ .put(REQUEST_ID.getName(),
String.valueOf(90181881818L)).put(NUM_RESIZES.getName(), String.valueOf(900L))
+ .put(RESIZE_TIME_MS.getName(), String.valueOf(1919199L)).build();
+
@Test
public void testException()
throws IOException {
@@ -49,14 +85,13 @@ public class DataTableSerDeTest {
QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
exception);
String expected = processingException.getMessage();
- DataTable dataTable = new DataTableImplV2();
+ DataTable dataTable = DataTableBuilder.getEmptyDataTable();
dataTable.addException(processingException);
DataTable newDataTable =
DataTableFactory.getDataTable(dataTable.toBytes());
Assert.assertNull(newDataTable.getDataSchema());
Assert.assertEquals(newDataTable.getNumberOfRows(), 0);
- String actual = newDataTable.getMetadata()
- .get(DataTable.EXCEPTION_METADATA_KEY +
QueryException.QUERY_EXECUTION_ERROR.getErrorCode());
+ String actual =
newDataTable.getExceptions().get(QueryException.QUERY_EXECUTION_ERROR.getErrorCode());
Assert.assertEquals(actual, expected);
}
@@ -96,15 +131,192 @@ public class DataTableSerDeTest {
for (int i = 0; i < numColumns; i++) {
columnNames[i] = columnDataTypes[i].name();
}
+
+ int[] ints = new int[NUM_ROWS];
+ long[] longs = new long[NUM_ROWS];
+ float[] floats = new float[NUM_ROWS];
+ double[] doubles = new double[NUM_ROWS];
+ String[] strings = new String[NUM_ROWS];
+ byte[][] bytes = new byte[NUM_ROWS][];
+ Object[] objects = new Object[NUM_ROWS];
+ int[][] intArrays = new int[NUM_ROWS][];
+ long[][] longArrays = new long[NUM_ROWS][];
+ float[][] floatArrays = new float[NUM_ROWS][];
+ double[][] doubleArrays = new double[NUM_ROWS][];
+ String[][] stringArrays = new String[NUM_ROWS][];
+
DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+ DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+ fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns,
ints, longs, floats, doubles, strings,
+ bytes, objects, intArrays, longArrays, floatArrays, doubleArrays,
stringArrays);
+
+ DataTable dataTable = dataTableBuilder.build();
+ DataTable newDataTable =
DataTableFactory.getDataTable(dataTable.toBytes());
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema,
ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS,
ERROR_MESSAGE);
+ verifyDataIsSame(newDataTable, columnDataTypes, numColumns, ints, longs,
floats, doubles, strings, bytes, objects,
+ intArrays, longArrays, floatArrays, doubleArrays, stringArrays);
+ }
+
+ @Test
+ public void testV2V3Compatibility()
+ throws IOException {
+ DataSchema.ColumnDataType[] columnDataTypes =
DataSchema.ColumnDataType.values();
+ int numColumns = columnDataTypes.length;
+ String[] columnNames = new String[numColumns];
+ for (int i = 0; i < numColumns; i++) {
+ columnNames[i] = columnDataTypes[i].name();
+ }
+
+ int[] ints = new int[NUM_ROWS];
+ long[] longs = new long[NUM_ROWS];
+ float[] floats = new float[NUM_ROWS];
+ double[] doubles = new double[NUM_ROWS];
+ String[] strings = new String[NUM_ROWS];
+ byte[][] bytes = new byte[NUM_ROWS][];
+ Object[] objects = new Object[NUM_ROWS];
+ int[][] intArrays = new int[NUM_ROWS][];
+ long[][] longArrays = new long[NUM_ROWS][];
+ float[][] floatArrays = new float[NUM_ROWS][];
+ double[][] doubleArrays = new double[NUM_ROWS][];
+ String[][] stringArrays = new String[NUM_ROWS][];
+
+ DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+
+ // Verify V3 broker can deserialize data table (has data, but has no
metadata) sent by V2 server
+ DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_2);
+ DataTableBuilder dataTableBuilderV2WithDataOnly = new
DataTableBuilder(dataSchema);
+ fillDataTableWithRandomData(dataTableBuilderV2WithDataOnly,
columnDataTypes, numColumns, ints, longs, floats,
+ doubles, strings, bytes, objects, intArrays, longArrays, floatArrays,
doubleArrays, stringArrays);
+
+ DataTable dataTableV2 = dataTableBuilderV2WithDataOnly.build(); // create
a V2 data table
+ DataTable newDataTable =
+ DataTableFactory.getDataTable(dataTableV2.toBytes()); // Broker
deserialize data table bytes as V2
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema,
ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS,
ERROR_MESSAGE);
+ verifyDataIsSame(newDataTable, columnDataTypes, numColumns, ints, longs,
floats, doubles, strings, bytes, objects,
+ intArrays, longArrays, floatArrays, doubleArrays, stringArrays);
+ Assert.assertEquals(newDataTable.getMetadata().size(), 0);
+
+ // Verify V3 broker can deserialize data table (has data and metadata)
sent by V2 server
+ for (String key : EXPECTED_METADATA.keySet()) {
+ dataTableV2.getMetadata().put(key, EXPECTED_METADATA.get(key));
+ }
+ newDataTable = DataTableFactory.getDataTable(dataTableV2.toBytes()); //
Broker deserialize data table bytes as V2
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema,
ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS,
ERROR_MESSAGE);
+ verifyDataIsSame(newDataTable, columnDataTypes, numColumns, ints, longs,
floats, doubles, strings, bytes, objects,
+ intArrays, longArrays, floatArrays, doubleArrays, stringArrays);
+ Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
+
+ // Verify V3 broker can deserialize data table (only has metadata) sent by
V2 server
+ DataTableBuilder dataTableBuilderV2WithMetadataDataOnly = new
DataTableBuilder(dataSchema);
+ dataTableV2 = dataTableBuilderV2WithMetadataDataOnly.build(); // create a
V2 data table
+ for (String key : EXPECTED_METADATA.keySet()) {
+ dataTableV2.getMetadata().put(key, EXPECTED_METADATA.get(key));
+ }
+ newDataTable = DataTableFactory.getDataTable(dataTableV2.toBytes()); //
Broker deserialize data table bytes as V2
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema,
ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
+ Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
+
+ // Verify V3 broker can deserialize data table (has data, but has no metadata) sent
by V3 server.
+ DataTableBuilder.setCurrentDataTableVersion(VERSION_3);
+ DataTableBuilder dataTableBuilderV3WithDataOnly = new
DataTableBuilder(dataSchema);
+ fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly,
columnDataTypes, numColumns, ints, longs, floats,
+ doubles, strings, bytes, objects, intArrays, longArrays, floatArrays,
doubleArrays, stringArrays);
+ DataTable dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create
a V3 data table
+ // Deserialize data table bytes as V3
+ newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes());
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema,
ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS,
ERROR_MESSAGE);
+ verifyDataIsSame(newDataTable, columnDataTypes, numColumns, ints, longs,
floats, doubles, strings, bytes, objects,
+ intArrays, longArrays, floatArrays, doubleArrays, stringArrays);
+ // DataTable V3 serialization logic will add an extra THREAD_CPU_TIME_NS
KV pair into metadata
+ Assert.assertEquals(newDataTable.getMetadata().size(), 1);
+
Assert.assertTrue(newDataTable.getMetadata().containsKey(THREAD_CPU_TIME_NS.getName()));
+
+ // Verify V3 broker can deserialize data table (has data and metadata)
sent by V3 server
+ for (String key : EXPECTED_METADATA.keySet()) {
+ dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
+ }
+ newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); //
Broker deserialize data table bytes as V3
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema,
ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS,
ERROR_MESSAGE);
+ verifyDataIsSame(newDataTable, columnDataTypes, numColumns, ints, longs,
floats, doubles, strings, bytes, objects,
+ intArrays, longArrays, floatArrays, doubleArrays, stringArrays);
+ newDataTable.getMetadata().remove(THREAD_CPU_TIME_NS.getName());
+ Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
+ // Verify V3 broker can deserialize data table (only has metadata) sent by
V3 server
+ DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = new
DataTableBuilder(dataSchema);
+ dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a
V3 data table
+ for (String key : EXPECTED_METADATA.keySet()) {
+ dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
+ }
+ newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); //
Broker deserialize data table bytes as V3
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema,
ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
+ newDataTable.getMetadata().remove(THREAD_CPU_TIME_NS.getName());
+ Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
+ }
+
+ @Test
+ public void testExecutionThreadCpuTimeNs()
+ throws IOException {
+ DataSchema.ColumnDataType[] columnDataTypes =
DataSchema.ColumnDataType.values();
+ int numColumns = columnDataTypes.length;
+ String[] columnNames = new String[numColumns];
+ for (int i = 0; i < numColumns; i++) {
+ columnNames[i] = columnDataTypes[i].name();
+ }
+
+ int[] ints = new int[NUM_ROWS];
+ long[] longs = new long[NUM_ROWS];
+ float[] floats = new float[NUM_ROWS];
+ double[] doubles = new double[NUM_ROWS];
+ String[] strings = new String[NUM_ROWS];
+ byte[][] bytes = new byte[NUM_ROWS][];
+ Object[] objects = new Object[NUM_ROWS];
+ int[][] intArrays = new int[NUM_ROWS][];
+ long[][] longArrays = new long[NUM_ROWS][];
+ float[][] floatArrays = new float[NUM_ROWS][];
+ double[][] doubleArrays = new double[NUM_ROWS][];
+ String[][] stringArrays = new String[NUM_ROWS][];
+
+ DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+ fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns,
ints, longs, floats, doubles, strings,
+ bytes, objects, intArrays, longArrays, floatArrays, doubleArrays,
stringArrays);
+
+ DataTable dataTable = dataTableBuilder.build();
+ DataTable newDataTable =
DataTableFactory.getDataTable(dataTable.toBytes());
+ // When ThreadCpuTimeMeasurement is disabled, value of threadCpuTimeNs is
0.
+
Assert.assertEquals(newDataTable.getMetadata().get(THREAD_CPU_TIME_NS.getName()),
String.valueOf(0));
+
+ // Enable ThreadCpuTimeMeasurement, serialize/de-serialize data table
again.
+ ThreadTimer.setThreadCpuTimeMeasurementEnabled(true);
+ newDataTable = DataTableFactory.getDataTable(dataTable.toBytes());
+ // When ThreadCpuTimeMeasurement is enabled, value of threadCpuTimeNs is
not 0.
+
Assert.assertNotEquals(newDataTable.getMetadata().get(THREAD_CPU_TIME_NS.getName()),
String.valueOf(0));
+ }
+
+ @Test
+ public void testDataTableMetadataBytesLayout()
+ throws IOException {
+ DataSchema.ColumnDataType[] columnDataTypes =
DataSchema.ColumnDataType.values();
+ int numColumns = columnDataTypes.length;
+ String[] columnNames = new String[numColumns];
+ for (int i = 0; i < numColumns; i++) {
+ columnNames[i] = columnDataTypes[i].name();
+ }
int[] ints = new int[NUM_ROWS];
long[] longs = new long[NUM_ROWS];
float[] floats = new float[NUM_ROWS];
double[] doubles = new double[NUM_ROWS];
String[] strings = new String[NUM_ROWS];
+ byte[][] bytes = new byte[NUM_ROWS][];
Object[] objects = new Object[NUM_ROWS];
int[][] intArrays = new int[NUM_ROWS][];
long[][] longArrays = new long[NUM_ROWS][];
@@ -112,6 +324,74 @@ public class DataTableSerDeTest {
double[][] doubleArrays = new double[NUM_ROWS][];
String[][] stringArrays = new String[NUM_ROWS][];
+ DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+ DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+ fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns,
ints, longs, floats, doubles, strings,
+ bytes, objects, intArrays, longArrays, floatArrays, doubleArrays,
stringArrays);
+
+ DataTable dataTable = dataTableBuilder.build();
+
+ for (String key : EXPECTED_METADATA.keySet()) {
+ dataTable.getMetadata().put(key, EXPECTED_METADATA.get(key));
+ }
+
+ ByteBuffer byteBuffer = ByteBuffer.wrap(dataTable.toBytes());
+ int version = byteBuffer.getInt();
+ Assert.assertEquals(version, VERSION_3);
+ byteBuffer.getInt(); // numOfRows
+ byteBuffer.getInt(); // numOfColumns
+ byteBuffer.getInt(); // exceptionsStart
+ byteBuffer.getInt(); // exceptionsLength
+ byteBuffer.getInt(); // dictionaryMapStart
+ byteBuffer.getInt(); // dictionaryMapLength
+ byteBuffer.getInt(); // dataSchemaStart
+ byteBuffer.getInt(); // dataSchemaLength
+ byteBuffer.getInt(); // fixedSizeDataStart
+ byteBuffer.getInt(); // fixedSizeDataLength
+ int variableSizeDataStart = byteBuffer.getInt();
+ int variableSizeDataLength = byteBuffer.getInt();
+
+ int metadataStart = variableSizeDataStart + variableSizeDataLength;
+ byteBuffer.position(metadataStart);
+ int metadataLength = byteBuffer.getInt();
+ byte[] metadataBytes = new byte[metadataLength];
+ byteBuffer.get(metadataBytes);
+
+ try (ByteArrayInputStream byteArrayInputStream = new
ByteArrayInputStream(metadataBytes);
+ DataInputStream dataInputStream = new
DataInputStream(byteArrayInputStream)) {
+ int numEntries = dataInputStream.readInt();
+ // DataTable V3 serialization logic will add an extra THREAD_CPU_TIME_NS
KV pair into metadata
+ Assert.assertEquals(numEntries, EXPECTED_METADATA.size() + 1);
+ for (int i = 0; i < numEntries; i++) {
+ int keyOrdinal = dataInputStream.readInt();
+ DataTable.MetadataKey key = getByOrdinal(keyOrdinal);
+ Assert.assertNotEquals(key, null);
+ if (key.getValueType() == DataTable.MetadataValueType.INT) {
+ byte[] actualBytes = new byte[Integer.BYTES];
+ dataInputStream.read(actualBytes);
+ Assert.assertEquals(actualBytes,
Ints.toByteArray(Integer.parseInt(EXPECTED_METADATA.get(key.getName()))));
+ } else if (key.getValueType() == DataTable.MetadataValueType.LONG) {
+ byte[] actualBytes = new byte[Long.BYTES];
+ dataInputStream.read(actualBytes);
+ // Ignore the THREAD_CPU_TIME_NS key since it's added during data
serialization.
+ if (key != THREAD_CPU_TIME_NS) {
+ Assert.assertEquals(actualBytes,
Longs.toByteArray(Long.parseLong(EXPECTED_METADATA.get(key.getName()))));
+ }
+ } else {
+ int valueLength = dataInputStream.readInt();
+ byte[] actualBytes = new byte[valueLength];
+ dataInputStream.read(actualBytes);
+ Assert.assertEquals(actualBytes,
StringUtil.encodeUtf8((EXPECTED_METADATA.get(key.getName()))));
+ }
+ }
+ }
+ }
+
+ private void fillDataTableWithRandomData(DataTableBuilder dataTableBuilder,
+ DataSchema.ColumnDataType[] columnDataTypes, int numColumns, int[] ints,
long[] longs, float[] floats,
+ double[] doubles, String[] strings, byte[][] bytes, Object[] objects,
int[][] intArrays, long[][] longArrays,
+ float[][] floatArrays, double[][] doubleArrays, String[][] stringArrays)
+ throws IOException {
for (int rowId = 0; rowId < NUM_ROWS; rowId++) {
dataTableBuilder.startRow();
for (int colId = 0; colId < numColumns; colId++) {
@@ -136,6 +416,10 @@ public class DataTableSerDeTest {
strings[rowId] = RandomStringUtils.random(RANDOM.nextInt(20));
dataTableBuilder.setColumn(colId, strings[rowId]);
break;
+ case BYTES:
+ bytes[rowId] =
RandomStringUtils.random(RANDOM.nextInt(20)).getBytes();
+ dataTableBuilder.setColumn(colId, new ByteArray(bytes[rowId]));
+ break;
// Just test Double here, all object types will be covered in
ObjectCustomSerDeTest.
case OBJECT:
objects[rowId] = RANDOM.nextDouble();
@@ -190,12 +474,11 @@ public class DataTableSerDeTest {
}
dataTableBuilder.finishRow();
}
+ }
- DataTable dataTable = dataTableBuilder.build();
- DataTable newDataTable =
DataTableFactory.getDataTable(dataTable.toBytes());
- Assert.assertEquals(newDataTable.getDataSchema(), dataSchema,
ERROR_MESSAGE);
- Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS,
ERROR_MESSAGE);
-
+ private void verifyDataIsSame(DataTable newDataTable,
DataSchema.ColumnDataType[] columnDataTypes, int numColumns,
+ int[] ints, long[] longs, float[] floats, double[] doubles, String[]
strings, byte[][] bytes, Object[] objects,
+ int[][] intArrays, long[][] longArrays, float[][] floatArrays,
double[][] doubleArrays, String[][] stringArrays) {
for (int rowId = 0; rowId < NUM_ROWS; rowId++) {
for (int colId = 0; colId < numColumns; colId++) {
switch (columnDataTypes[colId]) {
@@ -214,6 +497,9 @@ public class DataTableSerDeTest {
case STRING:
Assert.assertEquals(newDataTable.getString(rowId, colId),
strings[rowId], ERROR_MESSAGE);
break;
+ case BYTES:
+ Assert.assertEquals(newDataTable.getBytes(rowId,
colId).getBytes(), bytes[rowId], ERROR_MESSAGE);
+ break;
case OBJECT:
Assert.assertEquals(newDataTable.getObject(rowId, colId),
objects[rowId], ERROR_MESSAGE);
break;
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
index 75987a0..d03a1b9 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
@@ -43,8 +43,8 @@ import org.apache.pinot.common.metrics.PinotMetricUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableFactory;
-import org.apache.pinot.core.common.datatable.DataTableImplV2;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -55,6 +55,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
+import static org.apache.pinot.common.utils.DataTable.MetadataKey.TABLE;
import static
org.apache.pinot.core.query.scheduler.TestHelper.createQueryRequest;
import static
org.apache.pinot.core.query.scheduler.TestHelper.createServerQueryRequest;
import static org.testng.Assert.assertEquals;
@@ -116,9 +117,8 @@ public class PrioritySchedulerTest {
int hasServerShuttingDownError = 0;
for (ListenableFuture<byte[]> result : results) {
DataTable table = DataTableFactory.getDataTable(result.get());
- hasServerShuttingDownError += table.getMetadata()
- .containsKey(DataTable.EXCEPTION_METADATA_KEY +
QueryException.SERVER_SCHEDULER_DOWN_ERROR.getErrorCode()) ? 1
- : 0;
+ hasServerShuttingDownError +=
+
table.getExceptions().containsKey(QueryException.SERVER_SCHEDULER_DOWN_ERROR.getErrorCode())
? 1 : 0;
}
assertTrue(hasServerShuttingDownError > 0);
}
@@ -149,7 +149,7 @@ public class PrioritySchedulerTest {
validationBarrier.await();
byte[] resultData = result.get();
DataTable table = DataTableFactory.getDataTable(resultData);
- assertEquals(table.getMetadata().get("table"), "1");
+ assertEquals(table.getMetadata().get(TABLE.getName()), "1");
// verify that accounting is handled right
assertEquals(group.numPending(), 0);
assertEquals(group.getThreadsInUse(), 0);
@@ -226,9 +226,8 @@ public class PrioritySchedulerTest {
ListenableFuture<byte[]> result =
scheduler.submit(createServerQueryRequest("1", metrics));
// start is not called
DataTable response = DataTableFactory.getDataTable(result.get());
- assertTrue(response.getMetadata()
- .containsKey(DataTable.EXCEPTION_METADATA_KEY +
QueryException.SERVER_SCHEDULER_DOWN_ERROR.getErrorCode()));
- assertFalse(response.getMetadata().containsKey("table"));
+
assertTrue(response.getExceptions().containsKey(QueryException.SERVER_SCHEDULER_DOWN_ERROR.getErrorCode()));
+ assertFalse(response.getMetadata().containsKey(TABLE.getName()));
scheduler.stop();
}
@@ -236,6 +235,13 @@ public class PrioritySchedulerTest {
static TestSchedulerGroupFactory groupFactory;
static LongAccumulator latestQueryTime;
+ // store locally for easy access
+ public TestPriorityScheduler(PinotConfiguration config, ResourceManager
resourceManager,
+ QueryExecutor queryExecutor, SchedulerPriorityQueue queue,
ServerMetrics metrics,
+ LongAccumulator latestQueryTime) {
+ super(config, resourceManager, queryExecutor, queue, metrics,
latestQueryTime);
+ }
+
public static TestPriorityScheduler create(PinotConfiguration config) {
ResourceManager rm = new PolicyBasedResourceManager(config);
QueryExecutor qe = new TestQueryExecutor();
@@ -250,13 +256,6 @@ public class PrioritySchedulerTest {
return create(new PinotConfiguration());
}
- // store locally for easy access
- public TestPriorityScheduler(PinotConfiguration config, ResourceManager
resourceManager,
- QueryExecutor queryExecutor, SchedulerPriorityQueue queue,
ServerMetrics metrics,
- LongAccumulator latestQueryTime) {
- super(config, resourceManager, queryExecutor, queue, metrics,
latestQueryTime);
- }
-
ResourceManager getResourceManager() {
return this.resourceManager;
}
@@ -307,8 +306,8 @@ public class PrioritySchedulerTest {
throw new RuntimeException(e);
}
}
- DataTableImplV2 result = new DataTableImplV2();
- result.getMetadata().put("table", queryRequest.getTableNameWithType());
+ DataTable result = DataTableBuilder.getEmptyDataTable();
+ result.getMetadata().put(TABLE.getName(),
queryRequest.getTableNameWithType());
if (useBarrier) {
try {
validationBarrier.await();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index ccec5ec..c92cc73 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -26,7 +26,7 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableImplV2;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.pql.parsers.Pql2Compiler;
import org.apache.pinot.spi.config.table.TableType;
@@ -79,7 +79,7 @@ public class QueryRoutingTest {
public void testValidResponse()
throws Exception {
long requestId = 123;
- DataTable dataTable = new DataTableImplV2();
+ DataTable dataTable = DataTableBuilder.getEmptyDataTable();
dataTable.getMetadata().put(DataTable.REQUEST_ID_METADATA_KEY,
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
@@ -156,7 +156,7 @@ public class QueryRoutingTest {
public void testNonMatchingRequestId()
throws Exception {
long requestId = 123;
- DataTable dataTable = new DataTableImplV2();
+ DataTable dataTable = DataTableBuilder.getEmptyDataTable();
dataTable.getMetadata().put(DataTable.REQUEST_ID_METADATA_KEY,
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
@@ -186,7 +186,7 @@ public class QueryRoutingTest {
public void testServerDown()
throws Exception {
long requestId = 123;
- DataTable dataTable = new DataTableImplV2();
+ DataTable dataTable = DataTableBuilder.getEmptyDataTable();
dataTable.getMetadata().put(DataTable.REQUEST_ID_METADATA_KEY,
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index b2c467a..6b1b9e2 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -65,6 +65,7 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static
org.apache.pinot.common.utils.DataTable.MetadataKey.THREAD_CPU_TIME_NS;
import static org.testng.Assert.*;
@@ -1556,7 +1557,9 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
String responseType =
streamingResponse.getMetadataMap().get(CommonConstants.Query.Response.MetadataKeys.RESPONSE_TYPE);
if
(responseType.equals(CommonConstants.Query.Response.ResponseType.DATA)) {
- assertTrue(dataTable.getMetadata().isEmpty());
+ // verify the returned data table metadata only contains
"threadCpuTimeNs".
+ Map<String, String> metadata = dataTable.getMetadata();
+ assertTrue(metadata.size() == 1 &&
metadata.containsKey(THREAD_CPU_TIME_NS.getName()));
assertNotNull(dataTable.getDataSchema());
numTotalDocs += dataTable.getNumberOfRows();
} else {
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index 2d9758b..c1805f1 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -59,6 +59,7 @@ import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.ServiceStatus.Status;
import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.query.request.context.ThreadTimer;
import
org.apache.pinot.core.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState;
@@ -146,6 +147,11 @@ public class HelixServerStarter implements
ServiceStartable {
ThreadTimer.setThreadCpuTimeMeasurementEnabled(_serverConf
.getProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT,
Server.DEFAULT_ENABLE_THREAD_CPU_TIME_MEASUREMENT));
+
+ // Set the data table version sent to the broker.
+ DataTableBuilder.setCurrentDataTableVersion(_serverConf
+ .getProperty(Server.CONFIG_OF_CURRENT_DATA_TABLE_VERSION,
+ Server.DEFAULT_CURRENT_DATA_TABLE_VERSION));
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]