This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 95bda21 Remove all V2 metadata string keys, use V3 metadata enum keys
instead (#6742)
95bda21 is described below
commit 95bda214b81544e56cc5f313b034d667333c90bc
Author: Liang Mingqiang <[email protected]>
AuthorDate: Fri Apr 16 12:32:06 2021 -0700
Remove all V2 metadata string keys, use V3 metadata enum keys instead
(#6742)
Remove all V2 metadata string keys (except the "Exception" one, which is
used in V2 to construct exception metadata keys) and use the V3 metadata
enum keys instead.
---
.../org/apache/pinot/common/utils/DataTable.java | 16 +------------
.../operator/blocks/IntermediateResultsBlock.java | 19 ++++++++-------
.../query/executor/ServerQueryExecutorV1Impl.java | 25 +++++++++----------
.../core/query/reduce/BrokerReduceService.java | 25 +++++++++----------
.../pinot/core/query/scheduler/QueryScheduler.java | 28 +++++++++++-----------
.../core/transport/InstanceRequestHandler.java | 3 ++-
.../apache/pinot/core/transport/QueryRouter.java | 13 ++++++----
.../pinot/core/transport/QueryRoutingTest.java | 7 +++---
.../tests/OfflineClusterIntegrationTest.java | 7 +++---
...PartitionLLCRealtimeClusterIntegrationTest.java | 26 ++++++++++----------
10 files changed, 82 insertions(+), 87 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
index b699b02..0e62948 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
@@ -30,22 +30,8 @@ import org.apache.pinot.spi.utils.ByteArray;
* Data table is used to transfer data from server to broker.
*/
public interface DataTable {
+ // TODO: remove this when we stop supporting DataTable V2.
String EXCEPTION_METADATA_KEY = "Exception";
- String NUM_DOCS_SCANNED_METADATA_KEY = "numDocsScanned";
- String NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY =
"numEntriesScannedInFilter";
- String NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY =
"numEntriesScannedPostFilter";
- String NUM_SEGMENTS_QUERIED = "numSegmentsQueried";
- String NUM_SEGMENTS_PROCESSED = "numSegmentsProcessed";
- String NUM_SEGMENTS_MATCHED = "numSegmentsMatched";
- String NUM_CONSUMING_SEGMENTS_PROCESSED = "numConsumingSegmentsProcessed";
- String MIN_CONSUMING_FRESHNESS_TIME_MS = "minConsumingFreshnessTimeMs";
- String TOTAL_DOCS_METADATA_KEY = "totalDocs";
- String NUM_GROUPS_LIMIT_REACHED_KEY = "numGroupsLimitReached";
- String TIME_USED_MS_METADATA_KEY = "timeUsedMs";
- String TRACE_INFO_METADATA_KEY = "traceInfo";
- String REQUEST_ID_METADATA_KEY = "requestId";
- String NUM_RESIZES_METADATA_KEY = "numResizes";
- String RESIZE_TIME_MS_METADATA_KEY = "resizeTimeMs";
void addException(ProcessingException processingException);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
index 3886e27..c4f245e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
@@ -31,6 +31,7 @@ import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataTable.MetadataKey;
import org.apache.pinot.core.common.Block;
import org.apache.pinot.core.common.BlockDocIdSet;
import org.apache.pinot.core.common.BlockDocIdValueSet;
@@ -426,19 +427,19 @@ public class IntermediateResultsBlock implements Block {
}
private DataTable attachMetadataToDataTable(DataTable dataTable) {
- dataTable.getMetadata().put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY,
String.valueOf(_numDocsScanned));
+ dataTable.getMetadata().put(MetadataKey.NUM_DOCS_SCANNED.getName(),
String.valueOf(_numDocsScanned));
dataTable.getMetadata()
- .put(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY,
String.valueOf(_numEntriesScannedInFilter));
+ .put(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(),
String.valueOf(_numEntriesScannedInFilter));
dataTable.getMetadata()
- .put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY,
String.valueOf(_numEntriesScannedPostFilter));
- dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_PROCESSED,
String.valueOf(_numSegmentsProcessed));
- dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_MATCHED,
String.valueOf(_numSegmentsMatched));
- dataTable.getMetadata().put(DataTable.NUM_RESIZES_METADATA_KEY,
String.valueOf(_numResizes));
- dataTable.getMetadata().put(DataTable.RESIZE_TIME_MS_METADATA_KEY,
String.valueOf(_resizeTimeMs));
+ .put(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(),
String.valueOf(_numEntriesScannedPostFilter));
+ dataTable.getMetadata().put(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(),
String.valueOf(_numSegmentsProcessed));
+ dataTable.getMetadata().put(MetadataKey.NUM_SEGMENTS_MATCHED.getName(),
String.valueOf(_numSegmentsMatched));
+ dataTable.getMetadata().put(MetadataKey.NUM_RESIZES.getName(),
String.valueOf(_numResizes));
+ dataTable.getMetadata().put(MetadataKey.RESIZE_TIME_MS.getName(),
String.valueOf(_resizeTimeMs));
- dataTable.getMetadata().put(DataTable.TOTAL_DOCS_METADATA_KEY,
String.valueOf(_numTotalDocs));
+ dataTable.getMetadata().put(MetadataKey.TOTAL_DOCS.getName(),
String.valueOf(_numTotalDocs));
if (_numGroupsLimitReached) {
- dataTable.getMetadata().put(DataTable.NUM_GROUPS_LIMIT_REACHED_KEY,
"true");
+
dataTable.getMetadata().put(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(),
"true");
}
if (_processingExceptions != null && _processingExceptions.size() > 0) {
for (ProcessingException exception : _processingExceptions) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 1718456..9a8ad4d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -38,6 +38,7 @@ import
org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.FunctionContext;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataTable.MetadataKey;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -232,7 +233,7 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
}
if (enableTrace) {
if (dataTable != null) {
- dataTable.getMetadata().put(DataTable.TRACE_INFO_METADATA_KEY,
TraceContext.getTraceInfo());
+ dataTable.getMetadata().put(MetadataKey.TRACE_INFO.getName(),
TraceContext.getTraceInfo());
}
TraceContext.unregister();
}
@@ -240,14 +241,14 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
queryProcessingTimer.stopAndRecord();
long queryProcessingTime = queryProcessingTimer.getDurationMs();
- dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_QUERIED,
Integer.toString(numSegmentsQueried));
- dataTable.getMetadata().put(DataTable.TIME_USED_MS_METADATA_KEY,
Long.toString(queryProcessingTime));
+ dataTable.getMetadata().put(MetadataKey.NUM_SEGMENTS_QUERIED.getName(),
Integer.toString(numSegmentsQueried));
+ dataTable.getMetadata().put(MetadataKey.TIME_USED_MS.getName(),
Long.toString(queryProcessingTime));
if (numConsumingSegmentsProcessed > 0) {
dataTable.getMetadata()
- .put(DataTable.NUM_CONSUMING_SEGMENTS_PROCESSED,
Integer.toString(numConsumingSegmentsProcessed));
+ .put(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
Integer.toString(numConsumingSegmentsProcessed));
dataTable.getMetadata()
- .put(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS,
Long.toString(minConsumingFreshnessTimeMs));
+ .put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
Long.toString(minConsumingFreshnessTimeMs));
}
LOGGER.debug("Query processing time for request Id - {}: {}", requestId,
queryProcessingTime);
@@ -277,12 +278,12 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
DataTable dataTable =
enableStreaming ? DataTableBuilder.getEmptyDataTable() :
DataTableUtils.buildEmptyDataTable(queryContext);
Map<String, String> metadata = dataTable.getMetadata();
- metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY,
String.valueOf(numTotalDocs));
- metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0");
- metadata.put(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY, "0");
- metadata.put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY,
"0");
- metadata.put(DataTable.NUM_SEGMENTS_PROCESSED, "0");
- metadata.put(DataTable.NUM_SEGMENTS_MATCHED, "0");
+ metadata.put(MetadataKey.TOTAL_DOCS.getName(),
String.valueOf(numTotalDocs));
+ metadata.put(MetadataKey.NUM_DOCS_SCANNED.getName(), "0");
+ metadata.put(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), "0");
+ metadata.put(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), "0");
+ metadata.put(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), "0");
+ metadata.put(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), "0");
return dataTable;
} else {
TimerContext.Timer planBuildTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
@@ -296,7 +297,7 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
planExecTimer.stopAndRecord();
// Update the total docs in the metadata based on the un-pruned segments
- dataTable.getMetadata().put(DataTable.TOTAL_DOCS_METADATA_KEY,
Long.toString(numTotalDocs));
+ dataTable.getMetadata().put(MetadataKey.TOTAL_DOCS.getName(),
Long.toString(numTotalDocs));
return dataTable;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 1d5f132..d63b8f8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -39,6 +39,7 @@ import
org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataTable.MetadataKey;
import org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
import org.apache.pinot.core.transport.ServerRoutingInstance;
@@ -145,7 +146,7 @@ public class BrokerReduceService {
// Reduce on trace info.
if (brokerRequest.isEnableTrace()) {
brokerResponseNative.getTraceInfo()
- .put(entry.getKey().getHostname(),
metadata.get(DataTable.TRACE_INFO_METADATA_KEY));
+ .put(entry.getKey().getHostname(),
metadata.get(MetadataKey.TRACE_INFO.getName()));
}
// Reduce on exceptions.
@@ -155,44 +156,44 @@ public class BrokerReduceService {
}
// Reduce on execution statistics.
- String numDocsScannedString =
metadata.get(DataTable.NUM_DOCS_SCANNED_METADATA_KEY);
+ String numDocsScannedString =
metadata.get(MetadataKey.NUM_DOCS_SCANNED.getName());
if (numDocsScannedString != null) {
numDocsScanned += Long.parseLong(numDocsScannedString);
}
- String numEntriesScannedInFilterString =
metadata.get(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY);
+ String numEntriesScannedInFilterString =
metadata.get(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName());
if (numEntriesScannedInFilterString != null) {
numEntriesScannedInFilter +=
Long.parseLong(numEntriesScannedInFilterString);
}
- String numEntriesScannedPostFilterString =
metadata.get(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY);
+ String numEntriesScannedPostFilterString =
metadata.get(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName());
if (numEntriesScannedPostFilterString != null) {
numEntriesScannedPostFilter +=
Long.parseLong(numEntriesScannedPostFilterString);
}
- String numSegmentsQueriedString =
metadata.get(DataTable.NUM_SEGMENTS_QUERIED);
+ String numSegmentsQueriedString =
metadata.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName());
if (numSegmentsQueriedString != null) {
numSegmentsQueried += Long.parseLong(numSegmentsQueriedString);
}
- String numSegmentsProcessedString =
metadata.get(DataTable.NUM_SEGMENTS_PROCESSED);
+ String numSegmentsProcessedString =
metadata.get(MetadataKey.NUM_SEGMENTS_PROCESSED.getName());
if (numSegmentsProcessedString != null) {
numSegmentsProcessed += Long.parseLong(numSegmentsProcessedString);
}
- String numSegmentsMatchedString =
metadata.get(DataTable.NUM_SEGMENTS_MATCHED);
+ String numSegmentsMatchedString =
metadata.get(MetadataKey.NUM_SEGMENTS_MATCHED.getName());
if (numSegmentsMatchedString != null) {
numSegmentsMatched += Long.parseLong(numSegmentsMatchedString);
}
- String numConsumingString =
metadata.get(DataTable.NUM_CONSUMING_SEGMENTS_PROCESSED);
+ String numConsumingString =
metadata.get(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName());
if (numConsumingString != null) {
numConsumingSegmentsProcessed += Long.parseLong(numConsumingString);
}
- String minConsumingFreshnessTimeMsString =
metadata.get(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS);
+ String minConsumingFreshnessTimeMsString =
metadata.get(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName());
if (minConsumingFreshnessTimeMsString != null) {
minConsumingFreshnessTimeMs =
Math.min(Long.parseLong(minConsumingFreshnessTimeMsString),
minConsumingFreshnessTimeMs);
}
- String threadCpuTimeNsString =
metadata.get(DataTable.MetadataKey.THREAD_CPU_TIME_NS.getName());
+ String threadCpuTimeNsString =
metadata.get(MetadataKey.THREAD_CPU_TIME_NS.getName());
if (threadCpuTimeNsString != null) {
if (entry.getKey().getTableType() == TableType.OFFLINE) {
offlineThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
@@ -201,11 +202,11 @@ public class BrokerReduceService {
}
}
- String numTotalDocsString =
metadata.get(DataTable.TOTAL_DOCS_METADATA_KEY);
+ String numTotalDocsString =
metadata.get(MetadataKey.TOTAL_DOCS.getName());
if (numTotalDocsString != null) {
numTotalDocs += Long.parseLong(numTotalDocsString);
}
- numGroupsLimitReached |=
Boolean.parseBoolean(metadata.get(DataTable.NUM_GROUPS_LIMIT_REACHED_KEY));
+ numGroupsLimitReached |=
Boolean.parseBoolean(metadata.get(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
// After processing the metadata, remove data tables without data rows
inside.
DataSchema dataSchema = dataTable.getDataSchema();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index dd65945..8411ac7 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -163,30 +163,30 @@ public abstract class QueryScheduler {
}
long requestId = queryRequest.getRequestId();
Map<String, String> dataTableMetadata = dataTable.getMetadata();
- dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY,
Long.toString(requestId));
+ dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
byte[] responseBytes = serializeDataTable(queryRequest, dataTable);
// Log the statistics
String tableNameWithType = queryRequest.getTableNameWithType();
long numDocsScanned =
-
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_DOCS_SCANNED_METADATA_KEY,
INVALID_NUM_SCANNED));
+
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.NUM_DOCS_SCANNED.getName(),
INVALID_NUM_SCANNED));
long numEntriesScannedInFilter = Long.parseLong(
-
dataTableMetadata.getOrDefault(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY,
INVALID_NUM_SCANNED));
+
dataTableMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(),
INVALID_NUM_SCANNED));
long numEntriesScannedPostFilter = Long.parseLong(
-
dataTableMetadata.getOrDefault(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY,
INVALID_NUM_SCANNED));
- long numSegmentsProcessed =
-
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_PROCESSED,
INVALID_SEGMENTS_COUNT));
- long numSegmentsMatched =
-
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_MATCHED,
INVALID_SEGMENTS_COUNT));
+
dataTableMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(),
INVALID_NUM_SCANNED));
+ long numSegmentsProcessed = Long.parseLong(
+
dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(),
INVALID_SEGMENTS_COUNT));
+ long numSegmentsMatched = Long.parseLong(
+
dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_MATCHED.getName(),
INVALID_SEGMENTS_COUNT));
long numSegmentsConsuming = Long.parseLong(
-
dataTableMetadata.getOrDefault(DataTable.NUM_CONSUMING_SEGMENTS_PROCESSED,
INVALID_SEGMENTS_COUNT));
- long minConsumingFreshnessMs =
-
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS,
INVALID_FRESHNESS_MS));
+
dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
INVALID_SEGMENTS_COUNT));
+ long minConsumingFreshnessMs = Long.parseLong(
+
dataTableMetadata.getOrDefault(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
INVALID_FRESHNESS_MS));
int numResizes =
-
Integer.parseInt(dataTableMetadata.getOrDefault(DataTable.NUM_RESIZES_METADATA_KEY,
INVALID_NUM_RESIZES));
+
Integer.parseInt(dataTableMetadata.getOrDefault(MetadataKey.NUM_RESIZES.getName(),
INVALID_NUM_RESIZES));
long resizeTimeMs =
-
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.RESIZE_TIME_MS_METADATA_KEY,
INVALID_RESIZE_TIME_MS));
+
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.RESIZE_TIME_MS.getName(),
INVALID_RESIZE_TIME_MS));
long threadCpuTimeNs =
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.THREAD_CPU_TIME_NS.getName(),
"0"));
@@ -311,7 +311,7 @@ public abstract class QueryScheduler {
DataTable result = DataTableBuilder.getEmptyDataTable();
Map<String, String> dataTableMetadata = result.getMetadata();
- dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY,
Long.toString(queryRequest.getRequestId()));
+ dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(),
Long.toString(queryRequest.getRequestId()));
result.addException(error);
return Futures.immediateFuture(serializeDataTable(queryRequest, result));
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index 2ae6567..f0837f0 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -35,6 +35,7 @@ import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataTable.MetadataKey;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
@@ -153,7 +154,7 @@ public class InstanceRequestHandler extends
SimpleChannelInboundHandler<ByteBuf>
DataTable dataTable, Exception e) {
try {
Map<String, String> dataTableMetadata = dataTable.getMetadata();
- dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY,
Long.toString(requestId));
+ dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
byte[] serializedDataTable = dataTable.toBytes();
sendResponse(ctx, queryArrivalTimeMs, serializedDataTable);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index 1c550e3..2e77b82 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataTable.MetadataKey;
import org.apache.pinot.spi.config.table.TableType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,8 +75,8 @@ public class QueryRouter {
_brokerId = brokerId;
_brokerMetrics = brokerMetrics;
_serverChannels = new ServerChannels(this, brokerMetrics);
- _serverChannelsTls = Optional.ofNullable(tlsConfig)
- .map(conf -> new ServerChannels(this, brokerMetrics,
conf)).orElse(null);
+ _serverChannelsTls =
+ Optional.ofNullable(tlsConfig).map(conf -> new ServerChannels(this,
brokerMetrics, conf)).orElse(null);
}
public AsyncQueryResponse submitQuery(long requestId, String rawTableName,
@@ -92,7 +93,8 @@ public class QueryRouter {
if (offlineBrokerRequest != null) {
assert offlineRoutingTable != null;
for (Map.Entry<ServerInstance, List<String>> entry :
offlineRoutingTable.entrySet()) {
- ServerRoutingInstance serverRoutingInstance =
entry.getKey().toServerRoutingInstance(TableType.OFFLINE, preferTls);
+ ServerRoutingInstance serverRoutingInstance =
+ entry.getKey().toServerRoutingInstance(TableType.OFFLINE,
preferTls);
InstanceRequest instanceRequest = getInstanceRequest(requestId,
offlineBrokerRequest, entry.getValue());
requestMap.put(serverRoutingInstance, instanceRequest);
}
@@ -100,7 +102,8 @@ public class QueryRouter {
if (realtimeBrokerRequest != null) {
assert realtimeRoutingTable != null;
for (Map.Entry<ServerInstance, List<String>> entry :
realtimeRoutingTable.entrySet()) {
- ServerRoutingInstance serverRoutingInstance =
entry.getKey().toServerRoutingInstance(TableType.REALTIME, preferTls);
+ ServerRoutingInstance serverRoutingInstance =
+ entry.getKey().toServerRoutingInstance(TableType.REALTIME,
preferTls);
InstanceRequest instanceRequest = getInstanceRequest(requestId,
realtimeBrokerRequest, entry.getValue());
requestMap.put(serverRoutingInstance, instanceRequest);
}
@@ -135,7 +138,7 @@ public class QueryRouter {
void receiveDataTable(ServerRoutingInstance serverRoutingInstance, DataTable
dataTable, int responseSize,
int deserializationTimeMs) {
- long requestId =
Long.parseLong(dataTable.getMetadata().get(DataTable.REQUEST_ID_METADATA_KEY));
+ long requestId =
Long.parseLong(dataTable.getMetadata().get(MetadataKey.REQUEST_ID.getName()));
AsyncQueryResponse asyncQueryResponse =
_asyncQueryResponseMap.get(requestId);
// Query future might be null if the query is already done (maybe due to
failure)
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index c92cc73..f22e660 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataTable.MetadataKey;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.pql.parsers.Pql2Compiler;
@@ -80,7 +81,7 @@ public class QueryRoutingTest {
throws Exception {
long requestId = 123;
DataTable dataTable = DataTableBuilder.getEmptyDataTable();
- dataTable.getMetadata().put(DataTable.REQUEST_ID_METADATA_KEY,
Long.toString(requestId));
+ dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
// Start the server
@@ -157,7 +158,7 @@ public class QueryRoutingTest {
throws Exception {
long requestId = 123;
DataTable dataTable = DataTableBuilder.getEmptyDataTable();
- dataTable.getMetadata().put(DataTable.REQUEST_ID_METADATA_KEY,
Long.toString(requestId));
+ dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
// Start the server
@@ -187,7 +188,7 @@ public class QueryRoutingTest {
throws Exception {
long requestId = 123;
DataTable dataTable = DataTableBuilder.getEmptyDataTable();
- dataTable.getMetadata().put(DataTable.REQUEST_ID_METADATA_KEY,
Long.toString(requestId));
+ dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
byte[] responseBytes = dataTable.toBytes();
// Start the server
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 23dc1b9..993bd47 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -40,6 +40,7 @@ import
org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataTable.MetadataKey;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
@@ -1543,7 +1544,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
assertNotNull(dataTable.getDataSchema());
assertEquals(dataTable.getNumberOfRows(), expectedNumDocs);
Map<String, String> metadata = dataTable.getMetadata();
- assertEquals(metadata.get(DataTable.NUM_DOCS_SCANNED_METADATA_KEY),
Integer.toString(expectedNumDocs));
+ assertEquals(metadata.get(MetadataKey.NUM_DOCS_SCANNED.getName()),
Integer.toString(expectedNumDocs));
}
private void testStreamingRequest(Iterator<Server.ServerResponse>
streamingResponses)
@@ -1558,7 +1559,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
if
(responseType.equals(CommonConstants.Query.Response.ResponseType.DATA)) {
// verify the returned data table metadata only contains
"threadCpuTimeNs".
Map<String, String> metadata = dataTable.getMetadata();
- assertTrue(metadata.size() == 1 &&
metadata.containsKey(DataTable.MetadataKey.THREAD_CPU_TIME_NS.getName()));
+ assertTrue(metadata.size() == 1 &&
metadata.containsKey(MetadataKey.THREAD_CPU_TIME_NS.getName()));
assertNotNull(dataTable.getDataSchema());
numTotalDocs += dataTable.getNumberOfRows();
} else {
@@ -1568,7 +1569,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
assertNull(dataTable.getDataSchema());
assertEquals(dataTable.getNumberOfRows(), 0);
Map<String, String> metadata = dataTable.getMetadata();
- assertEquals(metadata.get(DataTable.NUM_DOCS_SCANNED_METADATA_KEY),
Integer.toString(expectedNumDocs));
+ assertEquals(metadata.get(MetadataKey.NUM_DOCS_SCANNED.getName()),
Integer.toString(expectedNumDocs));
}
}
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
index e21bb2d..b40e43e 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
@@ -30,7 +30,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
-import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataTable.MetadataKey;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
@@ -187,8 +187,8 @@ public class
SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
JsonNode responseToCompare = postQuery(queryToCompare);
// Should only query the segments for partition 0
- assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 2);
-
assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 4);
+
assertEquals(response.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(),
2);
+
assertEquals(responseToCompare.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(),
4);
assertEquals(response.get("aggregationResults").get(0).get("value").asInt(),
responseToCompare.get("aggregationResults").get(0).get("value").asInt());
@@ -203,8 +203,8 @@ public class
SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
JsonNode responseToCompare = postQuery(queryToCompare);
// Should only query the segments for partition 1
- assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 2);
-
assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 4);
+
assertEquals(response.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(),
2);
+
assertEquals(responseToCompare.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(),
4);
assertEquals(response.get("aggregationResults").get(0).get("value").asInt(),
responseToCompare.get("aggregationResults").get(0).get("value").asInt());
@@ -271,8 +271,8 @@ public class
SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
JsonNode responseToCompare = postQuery(queryToCompare);
// Should skip the first completed segments and the consuming segment
for partition 1
- assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(),
numSegments - 2);
-
assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(),
numSegments);
+
assertEquals(response.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(),
numSegments - 2);
+
assertEquals(responseToCompare.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(),
numSegments);
// The result won't match because the consuming segment for partition 1
is pruned out
}
@@ -286,8 +286,8 @@ public class
SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
JsonNode responseToCompare = postQuery(queryToCompare);
// Should skip the first completed segments and the consuming segment
for partition 0
- assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(),
numSegments - 2);
-
assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(),
numSegments);
+
assertEquals(response.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(),
numSegments - 2);
+
assertEquals(responseToCompare.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(),
numSegments);
// The result won't match because the consuming segment for partition 0
is pruned out
}
@@ -350,8 +350,8 @@ public class
SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
JsonNode responseToCompare = postQuery(queryToCompare);
// Should skip 2 completed segments and the consuming segment for
partition 1
- assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(),
numSegments - 3);
-
assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(),
numSegments);
+
assertEquals(response.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(),
numSegments - 3);
+
assertEquals(responseToCompare.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(),
numSegments);
// The result should match again after all the segments with the
non-partitioning records are committed
assertEquals(response.get("aggregationResults").get(0).get("value").asInt(),
@@ -367,8 +367,8 @@ public class
SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
JsonNode responseToCompare = postQuery(queryToCompare);
// Should skip 2 completed segments and the consuming segment for
partition 0
- assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(),
numSegments - 3);
-
assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(),
numSegments);
+
assertEquals(response.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(),
numSegments - 3);
+
assertEquals(responseToCompare.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(),
numSegments);
// The result should match again after all the segments with the
non-partitioning records are committed
assertEquals(response.get("aggregationResults").get(0).get("value").asInt(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]