This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0bb9b5f27f use single string dict and eliminate the integer key field
(#8894)
0bb9b5f27f is described below
commit 0bb9b5f27f5e66807f835bf172c5a2bfe1334cb8
Author: Rong Rong <[email protected]>
AuthorDate: Sat Jun 18 17:47:19 2022 -0700
use single string dict and eliminate the integer key field (#8894)
---
.../connector/spark/connector/PinotUtilsTest.scala | 8 +-
.../apache/pinot/core/common/NullBitmapUtils.java | 49 ---
.../pinot/core/common/datablock/BaseDataBlock.java | 78 ++---
.../core/common/datablock/ColumnarDataBlock.java | 7 +-
.../core/common/datablock/DataBlockBuilder.java | 90 +++---
.../pinot/core/common/datablock/MetadataBlock.java | 3 +-
.../pinot/core/common/datablock/RowDataBlock.java | 7 +-
.../common/datatable/BaseDataTableBuilder.java | 162 ++++++++++
.../core/common/datatable/DataTableBuilder.java | 333 +++------------------
.../common/datatable/DataTableBuilderV2V3.java | 118 ++++++++
.../core/common/datatable/DataTableBuilderV4.java | 90 ++++++
.../core/common/datatable/DataTableFactory.java | 55 +++-
.../core/common/datatable/DataTableImplV2.java | 4 +-
.../core/common/datatable/DataTableImplV3.java | 4 +-
.../core/common/datatable/DataTableImplV4.java | 9 +-
.../core/common/datatable/DataTableUtils.java | 56 +---
.../StreamingInstanceResponseOperator.java | 4 +-
.../operator/blocks/IntermediateResultsBlock.java | 8 +-
.../pinot/core/query/distinct/DistinctTable.java | 3 +-
.../query/executor/ServerQueryExecutorV1Impl.java | 11 +-
.../pinot/core/query/scheduler/QueryScheduler.java | 6 +-
.../query/selection/SelectionOperatorUtils.java | 4 +-
.../core/transport/InstanceRequestHandler.java | 10 +-
.../pinot/core/common/datablock/DataBlockTest.java | 4 +-
.../core/common/datatable/DataTableSerDeTest.java | 44 +--
.../core/query/reduce/BrokerReduceServiceTest.java | 3 +-
.../query/scheduler/PrioritySchedulerTest.java | 3 +-
.../pinot/core/transport/QueryRoutingTest.java | 8 +-
.../tests/MultiStageEngineIntegrationTest.java | 6 +-
.../pinot/query/runtime/QueryRunnerTest.java | 3 +
.../server/starter/helix/BaseServerStarter.java | 4 +-
31 files changed, 627 insertions(+), 567 deletions(-)
diff --git
a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotUtilsTest.scala
b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotUtilsTest.scala
index 24abdb367c..4929ebc122 100644
---
a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotUtilsTest.scala
+++
b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotUtilsTest.scala
@@ -18,12 +18,12 @@
*/
package org.apache.pinot.connector.spark.connector
-import org.apache.pinot.connector.spark.connector.PinotUtils._
import org.apache.pinot.common.utils.DataSchema
import org.apache.pinot.common.utils.DataSchema.ColumnDataType
import org.apache.pinot.connector.spark.BaseTest
+import org.apache.pinot.connector.spark.connector.PinotUtils._
import org.apache.pinot.connector.spark.exceptions.PinotException
-import org.apache.pinot.core.common.datatable.DataTableBuilder
+import org.apache.pinot.core.common.datatable.DataTableFactory
import org.apache.pinot.spi.data.Schema
import org.apache.pinot.spi.utils.ByteArray
import org.apache.spark.sql.catalyst.util.ArrayData
@@ -74,7 +74,7 @@ class PinotUtilsTest extends BaseTest {
)
val dataSchema = new DataSchema(columnNames, columnTypes)
- val dataTableBuilder = new DataTableBuilder(dataSchema)
+ val dataTableBuilder = DataTableFactory.getDataTableBuilder(dataSchema)
dataTableBuilder.startRow()
dataTableBuilder.setColumn(0, "strValueDim")
dataTableBuilder.setColumn(1, 5)
@@ -140,7 +140,7 @@ class PinotUtilsTest extends BaseTest {
val columnTypes = Array(ColumnDataType.STRING, ColumnDataType.INT)
val dataSchema = new DataSchema(columnNames, columnTypes)
- val dataTableBuilder = new DataTableBuilder(dataSchema)
+ val dataTableBuilder = DataTableFactory.getDataTableBuilder(dataSchema)
dataTableBuilder.startRow()
dataTableBuilder.setColumn(0, "strValueDim")
dataTableBuilder.setColumn(1, 5)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/NullBitmapUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/NullBitmapUtils.java
deleted file mode 100644
index f157d3274e..0000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/NullBitmapUtils.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.common;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import org.roaringbitmap.RoaringBitmap;
-
-
-public final class NullBitmapUtils {
- private NullBitmapUtils() {
- }
-
- public static void setNullRowIds(RoaringBitmap nullBitmap,
ByteArrayOutputStream fixedSizeByteArrayOutputStream,
- ByteArrayOutputStream variableSizeDataByteArrayOutputStream)
- throws IOException {
- writeInt(fixedSizeByteArrayOutputStream,
variableSizeDataByteArrayOutputStream.size());
- if (nullBitmap == null || nullBitmap.isEmpty()) {
- writeInt(fixedSizeByteArrayOutputStream, 0);
- } else {
- byte[] nullBitmapBytes =
ObjectSerDeUtils.ROARING_BITMAP_SER_DE.serialize(nullBitmap);
- writeInt(fixedSizeByteArrayOutputStream, nullBitmapBytes.length);
- variableSizeDataByteArrayOutputStream.write(nullBitmapBytes);
- }
- }
-
- private static void writeInt(ByteArrayOutputStream out, int value) {
- out.write((value >>> 24) & 0xFF);
- out.write((value >>> 16) & 0xFF);
- out.write((value >>> 8) & 0xFF);
- out.write(value & 0xFF);
- }
-}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
index 04a0ad1ded..dff60efac9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/BaseDataBlock.java
@@ -85,7 +85,7 @@ public abstract class BaseDataBlock implements DataTable {
protected int _numRows;
protected int _numColumns;
protected DataSchema _dataSchema;
- protected Map<String, Map<Integer, String>> _dictionaryMap;
+ protected String[] _stringDictionary;
protected byte[] _fixedSizeDataBytes;
protected ByteBuffer _fixedSizeData;
protected byte[] _variableSizeDataBytes;
@@ -96,16 +96,16 @@ public abstract class BaseDataBlock implements DataTable {
* construct a base data block.
* @param numRows num of rows in the block
* @param dataSchema schema of the data in the block
- * @param dictionaryMap dictionary encoding map
+ * @param stringDictionary dictionary encoding map
* @param fixedSizeDataBytes byte[] for fix-sized columns.
* @param variableSizeDataBytes byte[] for variable length columns (arrays).
*/
- public BaseDataBlock(int numRows, DataSchema dataSchema, Map<String,
Map<Integer, String>> dictionaryMap,
+ public BaseDataBlock(int numRows, DataSchema dataSchema, String[]
stringDictionary,
byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
_numRows = numRows;
_numColumns = dataSchema.size();
_dataSchema = dataSchema;
- _dictionaryMap = dictionaryMap;
+ _stringDictionary = stringDictionary;
_fixedSizeDataBytes = fixedSizeDataBytes;
_fixedSizeData = ByteBuffer.wrap(fixedSizeDataBytes);
_variableSizeDataBytes = variableSizeDataBytes;
@@ -121,7 +121,7 @@ public abstract class BaseDataBlock implements DataTable {
_numRows = 0;
_numColumns = 0;
_dataSchema = null;
- _dictionaryMap = null;
+ _stringDictionary = null;
_fixedSizeDataBytes = null;
_fixedSizeData = null;
_variableSizeDataBytes = null;
@@ -158,9 +158,9 @@ public abstract class BaseDataBlock implements DataTable {
// Read dictionary.
if (dictionaryMapLength != 0) {
byteBuffer.position(dictionaryMapStart);
- _dictionaryMap = deserializeDictionaryMap(byteBuffer);
+ _stringDictionary = deserializeStringDictionary(byteBuffer);
} else {
- _dictionaryMap = null;
+ _stringDictionary = null;
}
// Read data schema.
@@ -282,8 +282,7 @@ public abstract class BaseDataBlock implements DataTable {
@Override
public String getString(int rowId, int colId) {
positionCursorInFixSizedBuffer(rowId, colId);
- int dictId = _fixedSizeData.getInt();
- return _dictionaryMap.get(_dataSchema.getColumnName(colId)).get(dictId);
+ return _stringDictionary[_fixedSizeData.getInt()];
}
@Override
@@ -355,9 +354,8 @@ public abstract class BaseDataBlock implements DataTable {
public String[] getStringArray(int rowId, int colId) {
int length = positionCursorInVariableBuffer(rowId, colId);
String[] strings = new String[length];
- Map<Integer, String> dictionary =
_dictionaryMap.get(_dataSchema.getColumnName(colId));
for (int i = 0; i < length; i++) {
- strings[i] = dictionary.get(_variableSizeData.getInt());
+ strings[i] = _stringDictionary[_variableSizeData.getInt()];
}
return strings;
}
@@ -369,26 +367,16 @@ public abstract class BaseDataBlock implements DataTable {
/**
* Helper method to serialize dictionary map.
*/
- protected byte[] serializeDictionaryMap()
+ protected byte[] serializeStringDictionary()
throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
- dataOutputStream.writeInt(_dictionaryMap.size());
- for (Map.Entry<String, Map<Integer, String>> dictionaryMapEntry :
_dictionaryMap.entrySet()) {
- String columnName = dictionaryMapEntry.getKey();
- Map<Integer, String> dictionary = dictionaryMapEntry.getValue();
- byte[] bytes = columnName.getBytes(UTF_8);
- dataOutputStream.writeInt(bytes.length);
- dataOutputStream.write(bytes);
- dataOutputStream.writeInt(dictionary.size());
-
- for (Map.Entry<Integer, String> dictionaryEntry : dictionary.entrySet())
{
- dataOutputStream.writeInt(dictionaryEntry.getKey());
- byte[] valueBytes = dictionaryEntry.getValue().getBytes(UTF_8);
- dataOutputStream.writeInt(valueBytes.length);
- dataOutputStream.write(valueBytes);
- }
+ dataOutputStream.writeInt(_stringDictionary.length);
+ for (String entry : _stringDictionary) {
+ byte[] valueBytes = entry.getBytes(UTF_8);
+ dataOutputStream.writeInt(valueBytes.length);
+ dataOutputStream.write(valueBytes);
}
return byteArrayOutputStream.toByteArray();
@@ -397,24 +385,14 @@ public abstract class BaseDataBlock implements DataTable {
/**
* Helper method to deserialize dictionary map.
*/
- protected Map<String, Map<Integer, String>>
deserializeDictionaryMap(ByteBuffer buffer)
+ protected String[] deserializeStringDictionary(ByteBuffer buffer)
throws IOException {
- int numDictionaries = buffer.getInt();
- Map<String, Map<Integer, String>> dictionaryMap = new
HashMap<>(numDictionaries);
-
- for (int i = 0; i < numDictionaries; i++) {
- String column = DataTableUtils.decodeString(buffer);
- int dictionarySize = buffer.getInt();
- Map<Integer, String> dictionary = new HashMap<>(dictionarySize);
- for (int j = 0; j < dictionarySize; j++) {
- int key = buffer.getInt();
- String value = DataTableUtils.decodeString(buffer);
- dictionary.put(key, value);
- }
- dictionaryMap.put(column, dictionary);
+ int dictionarySize = buffer.getInt();
+ String[] stringDictionary = new String[dictionarySize];
+ for (int i = 0; i < dictionarySize; i++) {
+ stringDictionary[i] = DataTableUtils.decodeString(buffer);
}
-
- return dictionaryMap;
+ return stringDictionary;
}
@Override
@@ -473,11 +451,11 @@ public abstract class BaseDataBlock implements DataTable {
// Write dictionary map section offset(START|SIZE).
dataOutputStream.writeInt(dataOffset);
- byte[] dictionaryMapBytes = null;
- if (_dictionaryMap != null) {
- dictionaryMapBytes = serializeDictionaryMap();
- dataOutputStream.writeInt(dictionaryMapBytes.length);
- dataOffset += dictionaryMapBytes.length;
+ byte[] dictionaryBytes = null;
+ if (_stringDictionary != null) {
+ dictionaryBytes = serializeStringDictionary();
+ dataOutputStream.writeInt(dictionaryBytes.length);
+ dataOffset += dictionaryBytes.length;
} else {
dataOutputStream.writeInt(0);
}
@@ -514,8 +492,8 @@ public abstract class BaseDataBlock implements DataTable {
// Write exceptions bytes.
dataOutputStream.write(exceptionsBytes);
// Write dictionary map bytes.
- if (dictionaryMapBytes != null) {
- dataOutputStream.write(dictionaryMapBytes);
+ if (dictionaryBytes != null) {
+ dataOutputStream.write(dictionaryBytes);
}
// Write data schema bytes.
if (dataSchemaBytes != null) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
index 1d2429f118..a83b8b2a30 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/ColumnarDataBlock.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.common.datablock;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Map;
import org.apache.pinot.common.utils.DataSchema;
@@ -36,9 +35,9 @@ public class ColumnarDataBlock extends BaseDataBlock {
super();
}
- public ColumnarDataBlock(int numRows, DataSchema dataSchema, Map<String,
Map<Integer, String>> dictionaryMap,
+ public ColumnarDataBlock(int numRows, DataSchema dataSchema, String[]
stringDictionary,
byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
- super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes,
variableSizeDataBytes);
+ super(numRows, dataSchema, stringDictionary, fixedSizeDataBytes,
variableSizeDataBytes);
computeBlockObjectConstants();
}
@@ -89,7 +88,7 @@ public class ColumnarDataBlock extends BaseDataBlock {
@Override
public ColumnarDataBlock toDataOnlyDataTable() {
- return new ColumnarDataBlock(_numRows, _dataSchema, _dictionaryMap,
_fixedSizeDataBytes, _variableSizeDataBytes);
+ return new ColumnarDataBlock(_numRows, _dataSchema, _stringDictionary,
_fixedSizeDataBytes, _variableSizeDataBytes);
}
// TODO: add whole-column access methods.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index 8a14b491e9..788b68a91d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -18,17 +18,16 @@
*/
package org.apache.pinot.core.common.datablock;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.NullBitmapUtils;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.spi.utils.ArrayCopyUtils;
import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -49,14 +48,13 @@ public class DataBlockBuilder {
private int _numRows;
private int _numColumns;
- private final Map<String, Map<String, Integer>> _dictionaryMap = new
HashMap<>();
- private final Map<String, Map<Integer, String>> _reverseDictionaryMap = new
HashMap<>();
+ private final Object2IntOpenHashMap<String> _dictionary = new
Object2IntOpenHashMap<>();
private final ByteArrayOutputStream _fixedSizeDataByteArrayOutputStream =
new ByteArrayOutputStream();
+ private final DataOutputStream _fixedSizeDataOutputStream = new
DataOutputStream(_fixedSizeDataByteArrayOutputStream);
private final ByteArrayOutputStream _variableSizeDataByteArrayOutputStream =
new ByteArrayOutputStream();
private final DataOutputStream _variableSizeDataOutputStream =
new DataOutputStream(_variableSizeDataByteArrayOutputStream);
-
private DataBlockBuilder(DataSchema dataSchema, BaseDataBlock.Type
blockType) {
_dataSchema = dataSchema;
_columnDataType = dataSchema.getStoredColumnDataTypes();
@@ -77,10 +75,16 @@ public class DataBlockBuilder {
}
}
- public void setNullRowIds(RoaringBitmap nullBitmap)
+ public void setNullRowIds(@Nullable RoaringBitmap nullRowIds)
throws IOException {
- NullBitmapUtils.setNullRowIds(nullBitmap,
_fixedSizeDataByteArrayOutputStream,
- _variableSizeDataByteArrayOutputStream);
+
_fixedSizeDataOutputStream.writeInt(_variableSizeDataByteArrayOutputStream.size());
+ if (nullRowIds == null || nullRowIds.isEmpty()) {
+ _fixedSizeDataOutputStream.writeInt(0);
+ } else {
+ byte[] bitmapBytes =
ObjectSerDeUtils.ROARING_BITMAP_SER_DE.serialize(nullRowIds);
+ _fixedSizeDataOutputStream.writeInt(bitmapBytes.length);
+ _variableSizeDataByteArrayOutputStream.write(bitmapBytes);
+ }
}
public static RowDataBlock buildFromRows(List<Object[]> rows, @Nullable
RoaringBitmap[] colNullBitmaps,
@@ -110,7 +114,7 @@ public class DataBlockBuilder {
setColumn(rowBuilder, byteBuffer, (BigDecimal) value);
break;
case STRING:
- setColumn(rowBuilder, byteBuffer, i, (String) value);
+ setColumn(rowBuilder, byteBuffer, (String) value);
break;
case BYTES:
setColumn(rowBuilder, byteBuffer, (ByteArray) value);
@@ -165,12 +169,12 @@ public class DataBlockBuilder {
break;
case BYTES_ARRAY:
case STRING_ARRAY:
- setColumn(rowBuilder, byteBuffer, i, (String[]) value);
+ setColumn(rowBuilder, byteBuffer, (String[]) value);
break;
default:
- throw new IllegalStateException(String.format(
- "Unsupported data type: %s for column: %s",
rowBuilder._columnDataType[i],
- rowBuilder._dataSchema.getColumnName(i)));
+ throw new IllegalStateException(
+ String.format("Unsupported data type: %s for column: %s",
rowBuilder._columnDataType[i],
+ rowBuilder._dataSchema.getColumnName(i)));
}
}
rowBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(),
0, byteBuffer.position());
@@ -220,7 +224,7 @@ public class DataBlockBuilder {
break;
case STRING:
for (Object value : column) {
- setColumn(columnarBuilder, byteBuffer, i, (String) value);
+ setColumn(columnarBuilder, byteBuffer, (String) value);
}
break;
case BYTES:
@@ -289,13 +293,13 @@ public class DataBlockBuilder {
case BYTES_ARRAY:
case STRING_ARRAY:
for (Object value : column) {
- setColumn(columnarBuilder, byteBuffer, i, (String[]) value);
+ setColumn(columnarBuilder, byteBuffer, (String[]) value);
}
break;
default:
- throw new IllegalStateException(String.format(
- "Unsupported data type: %s for column: %s",
columnarBuilder._columnDataType[i],
- columnarBuilder._dataSchema.getColumnName(i)));
+ throw new IllegalStateException(
+ String.format("Unsupported data type: %s for column: %s",
columnarBuilder._columnDataType[i],
+ columnarBuilder._dataSchema.getColumnName(i)));
}
columnarBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(),
0, byteBuffer.position());
}
@@ -303,17 +307,25 @@ public class DataBlockBuilder {
}
private static RowDataBlock buildRowBlock(DataBlockBuilder builder) {
- return new RowDataBlock(builder._numRows, builder._dataSchema,
builder._reverseDictionaryMap,
+ return new RowDataBlock(builder._numRows, builder._dataSchema,
getReverseDictionary(builder._dictionary),
builder._fixedSizeDataByteArrayOutputStream.toByteArray(),
builder._variableSizeDataByteArrayOutputStream.toByteArray());
}
private static ColumnarDataBlock buildColumnarBlock(DataBlockBuilder
builder) {
- return new ColumnarDataBlock(builder._numRows, builder._dataSchema,
builder._reverseDictionaryMap,
+ return new ColumnarDataBlock(builder._numRows, builder._dataSchema,
getReverseDictionary(builder._dictionary),
builder._fixedSizeDataByteArrayOutputStream.toByteArray(),
builder._variableSizeDataByteArrayOutputStream.toByteArray());
}
+ private static String[] getReverseDictionary(Object2IntOpenHashMap<String>
dictionary) {
+ String[] reverseDictionary = new String[dictionary.size()];
+ for (Object2IntMap.Entry<String> entry : dictionary.object2IntEntrySet()) {
+ reverseDictionary[entry.getIntValue()] = entry.getKey();
+ }
+ return reverseDictionary;
+ }
+
private static void setColumn(DataBlockBuilder builder, ByteBuffer
byteBuffer, BigDecimal value)
throws IOException {
byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
@@ -322,20 +334,9 @@ public class DataBlockBuilder {
builder._variableSizeDataByteArrayOutputStream.write(bytes);
}
- private static void setColumn(DataBlockBuilder builder, ByteBuffer
byteBuffer, int colId, String value) {
- String columnName = builder._dataSchema.getColumnName(colId);
- Map<String, Integer> dictionary = builder._dictionaryMap.get(columnName);
- if (dictionary == null) {
- dictionary = new HashMap<>();
- builder._dictionaryMap.put(columnName, dictionary);
- builder._reverseDictionaryMap.put(columnName, new HashMap<>());
- }
- Integer dictId = dictionary.get(value);
- if (dictId == null) {
- dictId = dictionary.size();
- dictionary.put(value, dictId);
- builder._reverseDictionaryMap.get(columnName).put(dictId, value);
- }
+ private static void setColumn(DataBlockBuilder builder, ByteBuffer
byteBuffer, String value) {
+ Object2IntOpenHashMap<String> dictionary = builder._dictionary;
+ int dictId = dictionary.computeIntIfAbsent(value, k -> dictionary.size());
byteBuffer.putInt(dictId);
}
@@ -398,26 +399,13 @@ public class DataBlockBuilder {
}
}
- private static void setColumn(DataBlockBuilder builder, ByteBuffer
byteBuffer, int colId, String[] values)
+ private static void setColumn(DataBlockBuilder builder, ByteBuffer
byteBuffer, String[] values)
throws IOException {
byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size());
byteBuffer.putInt(values.length);
-
- String columnName = builder._dataSchema.getColumnName(colId);
- Map<String, Integer> dictionary = builder._dictionaryMap.get(columnName);
- if (dictionary == null) {
- dictionary = new HashMap<>();
- builder._dictionaryMap.put(columnName, dictionary);
- builder._reverseDictionaryMap.put(columnName, new HashMap<>());
- }
-
+ Object2IntOpenHashMap<String> dictionary = builder._dictionary;
for (String value : values) {
- Integer dictId = dictionary.get(value);
- if (dictId == null) {
- dictId = dictionary.size();
- dictionary.put(value, dictId);
- builder._reverseDictionaryMap.get(columnName).put(dictId, value);
- }
+ int dictId = dictionary.computeIntIfAbsent(value, k ->
dictionary.size());
builder._variableSizeDataOutputStream.writeInt(dictId);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
index 06a637fc24..7ad53f6bf5 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/MetadataBlock.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.common.datablock;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collections;
import org.apache.pinot.common.utils.DataSchema;
@@ -31,7 +30,7 @@ public class MetadataBlock extends BaseDataBlock {
private static final int VERSION = 1;
public MetadataBlock(DataSchema dataSchema) {
- super(0, dataSchema, Collections.emptyMap(), new byte[]{0}, new byte[]{0});
+ super(0, dataSchema, new String[0], new byte[]{0}, new byte[]{0});
}
public MetadataBlock(ByteBuffer byteBuffer)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
index eec72546f0..7946cc428d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/RowDataBlock.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.common.datablock;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Map;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.roaringbitmap.RoaringBitmap;
@@ -38,9 +37,9 @@ public class RowDataBlock extends BaseDataBlock {
super();
}
- public RowDataBlock(int numRows, DataSchema dataSchema, Map<String,
Map<Integer, String>> dictionaryMap,
+ public RowDataBlock(int numRows, DataSchema dataSchema, String[]
stringDictionary,
byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
- super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes,
variableSizeDataBytes);
+ super(numRows, dataSchema, stringDictionary, fixedSizeDataBytes,
variableSizeDataBytes);
computeBlockObjectConstants();
}
@@ -108,7 +107,7 @@ public class RowDataBlock extends BaseDataBlock {
@Override
public RowDataBlock toDataOnlyDataTable() {
- return new RowDataBlock(_numRows, _dataSchema, _dictionaryMap,
_fixedSizeDataBytes, _variableSizeDataBytes);
+ return new RowDataBlock(_numRows, _dataSchema, _stringDictionary,
_fixedSizeDataBytes, _variableSizeDataBytes);
}
// TODO: add whole-row access methods.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
new file mode 100644
index 0000000000..a332adb381
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTableBuilder.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common.datatable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+
+
+/**
+ * Base DataTableBuilder implementation.
+ */
+public abstract class BaseDataTableBuilder implements DataTableBuilder {
+ protected final DataSchema _dataSchema;
+ protected final int _version;
+ protected final int[] _columnOffsets;
+ protected final int _rowSizeInBytes;
+ protected final ByteArrayOutputStream _fixedSizeDataByteArrayOutputStream =
new ByteArrayOutputStream();
+ protected final DataOutputStream _fixedSizeDataOutputStream =
+ new DataOutputStream(_fixedSizeDataByteArrayOutputStream);
+ protected final ByteArrayOutputStream _variableSizeDataByteArrayOutputStream
= new ByteArrayOutputStream();
+ protected final DataOutputStream _variableSizeDataOutputStream =
+ new DataOutputStream(_variableSizeDataByteArrayOutputStream);
+
+ protected int _numRows;
+ protected ByteBuffer _currentRowDataByteBuffer;
+
+ public BaseDataTableBuilder(DataSchema dataSchema, int version) {
+ _dataSchema = dataSchema;
+ _version = version;
+ _columnOffsets = new int[dataSchema.size()];
+ _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema,
_columnOffsets, _version);
+ }
+
+ @Override
+ public void startRow() {
+ _numRows++;
+ _currentRowDataByteBuffer = ByteBuffer.allocate(_rowSizeInBytes);
+ }
+
+ @Override
+ public void setColumn(int colId, int value) {
+ _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+ _currentRowDataByteBuffer.putInt(value);
+ }
+
+ @Override
+ public void setColumn(int colId, long value) {
+ _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+ _currentRowDataByteBuffer.putLong(value);
+ }
+
+ @Override
+ public void setColumn(int colId, float value) {
+ _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+ _currentRowDataByteBuffer.putFloat(value);
+ }
+
+ @Override
+ public void setColumn(int colId, double value) {
+ _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+ _currentRowDataByteBuffer.putDouble(value);
+ }
+
+ @Override
+ public void setColumn(int colId, BigDecimal value)
+ throws IOException {
+ _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+ byte[] bytes = BigDecimalUtils.serialize(value);
+ _currentRowDataByteBuffer.putInt(bytes.length);
+ _variableSizeDataByteArrayOutputStream.write(bytes);
+ }
+
+ @Override
+ public void setColumn(int colId, Object value)
+ throws IOException {
+ _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+ int objectTypeValue =
ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
+ if (objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue()) {
+ _currentRowDataByteBuffer.putInt(0);
+ _variableSizeDataOutputStream.writeInt(objectTypeValue);
+ } else {
+ byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
+ _currentRowDataByteBuffer.putInt(bytes.length);
+ _variableSizeDataOutputStream.writeInt(objectTypeValue);
+ _variableSizeDataByteArrayOutputStream.write(bytes);
+ }
+ }
+
+ @Override
+ public void setColumn(int colId, int[] values)
+ throws IOException {
+ _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+ _currentRowDataByteBuffer.putInt(values.length);
+ for (int value : values) {
+ _variableSizeDataOutputStream.writeInt(value);
+ }
+ }
+
+ @Override
+ public void setColumn(int colId, long[] values)
+ throws IOException {
+ _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+ _currentRowDataByteBuffer.putInt(values.length);
+ for (long value : values) {
+ _variableSizeDataOutputStream.writeLong(value);
+ }
+ }
+
+ @Override
+ public void setColumn(int colId, float[] values)
+ throws IOException {
+ _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+ _currentRowDataByteBuffer.putInt(values.length);
+ for (float value : values) {
+ _variableSizeDataOutputStream.writeFloat(value);
+ }
+ }
+
+ @Override
+ public void setColumn(int colId, double[] values)
+ throws IOException {
+ _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+ _currentRowDataByteBuffer.putInt(values.length);
+ for (double value : values) {
+ _variableSizeDataOutputStream.writeDouble(value);
+ }
+ }
+
+ @Override
+ public void finishRow()
+ throws IOException {
+
_fixedSizeDataByteArrayOutputStream.write(_currentRowDataByteBuffer.array());
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
index 941fabc2ee..b592c45c60 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilder.java
@@ -18,318 +18,79 @@
*/
package org.apache.pinot.core.common.datatable;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.pinot.common.utils.DataSchema;
+import javax.annotation.Nullable;
import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.NullBitmapUtils;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.spi.utils.BigDecimalUtils;
+import org.apache.pinot.spi.annotations.InterfaceAudience;
+import org.apache.pinot.spi.annotations.InterfaceStability;
import org.apache.pinot.spi.utils.ByteArray;
import org.roaringbitmap.RoaringBitmap;
/**
+ * DataTable holds data in a matrix form. The purpose of this interface is to
provide a way to construct a data table
+ * and ability to serialize and deserialize.
*
- * Datatable that holds data in a matrix form. The purpose of this class is to
- * provide a way to construct a datatable and ability to serialize and
- * deserialize.<br>
- * Why can't we use existing serialization/deserialization mechanism. Most
- * existing techniques protocol buffer, thrift, avro are optimized for
- * transporting a single record but Pinot transfers quite a lot of data from
- * server to broker during the scatter/gather operation. The cost of
- * serialization and deserialization directly impacts the performance. Most
- * ser/deser requires us to convert the primitives data types in objects like
- * Integer etc. This is waste of cpu resource and increase the payload size. We
- * optimize the data format for Pinot usecase. We can also support lazy
- * construction of obejcts. Infact we retain the bytes as it is and will be
able
- * to lookup the a field directly within a byte buffer.<br>
- *
- * USAGE:
- *
- * Datatable is initialized with the schema of the table. Schema describes the
- * columnnames, their order and data type for each column.<br>
- * Each row must follow the same convention. We don't support MultiValue
columns
- * for now. Format,
- * |VERSION,DATA_START_OFFSET,DICTIONARY_START_OFFSET,INDEX_START_OFFSET
- * ,METADATA_START_OFFSET | |<DATA> |
- *
- * |<DICTIONARY>|
- *
- *
- * |<METADATA>| Data contains the actual values written by the
application We
- * first write the entire data in its raw byte format. For example if you data
- * type is Int, it will write 4 bytes. For most data types that are fixed
width,
- * we just write the raw data. For special cases like String, we create a
- * dictionary. Dictionary will be never exposed to the user. All conversions
- * will be done internally. In future, we might decide dynamically if
dictionary
- * creation is needed, for now we will always create dictionaries for string
- * columns. During deserialization we will always load the dictionary
- * first.Overall having dictionary allow us to convert data table into a fixed
- * width matrix and thus allowing look up and easy traversal.
- *
- *
+ * <p>Why can't we use existing serialization/deserialization mechanism:
+ * <p>Most existing techniques (protocol buffer, thrift, avro) are optimized
for transporting a single record but Pinot
+ * transfers quite a lot of data from server to broker during the
scatter/gather operation. The cost of serialization
+ * and deserialization directly impacts the performance. Most ser/deser
requires us to convert the primitive data types
+ * into objects like Integer etc. This will waste cpu resource and increase
the payload size. We optimize the data
+ * format for Pinot use case. We can also support lazy construction of
objects. In fact we retain the bytes as it is and
+ * will be able to look up a field directly within a byte buffer.
*/
-// TODO: potential optimizations (DataTableV3 and below)
-// TODO: - Fix float size.
-// TODO: note: (Fixed in V4, remove this comment once V3 is deprecated)
-// TODO: - Given a data schema, write all values one by one instead of using
rowId and colId to position (save time).
-// TODO: note: (Fixed in V4, remove this comment once V3 is deprecated)
-// TODO: - Store bytes as variable size data instead of String
-// TODO: note: (Fixed in V4, remove this comment once V3 is deprecated)
-// TODO: - use builder factory pattern to for different version so that no
version check per build.
-// TODO: - Use one dictionary for all columns (save space).
-
-public class DataTableBuilder {
- public static final int VERSION_2 = 2;
- public static final int VERSION_3 = 3;
- public static final int VERSION_4 = 4;
- private static int _version = VERSION_3;
- private final DataSchema _dataSchema;
- private final int[] _columnOffsets;
- private final int _rowSizeInBytes;
- private final Map<String, Map<String, Integer>> _dictionaryMap = new
HashMap<>();
- private final Map<String, Map<Integer, String>> _reverseDictionaryMap = new
HashMap<>();
- private final ByteArrayOutputStream _fixedSizeDataByteArrayOutputStream =
new ByteArrayOutputStream();
- private final ByteArrayOutputStream _variableSizeDataByteArrayOutputStream =
new ByteArrayOutputStream();
- private final DataOutputStream _variableSizeDataOutputStream =
- new DataOutputStream(_variableSizeDataByteArrayOutputStream);
-
- private int _numRows;
- private ByteBuffer _currentRowDataByteBuffer;
-
- public DataTableBuilder(DataSchema dataSchema) {
- _dataSchema = dataSchema;
- _columnOffsets = new int[dataSchema.size()];
- _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema,
_columnOffsets, _version);
- }
-
- public static DataTable getEmptyDataTable() {
- switch (_version) {
- case VERSION_2:
- return new DataTableImplV2();
- case VERSION_3:
- return new DataTableImplV3();
- case VERSION_4:
- return new DataTableImplV4();
- default:
- throw new IllegalStateException("Unexpected value: " + _version);
- }
- }
-
- public static void setCurrentDataTableVersion(int version) {
- if (version != VERSION_2 && version != VERSION_3 && version != VERSION_4) {
- throw new IllegalArgumentException("Unsupported version: " + version);
- }
- _version = version;
- }
-
- public static int getCurrentDataTableVersion() {
- return _version;
- }
-
- public void startRow() {
- _numRows++;
- _currentRowDataByteBuffer = ByteBuffer.allocate(_rowSizeInBytes);
- }
-
- public void setNullRowIds(RoaringBitmap nullBitmap)
- throws IOException {
- assert _version >= VERSION_4;
- NullBitmapUtils.setNullRowIds(nullBitmap,
_fixedSizeDataByteArrayOutputStream,
- _variableSizeDataByteArrayOutputStream);
- }
-
- public void setColumn(int colId, boolean value) {
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
- if (value) {
- _currentRowDataByteBuffer.put((byte) 1);
- } else {
- _currentRowDataByteBuffer.put((byte) 0);
- }
- }
-
- public void setColumn(int colId, byte value) {
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
- _currentRowDataByteBuffer.put(value);
- }
-
- public void setColumn(int colId, char value) {
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
- _currentRowDataByteBuffer.putChar(value);
- }
-
- public void setColumn(int colId, short value) {
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
- _currentRowDataByteBuffer.putShort(value);
- }
[email protected]
[email protected]
+public interface DataTableBuilder {
- public void setColumn(int colId, int value) {
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
- _currentRowDataByteBuffer.putInt(value);
- }
+ void startRow();
- public void setColumn(int colId, long value) {
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
- _currentRowDataByteBuffer.putLong(value);
- }
+ void setColumn(int colId, int value);
- public void setColumn(int colId, float value) {
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
- _currentRowDataByteBuffer.putFloat(value);
- }
+ void setColumn(int colId, long value);
- public void setColumn(int colId, double value) {
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
- _currentRowDataByteBuffer.putDouble(value);
- }
+ void setColumn(int colId, float value);
- public void setColumn(int colId, BigDecimal value)
- throws IOException {
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
- byte[] bytes = BigDecimalUtils.serialize(value);
- _currentRowDataByteBuffer.putInt(bytes.length);
- _variableSizeDataByteArrayOutputStream.write(bytes);
- }
+ void setColumn(int colId, double value);
- public void setColumn(int colId, String value) {
- String columnName = _dataSchema.getColumnName(colId);
- Map<String, Integer> dictionary = _dictionaryMap.get(columnName);
- if (dictionary == null) {
- dictionary = new HashMap<>();
- _dictionaryMap.put(columnName, dictionary);
- _reverseDictionaryMap.put(columnName, new HashMap<>());
- }
+ void setColumn(int colId, BigDecimal value)
+ throws IOException;
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
- Integer dictId = dictionary.get(value);
- if (dictId == null) {
- dictId = dictionary.size();
- dictionary.put(value, dictId);
- _reverseDictionaryMap.get(columnName).put(dictId, value);
- }
- _currentRowDataByteBuffer.putInt(dictId);
- }
+ void setColumn(int colId, String value);
- public void setColumn(int colId, ByteArray value)
- throws IOException {
- if (_version >= 4) {
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
- byte[] bytes = value.getBytes();
- _currentRowDataByteBuffer.putInt(bytes.length);
- _variableSizeDataByteArrayOutputStream.write(bytes);
- } else {
- // NOTE: Use String to store bytes value in DataTable V2 for
backward-compatibility
- setColumn(colId, value.toHexString());
- }
- }
+ void setColumn(int colId, ByteArray value)
+ throws IOException;
- public void setColumn(int colId, Object value)
- throws IOException {
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
- int objectTypeValue =
ObjectSerDeUtils.ObjectType.getObjectType(value).getValue();
- if (objectTypeValue == ObjectSerDeUtils.ObjectType.Null.getValue()) {
- _currentRowDataByteBuffer.putInt(0);
- _variableSizeDataOutputStream.writeInt(objectTypeValue);
- } else {
- byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue);
- _currentRowDataByteBuffer.putInt(bytes.length);
- _variableSizeDataOutputStream.writeInt(objectTypeValue);
- _variableSizeDataByteArrayOutputStream.write(bytes);
- }
- }
+ void setColumn(int colId, Object value)
+ throws IOException;
- public void setColumn(int colId, int[] values)
- throws IOException {
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
- _currentRowDataByteBuffer.putInt(values.length);
- for (int value : values) {
- _variableSizeDataOutputStream.writeInt(value);
- }
- }
+ void setColumn(int colId, int[] values)
+ throws IOException;
- public void setColumn(int colId, long[] values)
- throws IOException {
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
- _currentRowDataByteBuffer.putInt(values.length);
- for (long value : values) {
- _variableSizeDataOutputStream.writeLong(value);
- }
- }
+ void setColumn(int colId, long[] values)
+ throws IOException;
- public void setColumn(int colId, float[] values)
- throws IOException {
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
- _currentRowDataByteBuffer.putInt(values.length);
- for (float value : values) {
- _variableSizeDataOutputStream.writeFloat(value);
- }
- }
+ void setColumn(int colId, float[] values)
+ throws IOException;
- public void setColumn(int colId, double[] values)
- throws IOException {
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
- _currentRowDataByteBuffer.putInt(values.length);
- for (double value : values) {
- _variableSizeDataOutputStream.writeDouble(value);
- }
- }
+ void setColumn(int colId, double[] values)
+ throws IOException;
- public void setColumn(int colId, String[] values)
- throws IOException {
- _currentRowDataByteBuffer.position(_columnOffsets[colId]);
-
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
- _currentRowDataByteBuffer.putInt(values.length);
+ void setColumn(int colId, String[] values)
+ throws IOException;
- String columnName = _dataSchema.getColumnName(colId);
- Map<String, Integer> dictionary = _dictionaryMap.get(columnName);
- if (dictionary == null) {
- dictionary = new HashMap<>();
- _dictionaryMap.put(columnName, dictionary);
- _reverseDictionaryMap.put(columnName, new HashMap<>());
- }
+ // TODO: Support MV BYTES
- for (String value : values) {
- Integer dictId = dictionary.get(value);
- if (dictId == null) {
- dictId = dictionary.size();
- dictionary.put(value, dictId);
- _reverseDictionaryMap.get(columnName).put(dictId, value);
- }
- _variableSizeDataOutputStream.writeInt(dictId);
- }
- }
+ void finishRow()
+ throws IOException;
- public void finishRow()
- throws IOException {
-
_fixedSizeDataByteArrayOutputStream.write(_currentRowDataByteBuffer.array());
- }
+ /**
+ * NOTE: When setting nullRowIds, we don't pass the colId currently, and
this method must be invoked for all columns.
+ * TODO: Revisit this
+ */
+ void setNullRowIds(@Nullable RoaringBitmap nullRowIds)
+ throws IOException;
- public DataTable build() {
- switch (_version) {
- case VERSION_2:
- return new DataTableImplV2(_numRows, _dataSchema,
_reverseDictionaryMap,
- _fixedSizeDataByteArrayOutputStream.toByteArray(),
_variableSizeDataByteArrayOutputStream.toByteArray());
- case VERSION_3:
- return new DataTableImplV3(_numRows, _dataSchema,
_reverseDictionaryMap,
- _fixedSizeDataByteArrayOutputStream.toByteArray(),
_variableSizeDataByteArrayOutputStream.toByteArray());
- case VERSION_4:
- return new DataTableImplV4(_numRows, _dataSchema,
_reverseDictionaryMap,
- _fixedSizeDataByteArrayOutputStream.toByteArray(),
_variableSizeDataByteArrayOutputStream.toByteArray());
- default:
- throw new IllegalStateException("Unexpected value: " + _version);
- }
- }
+ DataTable build();
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java
new file mode 100644
index 0000000000..ea282bb559
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV2V3.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common.datatable;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Kept for backward compatibility. Things improved in the newer versions:
+ * - Float size (should be 4 instead of 8)
+ * - Store bytes as variable size data instead of String
+ * - Use one dictionary for all columns (save space)
+ * - Support setting nullRowIds
+ */
+public class DataTableBuilderV2V3 extends BaseDataTableBuilder {
+ private final Map<String, Map<String, Integer>> _dictionaryMap = new
HashMap<>();
+ private final Map<String, Map<Integer, String>> _reverseDictionaryMap = new
HashMap<>();
+
+ public DataTableBuilderV2V3(DataSchema dataSchema, int version) {
+ super(dataSchema, version);
+ Preconditions.checkArgument(version <= DataTableFactory.VERSION_3);
+ }
+
+ @Override
+ public void setColumn(int colId, String value) {
+ String columnName = _dataSchema.getColumnName(colId);
+ Map<String, Integer> dictionary = _dictionaryMap.get(columnName);
+ if (dictionary == null) {
+ dictionary = new HashMap<>();
+ _dictionaryMap.put(columnName, dictionary);
+ _reverseDictionaryMap.put(columnName, new HashMap<>());
+ }
+
+ _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+ Integer dictId = dictionary.get(value);
+ if (dictId == null) {
+ dictId = dictionary.size();
+ dictionary.put(value, dictId);
+ _reverseDictionaryMap.get(columnName).put(dictId, value);
+ }
+ _currentRowDataByteBuffer.putInt(dictId);
+ }
+
+ @Override
+ public void setColumn(int colId, ByteArray value)
+ throws IOException {
+ setColumn(colId, value.toHexString());
+ }
+
+ @Override
+ public void setColumn(int colId, String[] values)
+ throws IOException {
+ _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+ _currentRowDataByteBuffer.putInt(values.length);
+
+ String columnName = _dataSchema.getColumnName(colId);
+ Map<String, Integer> dictionary = _dictionaryMap.get(columnName);
+ if (dictionary == null) {
+ dictionary = new HashMap<>();
+ _dictionaryMap.put(columnName, dictionary);
+ _reverseDictionaryMap.put(columnName, new HashMap<>());
+ }
+
+ for (String value : values) {
+ Integer dictId = dictionary.get(value);
+ if (dictId == null) {
+ dictId = dictionary.size();
+ dictionary.put(value, dictId);
+ _reverseDictionaryMap.get(columnName).put(dictId, value);
+ }
+ _variableSizeDataOutputStream.writeInt(dictId);
+ }
+ }
+
+ @Override
+ public void setNullRowIds(@Nullable RoaringBitmap nullRowIds)
+ throws IOException {
+ throw new UnsupportedOperationException("Not supported before DataTable
V4");
+ }
+
+ @Override
+ public DataTable build() {
+ byte[] fixedSizeDataBytes =
_fixedSizeDataByteArrayOutputStream.toByteArray();
+ byte[] variableSizeDataBytes =
_variableSizeDataByteArrayOutputStream.toByteArray();
+ if (_version == DataTableFactory.VERSION_2) {
+ return new DataTableImplV2(_numRows, _dataSchema, _reverseDictionaryMap,
fixedSizeDataBytes,
+ variableSizeDataBytes);
+ } else {
+ return new DataTableImplV3(_numRows, _dataSchema, _reverseDictionaryMap,
fixedSizeDataBytes,
+ variableSizeDataBytes);
+ }
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV4.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV4.java
new file mode 100644
index 0000000000..8512024865
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderV4.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common.datatable;
+
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.RoaringBitmap;
+
+
+public class DataTableBuilderV4 extends BaseDataTableBuilder {
+ private final Object2IntOpenHashMap<String> _dictionary = new
Object2IntOpenHashMap<>();
+
+ public DataTableBuilderV4(DataSchema dataSchema) {
+ super(dataSchema, DataTableFactory.VERSION_4);
+ }
+
+ @Override
+ public void setColumn(int colId, String value) {
+ _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+ int dictId = _dictionary.computeIntIfAbsent(value, k ->
_dictionary.size());
+ _currentRowDataByteBuffer.putInt(dictId);
+ }
+
+ @Override
+ public void setColumn(int colId, ByteArray value)
+ throws IOException {
+ _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+ byte[] bytes = value.getBytes();
+ _currentRowDataByteBuffer.putInt(bytes.length);
+ _variableSizeDataByteArrayOutputStream.write(bytes);
+ }
+
+ @Override
+ public void setColumn(int colId, String[] values)
+ throws IOException {
+ _currentRowDataByteBuffer.position(_columnOffsets[colId]);
+
_currentRowDataByteBuffer.putInt(_variableSizeDataByteArrayOutputStream.size());
+ _currentRowDataByteBuffer.putInt(values.length);
+ for (String value : values) {
+ int dictId = _dictionary.computeIntIfAbsent(value, k ->
_dictionary.size());
+ _variableSizeDataOutputStream.writeInt(dictId);
+ }
+ }
+
+ @Override
+ public void setNullRowIds(@Nullable RoaringBitmap nullRowIds)
+ throws IOException {
+
_fixedSizeDataOutputStream.writeInt(_variableSizeDataByteArrayOutputStream.size());
+ if (nullRowIds == null || nullRowIds.isEmpty()) {
+ _fixedSizeDataOutputStream.writeInt(0);
+ } else {
+ byte[] bitmapBytes =
ObjectSerDeUtils.ROARING_BITMAP_SER_DE.serialize(nullRowIds);
+ _fixedSizeDataOutputStream.writeInt(bitmapBytes.length);
+ _variableSizeDataByteArrayOutputStream.write(bitmapBytes);
+ }
+ }
+
+ @Override
+ public DataTable build() {
+ String[] reverseDictionary = new String[_dictionary.size()];
+ for (Object2IntMap.Entry<String> entry : _dictionary.object2IntEntrySet())
{
+ reverseDictionary[entry.getIntValue()] = entry.getKey();
+ }
+ return new DataTableImplV4(_numRows, _dataSchema, reverseDictionary,
+ _fixedSizeDataByteArrayOutputStream.toByteArray(),
_variableSizeDataByteArrayOutputStream.toByteArray());
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
index 4a39f110d9..cbe64b4aef 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableFactory.java
@@ -20,22 +20,71 @@ package org.apache.pinot.core.common.datatable;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class DataTableFactory {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DataTableFactory.class);
+ public static final int VERSION_2 = 2;
+ public static final int VERSION_3 = 3;
+ public static final int VERSION_4 = 4;
+ public static final int DEFAULT_VERSION = VERSION_3;
+
+ private static int _version = DataTableFactory.DEFAULT_VERSION;
+
private DataTableFactory() {
}
+ public static int getDataTableVersion() {
+ return _version;
+ }
+
+ public static void setDataTableVersion(int version) {
+ LOGGER.info("Setting DataTable version to: " + version);
+ if (version != DataTableFactory.VERSION_2 && version !=
DataTableFactory.VERSION_3
+ && version != DataTableFactory.VERSION_4) {
+ throw new IllegalArgumentException("Unsupported version: " + version);
+ }
+ _version = version;
+ }
+
+ public static DataTable getEmptyDataTable() {
+ switch (_version) {
+ case VERSION_2:
+ return new DataTableImplV2();
+ case VERSION_3:
+ return new DataTableImplV3();
+ case VERSION_4:
+ return new DataTableImplV4();
+ default:
+ throw new IllegalStateException("Unexpected value: " + _version);
+ }
+ }
+
+ public static DataTableBuilder getDataTableBuilder(DataSchema dataSchema) {
+ switch (_version) {
+ case VERSION_2:
+ case VERSION_3:
+ return new DataTableBuilderV2V3(dataSchema, _version);
+ case VERSION_4:
+ return new DataTableBuilderV4(dataSchema);
+ default:
+ throw new UnsupportedOperationException("Unsupported data table
version: " + _version);
+ }
+ }
+
public static DataTable getDataTable(ByteBuffer byteBuffer)
throws IOException {
int version = byteBuffer.getInt();
switch (version) {
- case DataTableBuilder.VERSION_2:
+ case VERSION_2:
return new DataTableImplV2(byteBuffer);
- case DataTableBuilder.VERSION_3:
+ case VERSION_3:
return new DataTableImplV3(byteBuffer);
- case DataTableBuilder.VERSION_4:
+ case VERSION_4:
return new DataTableImplV4(byteBuffer);
default:
throw new UnsupportedOperationException("Unsupported data table
version: " + version);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
index af3665a48e..252d7303b2 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
@@ -124,7 +124,7 @@ public class DataTableImplV2 extends BaseDataTable {
@Override
public int getVersion() {
- return DataTableBuilder.VERSION_2;
+ return DataTableFactory.VERSION_2;
}
private Map<String, String> deserializeMetadata(ByteBuffer buffer)
@@ -170,7 +170,7 @@ public class DataTableImplV2 extends BaseDataTable {
throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
- dataOutputStream.writeInt(DataTableBuilder.VERSION_2);
+ dataOutputStream.writeInt(DataTableFactory.VERSION_2);
dataOutputStream.writeInt(_numRows);
dataOutputStream.writeInt(_numColumns);
int dataOffset = HEADER_SIZE;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
index edaf1d5479..433167808a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
@@ -166,7 +166,7 @@ public class DataTableImplV3 extends BaseDataTable {
@Override
public int getVersion() {
- return DataTableBuilder.VERSION_3;
+ return DataTableFactory.VERSION_3;
}
@Override
@@ -224,7 +224,7 @@ public class DataTableImplV3 extends BaseDataTable {
private void writeLeadingSections(DataOutputStream dataOutputStream)
throws IOException {
- dataOutputStream.writeInt(DataTableBuilder.VERSION_3);
+ dataOutputStream.writeInt(DataTableFactory.VERSION_3);
dataOutputStream.writeInt(_numRows);
dataOutputStream.writeInt(_numColumns);
int dataOffset = HEADER_SIZE;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
index fe32ac84a7..ba65453ec9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV4.java
@@ -21,7 +21,6 @@ package org.apache.pinot.core.common.datatable;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Map;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.datablock.RowDataBlock;
import org.apache.pinot.spi.annotations.InterfaceStability;
@@ -42,13 +41,13 @@ public class DataTableImplV4 extends RowDataBlock {
super(byteBuffer);
}
- public DataTableImplV4(int numRows, DataSchema dataSchema, Map<String,
Map<Integer, String>> dictionaryMap,
- byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) {
- super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes,
variableSizeDataBytes);
+ public DataTableImplV4(int numRows, DataSchema dataSchema, String[]
dictionary, byte[] fixedSizeDataBytes,
+ byte[] variableSizeDataBytes) {
+ super(numRows, dataSchema, dictionary, fixedSizeDataBytes,
variableSizeDataBytes);
}
@Override
protected int getDataBlockVersionType() {
- return DataTableBuilder.VERSION_4;
+ return DataTableFactory.VERSION_4;
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
index 1430869f95..a7b8652229 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
@@ -18,9 +18,6 @@
*/
package org.apache.pinot.core.common.datatable;
-import com.google.common.primitives.Ints;
-import com.google.common.primitives.Longs;
-import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -72,7 +69,7 @@ public class DataTableUtils {
rowSizeInBytes += 8;
break;
case FLOAT:
- if (dataTableVersion >= 4) {
+ if (dataTableVersion >= DataTableFactory.VERSION_4) {
rowSizeInBytes += 4;
} else {
rowSizeInBytes += 8;
@@ -123,7 +120,7 @@ public class DataTableUtils {
// NOTE: Use STRING column data type as default for selection query
Arrays.fill(columnDataTypes, ColumnDataType.STRING);
DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
- return new DataTableBuilder(dataSchema).build();
+ return DataTableFactory.getDataTableBuilder(dataSchema).build();
}
/**
@@ -154,7 +151,7 @@ public class DataTableUtils {
columnDataTypes[index] =
aggregationFunction.getIntermediateResultColumnType();
index++;
}
- return new DataTableBuilder(new DataSchema(columnNames,
columnDataTypes)).build();
+ return DataTableFactory.getDataTableBuilder(new DataSchema(columnNames,
columnDataTypes)).build();
} else {
// Aggregation only query
@@ -171,7 +168,8 @@ public class DataTableUtils {
}
// Build the data table
- DataTableBuilder dataTableBuilder = new DataTableBuilder(new
DataSchema(aggregationColumnNames, columnDataTypes));
+ DataTableBuilder dataTableBuilder =
+ DataTableFactory.getDataTableBuilder(new
DataSchema(aggregationColumnNames, columnDataTypes));
dataTableBuilder.startRow();
for (int i = 0; i < numAggregations; i++) {
switch (columnDataTypes[i]) {
@@ -214,7 +212,7 @@ public class DataTableUtils {
new DistinctTable(new DataSchema(columnNames, columnDataTypes),
Collections.emptySet());
// Build the data table
- DataTableBuilder dataTableBuilder = new DataTableBuilder(
+ DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(
new DataSchema(new
String[]{distinctAggregationFunction.getColumnName()},
new ColumnDataType[]{ColumnDataType.OBJECT}));
dataTableBuilder.startRow();
@@ -223,22 +221,6 @@ public class DataTableUtils {
return dataTableBuilder.build();
}
- /**
- * Helper method to decode string.
- */
- public static String decodeString(DataInputStream dataInputStream)
- throws IOException {
- int length = dataInputStream.readInt();
- if (length == 0) {
- return StringUtils.EMPTY;
- } else {
- byte[] buffer = new byte[length];
- int numBytesRead = dataInputStream.read(buffer);
- assert numBytesRead == length;
- return new String(buffer, UTF_8);
- }
- }
-
/**
* Helper method to decode string.
*/
@@ -246,35 +228,11 @@ public class DataTableUtils {
throws IOException {
int length = buffer.getInt();
if (length == 0) {
- return "";
+ return StringUtils.EMPTY;
} else {
byte[] bytes = new byte[length];
buffer.get(bytes);
return new String(bytes, UTF_8);
}
}
-
- /**
- * Helper method to decode int.
- */
- public static int decodeInt(DataInputStream dataInputStream)
- throws IOException {
- int length = Integer.BYTES;
- byte[] buffer = new byte[length];
- int numBytesRead = dataInputStream.read(buffer);
- assert numBytesRead == length;
- return Ints.fromByteArray(buffer);
- }
-
- /**
- * Helper method to decode long.
- */
- public static long decodeLong(DataInputStream dataInputStream)
- throws IOException {
- int length = Long.BYTES;
- byte[] buffer = new byte[length];
- int numBytesRead = dataInputStream.read(buffer);
- assert numBytesRead == length;
- return Longs.fromByteArray(buffer);
- }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java
index a2976eb2c1..0b00b90902 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java
@@ -24,7 +24,7 @@ import java.util.List;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.combine.BaseCombineOperator;
import org.apache.pinot.core.operator.streaming.StreamingResponseUtils;
@@ -53,7 +53,7 @@ public class StreamingInstanceResponseOperator extends
InstanceResponseOperator
instanceResponseDataTable.toDataOnlyDataTable()));
} catch (IOException e) {
// when exception occurs in streaming, we return an error-only metadata
block.
- metadataOnlyDataTable = DataTableBuilder.getEmptyDataTable();
+ metadataOnlyDataTable = DataTableFactory.getEmptyDataTable();
metadataOnlyDataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
}
// return a metadata-only block.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
index 13cd4151a8..967360453d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
@@ -38,6 +38,7 @@ import org.apache.pinot.core.common.BlockDocIdValueSet;
import org.apache.pinot.core.common.BlockMetadata;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.data.table.IntermediateRecord;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.data.table.Table;
@@ -308,7 +309,7 @@ public class IntermediateResultsBlock implements Block {
private DataTable getResultDataTable()
throws IOException {
- DataTableBuilder dataTableBuilder = new DataTableBuilder(_dataSchema);
+ DataTableBuilder dataTableBuilder =
DataTableFactory.getDataTableBuilder(_dataSchema);
ColumnDataType[] storedColumnDataTypes =
_dataSchema.getStoredColumnDataTypes();
Iterator<Record> iterator = _table.iterator();
while (iterator.hasNext()) {
@@ -391,7 +392,8 @@ public class IntermediateResultsBlock implements Block {
}
// Build the data table.
- DataTableBuilder dataTableBuilder = new DataTableBuilder(new
DataSchema(columnNames, columnDataTypes));
+ DataTableBuilder dataTableBuilder =
+ DataTableFactory.getDataTableBuilder(new DataSchema(columnNames,
columnDataTypes));
dataTableBuilder.startRow();
for (int i = 0; i < numAggregationFunctions; i++) {
switch (columnDataTypes[i]) {
@@ -416,7 +418,7 @@ public class IntermediateResultsBlock implements Block {
}
private DataTable getMetadataDataTable() {
- return attachMetadataToDataTable(DataTableBuilder.getEmptyDataTable());
+ return attachMetadataToDataTable(DataTableFactory.getEmptyDataTable());
}
private DataTable attachMetadataToDataTable(DataTable dataTable) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
index 371e1a0519..0fe04313cf 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
@@ -239,7 +239,8 @@ public class DistinctTable {
public byte[] toBytes()
throws IOException {
// NOTE: Serialize the DistinctTable as a DataTable
- DataTableBuilder dataTableBuilder = new DataTableBuilder(_dataSchema);
+ DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(
+ _dataSchema);
ColumnDataType[] storedColumnDataTypes =
_dataSchema.getStoredColumnDataTypes();
int numColumns = storedColumnDataTypes.length;
for (Record record : _records) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index e6eb711ab6..e0c086f5c3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -47,6 +47,7 @@ import org.apache.pinot.core.common.ExplainPlanRowData;
import org.apache.pinot.core.common.ExplainPlanRows;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.common.datatable.DataTableUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.operator.filter.EmptyFilterOperator;
@@ -168,7 +169,7 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
String errorMessage =
String.format("Query scheduling took %dms (longer than query timeout
of %dms) on server: %s",
querySchedulingTimeMs, queryTimeoutMs,
_instanceDataManager.getInstanceId());
- DataTable dataTable = DataTableBuilder.getEmptyDataTable();
+ DataTable dataTable = DataTableFactory.getEmptyDataTable();
dataTable.addException(QueryException.getException(QueryException.QUERY_SCHEDULING_TIMEOUT_ERROR,
errorMessage));
LOGGER.error("{} while processing requestId: {}", errorMessage,
requestId);
return dataTable;
@@ -178,7 +179,7 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
if (tableDataManager == null) {
String errorMessage = String.format("Failed to find table: %s on server:
%s", tableNameWithType,
_instanceDataManager.getInstanceId());
- DataTable dataTable = DataTableBuilder.getEmptyDataTable();
+ DataTable dataTable = DataTableFactory.getEmptyDataTable();
dataTable.addException(QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR,
errorMessage));
LOGGER.error("{} while processing requestId: {}", errorMessage,
requestId);
return dataTable;
@@ -226,7 +227,7 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
LOGGER.error("Exception processing requestId {}", requestId, e);
}
- dataTable = DataTableBuilder.getEmptyDataTable();
+ dataTable = DataTableFactory.getEmptyDataTable();
dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
} finally {
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
@@ -334,7 +335,7 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
/** @return EXPLAIN_PLAN query result {@link DataTable} when no segments get
selected for query execution.*/
private static DataTable getExplainPlanResultsForNoMatchingSegment(int
totalNumSegments) {
- DataTableBuilder dataTableBuilder = new
DataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
+ DataTableBuilder dataTableBuilder =
DataTableFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
try {
dataTableBuilder.startRow();
dataTableBuilder.setColumn(0,
String.format(ExplainPlanRows.PLAN_START_FORMAT,
@@ -440,7 +441,7 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
/** @return EXPLAIN PLAN query result {@link DataTable}. */
public static DataTable processExplainPlanQueries(Plan queryPlan) {
- DataTableBuilder dataTableBuilder = new
DataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
+ DataTableBuilder dataTableBuilder =
DataTableFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
List<Operator> childOperators =
queryPlan.getPlanNode().run().getChildOperators();
assert childOperators.size() == 1;
Operator root = childOperators.get(0);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 3009a96019..7ca4485ca4 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -38,7 +38,7 @@ import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.DataTable.MetadataKey;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.request.context.TimerContext;
@@ -154,7 +154,7 @@ public abstract class QueryScheduler {
queryRequest.getBrokerId(), e);
// For not handled exceptions
_serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
- dataTable = DataTableBuilder.getEmptyDataTable();
+ dataTable = DataTableFactory.getEmptyDataTable();
dataTable.addException(QueryException.getException(QueryException.INTERNAL_ERROR,
e));
}
long requestId = queryRequest.getRequestId();
@@ -320,7 +320,7 @@ public abstract class QueryScheduler {
*/
protected ListenableFuture<byte[]> immediateErrorResponse(ServerQueryRequest
queryRequest,
ProcessingException error) {
- DataTable result = DataTableBuilder.getEmptyDataTable();
+ DataTable result = DataTableFactory.getEmptyDataTable();
Map<String, String> dataTableMetadata = result.getMetadata();
dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(),
Long.toString(queryRequest.getRequestId()));
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 1b7cac34cd..a4ffd2a7f8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -37,6 +37,7 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.utils.ArrayCopyUtils;
@@ -234,7 +235,8 @@ public class SelectionOperatorUtils {
ColumnDataType[] storedColumnDataTypes =
dataSchema.getStoredColumnDataTypes();
int numColumns = storedColumnDataTypes.length;
- DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+ DataTableBuilder dataTableBuilder = DataTableFactory.getDataTableBuilder(
+ dataSchema);
for (Object[] row : rows) {
dataTableBuilder.startRow();
for (int i = 0; i < numColumns; i++) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index a93137698f..af841eea5b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -37,7 +37,7 @@ import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.DataTable.MetadataKey;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.server.access.AccessControl;
@@ -133,7 +133,7 @@ public class InstanceRequestHandler extends
SimpleChannelInboundHandler<ByteBuf>
String hexString = requestBytes != null ?
BytesUtils.toHexString(requestBytes) : "";
long reqestId = instanceRequest != null ? instanceRequest.getRequestId()
: 0;
LOGGER.error("Exception while processing instance request: {}",
hexString, e);
- sendErrorResponse(ctx, reqestId, tableNameWithType, queryArrivalTimeMs,
DataTableBuilder.getEmptyDataTable(), e);
+ sendErrorResponse(ctx, reqestId, tableNameWithType, queryArrivalTimeMs,
DataTableFactory.getEmptyDataTable(), e);
}
}
@@ -148,7 +148,7 @@ public class InstanceRequestHandler extends
SimpleChannelInboundHandler<ByteBuf>
} else {
// Send exception response.
sendErrorResponse(ctx, queryRequest.getRequestId(),
tableNameWithType, queryArrivalTimeMs,
- DataTableBuilder.getEmptyDataTable(), new Exception("Null query
response."));
+ DataTableFactory.getEmptyDataTable(), new Exception("Null query
response."));
}
}
@@ -157,7 +157,7 @@ public class InstanceRequestHandler extends
SimpleChannelInboundHandler<ByteBuf>
// Send exception response.
LOGGER.error("Exception while processing instance request", t);
sendErrorResponse(ctx, instanceRequest.getRequestId(),
tableNameWithType, queryArrivalTimeMs,
- DataTableBuilder.getEmptyDataTable(), new Exception(t));
+ DataTableFactory.getEmptyDataTable(), new Exception(t));
}
};
}
@@ -168,7 +168,7 @@ public class InstanceRequestHandler extends
SimpleChannelInboundHandler<ByteBuf>
// will only be called if for some remote reason we are unable to handle
exceptions in channelRead0.
String message = "Unhandled Exception in " + getClass().getCanonicalName();
LOGGER.error(message, cause);
- sendErrorResponse(ctx, 0, null, System.currentTimeMillis(),
DataTableBuilder.getEmptyDataTable(),
+ sendErrorResponse(ctx, 0, null, System.currentTimeMillis(),
DataTableFactory.getEmptyDataTable(),
new Exception(message, cause));
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
index d7ec9a7bfa..b9e816374c 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java
@@ -27,7 +27,7 @@ import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -81,7 +81,7 @@ public class DataBlockTest {
DataSchema dataSchema = new DataSchema(columnNames.toArray(new String[0]),
columnDataTypes.toArray(new DataSchema.ColumnDataType[0]));
List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema,
TEST_ROW_COUNT);
- DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_4);
+ DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
DataTable dataTableImpl =
SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema);
DataTable dataBlockFromDataTable =
DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTableImpl.toBytes()));
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index fbe9d98ed9..4bc037848c 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -106,7 +106,7 @@ public class DataTableSerDeTest {
QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
exception);
String expected = processingException.getMessage();
- DataTable dataTable = DataTableBuilder.getEmptyDataTable();
+ DataTable dataTable = DataTableFactory.getEmptyDataTable();
dataTable.addException(processingException);
DataTable newDataTable =
DataTableFactory.getDataTable(dataTable.toBytes());
Assert.assertNull(newDataTable.getDataSchema());
@@ -124,7 +124,7 @@ public class DataTableSerDeTest {
DataSchema dataSchema = new DataSchema(new String[]{"SV", "MV"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING,
DataSchema.ColumnDataType.STRING_ARRAY});
- DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+ DataTableBuilder dataTableBuilder =
DataTableFactory.getDataTableBuilder(dataSchema);
for (int rowId = 0; rowId < NUM_ROWS; rowId++) {
dataTableBuilder.startRow();
dataTableBuilder.setColumn(0, emptyString);
@@ -154,7 +154,7 @@ public class DataTableSerDeTest {
}
DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
- DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+ DataTableBuilder dataTableBuilder =
DataTableFactory.getDataTableBuilder(dataSchema);
fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns);
DataTable dataTable = dataTableBuilder.build();
@@ -181,8 +181,8 @@ public class DataTableSerDeTest {
// Verify V4 broker can deserialize data table (has data, but has no
metadata) send by V3 server
ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
- DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_3);
- DataTableBuilder dataTableBuilderV3WithDataOnly = new
DataTableBuilder(dataSchema);
+ DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_3);
+ DataTableBuilder dataTableBuilderV3WithDataOnly =
DataTableFactory.getDataTableBuilder(dataSchema);
fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly,
columnDataTypes, numColumns);
DataTable dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create
a V3 data table
@@ -204,7 +204,7 @@ public class DataTableSerDeTest {
Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
// Verify V4 broker can deserialize data table (only has metadata) send by
V3 server
- DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = new
DataTableBuilder(dataSchema);
+ DataTableBuilder dataTableBuilderV3WithMetadataDataOnly =
DataTableFactory.getDataTableBuilder(dataSchema);
dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a
V3 data table
for (String key : EXPECTED_METADATA.keySet()) {
dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
@@ -217,8 +217,8 @@ public class DataTableSerDeTest {
// Verify V4 broker can deserialize (has data, but has no metadata) send
by V4 server(with ThreadCpuTimeMeasurement
// disabled)
ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
- DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_4);
- DataTableBuilder dataTableBuilderV4WithDataOnly = new
DataTableBuilder(dataSchema);
+ DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
+ DataTableBuilder dataTableBuilderV4WithDataOnly =
DataTableFactory.getDataTableBuilder(dataSchema);
fillDataTableWithRandomData(dataTableBuilderV4WithDataOnly,
columnDataTypes, numColumns);
DataTable dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create
a V4 data table
// Deserialize data table bytes as V4
@@ -242,7 +242,7 @@ public class DataTableSerDeTest {
// Verify V4 broker can deserialize data table (only has metadata) send by
V4 server(with
// ThreadCpuTimeMeasurement disabled)
- DataTableBuilder dataTableBuilderV4WithMetadataDataOnly = new
DataTableBuilder(dataSchema);
+ DataTableBuilder dataTableBuilderV4WithMetadataDataOnly =
DataTableFactory.getDataTableBuilder(dataSchema);
dataTableV4 = dataTableBuilderV4WithMetadataDataOnly.build(); // create a
V4 data table
for (String key : EXPECTED_METADATA.keySet()) {
dataTableV4.getMetadata().put(key, EXPECTED_METADATA.get(key));
@@ -255,7 +255,7 @@ public class DataTableSerDeTest {
// Verify V4 broker can deserialize (has data, but has no metadata) send
by V4 server(with ThreadCpuTimeMeasurement
// enabled)
ThreadTimer.setThreadCpuTimeMeasurementEnabled(true);
- DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_4);
+ DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create a V4 data
table
// Deserialize data table bytes as V4
newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes());
@@ -311,8 +311,8 @@ public class DataTableSerDeTest {
// Verify V3 broker can deserialize data table (has data, but has no
metadata) send by V2 server
ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
- DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_2);
- DataTableBuilder dataTableBuilderV2WithDataOnly = new
DataTableBuilder(dataSchema);
+ DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_2);
+ DataTableBuilder dataTableBuilderV2WithDataOnly =
DataTableFactory.getDataTableBuilder(dataSchema);
fillDataTableWithRandomData(dataTableBuilderV2WithDataOnly,
columnDataTypes, numColumns);
DataTable dataTableV2 = dataTableBuilderV2WithDataOnly.build(); // create
a V2 data table
@@ -334,7 +334,7 @@ public class DataTableSerDeTest {
Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
// Verify V3 broker can deserialize data table (only has metadata) send by
V2 server
- DataTableBuilder dataTableBuilderV2WithMetadataDataOnly = new
DataTableBuilder(dataSchema);
+ DataTableBuilder dataTableBuilderV2WithMetadataDataOnly =
DataTableFactory.getDataTableBuilder(dataSchema);
dataTableV2 = dataTableBuilderV2WithMetadataDataOnly.build(); // create a
V2 data table
for (String key : EXPECTED_METADATA.keySet()) {
dataTableV2.getMetadata().put(key, EXPECTED_METADATA.get(key));
@@ -347,8 +347,8 @@ public class DataTableSerDeTest {
// Verify V3 broker can deserialize (has data, but has no metadata) send
by V3 server(with ThreadCpuTimeMeasurement
// disabled)
ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
- DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_3);
- DataTableBuilder dataTableBuilderV3WithDataOnly = new
DataTableBuilder(dataSchema);
+ DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_3);
+ DataTableBuilder dataTableBuilderV3WithDataOnly =
DataTableFactory.getDataTableBuilder(dataSchema);
fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly,
columnDataTypes, numColumns);
DataTable dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create
a V3 data table
// Deserialize data table bytes as V3
@@ -372,7 +372,7 @@ public class DataTableSerDeTest {
// Verify V3 broker can deserialize data table (only has metadata) send by
V3 server(with
// ThreadCpuTimeMeasurement disabled)
- DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = new
DataTableBuilder(dataSchema);
+ DataTableBuilder dataTableBuilderV3WithMetadataDataOnly =
DataTableFactory.getDataTableBuilder(dataSchema);
dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a
V3 data table
for (String key : EXPECTED_METADATA.keySet()) {
dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
@@ -385,7 +385,7 @@ public class DataTableSerDeTest {
// Verify V3 broker can deserialize (has data, but has no metadata) send
by V3 server(with ThreadCpuTimeMeasurement
// enabled)
ThreadTimer.setThreadCpuTimeMeasurementEnabled(true);
- DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_3);
+ DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_3);
dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create a V3 data
table
// Deserialize data table bytes as V3
newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes());
@@ -438,7 +438,7 @@ public class DataTableSerDeTest {
}
DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
- DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+ DataTableBuilder dataTableBuilder =
DataTableFactory.getDataTableBuilder(dataSchema);
fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns);
DataTable dataTable = dataTableBuilder.build();
@@ -474,8 +474,8 @@ public class DataTableSerDeTest {
ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
- DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_3);
- DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+ DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_3);
+ DataTableBuilder dataTableBuilder =
DataTableFactory.getDataTableBuilder(dataSchema);
fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns);
DataTable dataTable = dataTableBuilder.build();
@@ -486,7 +486,7 @@ public class DataTableSerDeTest {
ByteBuffer byteBuffer = ByteBuffer.wrap(dataTable.toBytes());
int version = byteBuffer.getInt();
- Assert.assertEquals(version, DataTableBuilder.VERSION_3);
+ Assert.assertEquals(version, DataTableFactory.VERSION_3);
byteBuffer.getInt(); // numOfRows
byteBuffer.getInt(); // numOfColumns
byteBuffer.getInt(); // exceptionsStart
@@ -542,7 +542,7 @@ public class DataTableSerDeTest {
DataSchema.ColumnDataType[] columnDataTypes, int numColumns)
throws IOException {
RoaringBitmap[] nullBitmaps = null;
- if (DataTableBuilder.getCurrentDataTableVersion() >=
DataTableBuilder.VERSION_4) {
+ if (DataTableFactory.getDataTableVersion() >= DataTableFactory.VERSION_4) {
nullBitmaps = new RoaringBitmap[numColumns];
for (int colId = 0; colId < numColumns; colId++) {
nullBitmaps[colId] = new RoaringBitmap();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
index 8f2be65532..2db3a4cc01 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -53,7 +54,7 @@ public class BrokerReduceServiceTest {
CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM
testTable GROUP BY col1");
DataSchema dataSchema =
new DataSchema(new String[]{"col1", "count(*)"}, new
ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG});
- DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+ DataTableBuilder dataTableBuilder =
DataTableFactory.getDataTableBuilder(dataSchema);
int numGroups = 5000;
for (int i = 0; i < numGroups; i++) {
dataTableBuilder.startRow();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
index bd2a1fb164..927b9cc7f6 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
@@ -43,7 +43,6 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.DataTable.MetadataKey;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.query.executor.QueryExecutor;
@@ -306,7 +305,7 @@ public class PrioritySchedulerTest {
throw new RuntimeException(e);
}
}
- DataTable result = DataTableBuilder.getEmptyDataTable();
+ DataTable result = DataTableFactory.getEmptyDataTable();
result.getMetadata().put(MetadataKey.TABLE.getName(),
queryRequest.getTableNameWithType());
if (_useBarrier) {
try {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 7639e50d30..01191ce79d 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -27,7 +27,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.DataTable.MetadataKey;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
@@ -81,7 +81,7 @@ public class QueryRoutingTest {
public void testValidResponse()
throws Exception {
long requestId = 123;
- DataTable dataTable = DataTableBuilder.getEmptyDataTable();
+ DataTable dataTable = DataTableFactory.getEmptyDataTable();
dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
@@ -159,7 +159,7 @@ public class QueryRoutingTest {
public void testNonMatchingRequestId()
throws Exception {
long requestId = 123;
- DataTable dataTable = DataTableBuilder.getEmptyDataTable();
+ DataTable dataTable = DataTableFactory.getEmptyDataTable();
dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
@@ -192,7 +192,7 @@ public class QueryRoutingTest {
// To avoid flakyness, set timeoutMs to 2000 msec. For some test runs, it
can take up to
// 1400 msec to mark request as failed.
long timeoutMs = 2000L;
- DataTable dataTable = DataTableBuilder.getEmptyDataTable();
+ DataTable dataTable = DataTableFactory.getEmptyDataTable();
dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 8bbf28e1ee..5d149de4ed 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -24,7 +24,7 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
@@ -82,7 +82,7 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTest
waitForAllDocsLoaded(600_000L);
// Setting data table version to 4
- DataTableBuilder.setCurrentDataTableVersion(4);
+ DataTableFactory.setDataTableVersion(4);
}
@Override
@@ -125,7 +125,7 @@ public class MultiStageEngineIntegrationTest extends
BaseClusterIntegrationTest
public void tearDown()
throws Exception {
// Setting data table version to 4
- DataTableBuilder.setCurrentDataTableVersion(3);
+ DataTableFactory.setDataTableVersion(3);
dropOfflineTable(DEFAULT_TABLE_NAME);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index f76917b766..4ef6b763f0 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.QueryEnvironmentTestUtils;
@@ -67,6 +68,7 @@ public class QueryRunnerTest {
@BeforeClass
public void setUp()
throws Exception {
+ DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
QueryServerEnclosure server1 = new
QueryServerEnclosure(Lists.newArrayList("a", "b", "c"),
ImmutableMap.of("a", INDEX_DIR_S1_A, "b", INDEX_DIR_S1_B, "c",
INDEX_DIR_S1_C),
QueryEnvironmentTestUtils.SERVER1_SEGMENTS);
@@ -95,6 +97,7 @@ public class QueryRunnerTest {
@AfterClass
public void tearDown() {
+ DataTableFactory.setDataTableVersion(DataTableFactory.DEFAULT_VERSION);
for (QueryServerEnclosure server : _servers.values()) {
server.shutDown();
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 4fa01c9110..8f00c56bfb 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -61,7 +61,7 @@ import org.apache.pinot.common.utils.TlsUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager;
import org.apache.pinot.core.query.request.context.ThreadTimer;
@@ -186,7 +186,7 @@ public abstract class BaseServerStarter implements
ServiceStartable {
throw new UnsupportedOperationException("Setting experimental DataTable
version newer than default via config "
+ "is not allowed. Current default DataTable version: " +
Server.DEFAULT_CURRENT_DATA_TABLE_VERSION);
}
- DataTableBuilder.setCurrentDataTableVersion(dataTableVersion);
+ DataTableFactory.setDataTableVersion(dataTableVersion);
LOGGER.info("Initializing Helix manager with zkAddress: {}, clusterName:
{}, instanceId: {}", _zkAddress,
_helixClusterName, _instanceId);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]