This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 95bda21  Remove all V2 metadata string keys, use V3 metadata enum keys 
instead (#6742)
95bda21 is described below

commit 95bda214b81544e56cc5f313b034d667333c90bc
Author: Liang Mingqiang <[email protected]>
AuthorDate: Fri Apr 16 12:32:06 2021 -0700

    Remove all V2 metadata string keys, use V3 metadata enum keys instead (#6742)
    
    Remove all V2 metadata string keys (except the "Exception" one, which is
    used in V2 to construct exception metadata keys) and use the V3 metadata
    enum keys instead.
---
 .../org/apache/pinot/common/utils/DataTable.java   | 16 +------------
 .../operator/blocks/IntermediateResultsBlock.java  | 19 ++++++++-------
 .../query/executor/ServerQueryExecutorV1Impl.java  | 25 +++++++++----------
 .../core/query/reduce/BrokerReduceService.java     | 25 +++++++++----------
 .../pinot/core/query/scheduler/QueryScheduler.java | 28 +++++++++++-----------
 .../core/transport/InstanceRequestHandler.java     |  3 ++-
 .../apache/pinot/core/transport/QueryRouter.java   | 13 ++++++----
 .../pinot/core/transport/QueryRoutingTest.java     |  7 +++---
 .../tests/OfflineClusterIntegrationTest.java       |  7 +++---
 ...PartitionLLCRealtimeClusterIntegrationTest.java | 26 ++++++++++----------
 10 files changed, 82 insertions(+), 87 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
index b699b02..0e62948 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
@@ -30,22 +30,8 @@ import org.apache.pinot.spi.utils.ByteArray;
  * Data table is used to transfer data from server to broker.
  */
 public interface DataTable {
+  // TODO: remove this when we stop supporting DataTable V2.
   String EXCEPTION_METADATA_KEY = "Exception";
-  String NUM_DOCS_SCANNED_METADATA_KEY = "numDocsScanned";
-  String NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY = 
"numEntriesScannedInFilter";
-  String NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY = 
"numEntriesScannedPostFilter";
-  String NUM_SEGMENTS_QUERIED = "numSegmentsQueried";
-  String NUM_SEGMENTS_PROCESSED = "numSegmentsProcessed";
-  String NUM_SEGMENTS_MATCHED = "numSegmentsMatched";
-  String NUM_CONSUMING_SEGMENTS_PROCESSED = "numConsumingSegmentsProcessed";
-  String MIN_CONSUMING_FRESHNESS_TIME_MS = "minConsumingFreshnessTimeMs";
-  String TOTAL_DOCS_METADATA_KEY = "totalDocs";
-  String NUM_GROUPS_LIMIT_REACHED_KEY = "numGroupsLimitReached";
-  String TIME_USED_MS_METADATA_KEY = "timeUsedMs";
-  String TRACE_INFO_METADATA_KEY = "traceInfo";
-  String REQUEST_ID_METADATA_KEY = "requestId";
-  String NUM_RESIZES_METADATA_KEY = "numResizes";
-  String RESIZE_TIME_MS_METADATA_KEY = "resizeTimeMs";
 
   void addException(ProcessingException processingException);
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
index 3886e27..c4f245e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
@@ -31,6 +31,7 @@ import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.BlockDocIdSet;
 import org.apache.pinot.core.common.BlockDocIdValueSet;
@@ -426,19 +427,19 @@ public class IntermediateResultsBlock implements Block {
   }
 
   private DataTable attachMetadataToDataTable(DataTable dataTable) {
-    dataTable.getMetadata().put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, 
String.valueOf(_numDocsScanned));
+    dataTable.getMetadata().put(MetadataKey.NUM_DOCS_SCANNED.getName(), 
String.valueOf(_numDocsScanned));
     dataTable.getMetadata()
-        .put(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY, 
String.valueOf(_numEntriesScannedInFilter));
+        .put(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), 
String.valueOf(_numEntriesScannedInFilter));
     dataTable.getMetadata()
-        .put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY, 
String.valueOf(_numEntriesScannedPostFilter));
-    dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_PROCESSED, 
String.valueOf(_numSegmentsProcessed));
-    dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_MATCHED, 
String.valueOf(_numSegmentsMatched));
-    dataTable.getMetadata().put(DataTable.NUM_RESIZES_METADATA_KEY, 
String.valueOf(_numResizes));
-    dataTable.getMetadata().put(DataTable.RESIZE_TIME_MS_METADATA_KEY, 
String.valueOf(_resizeTimeMs));
+        .put(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), 
String.valueOf(_numEntriesScannedPostFilter));
+    dataTable.getMetadata().put(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), 
String.valueOf(_numSegmentsProcessed));
+    dataTable.getMetadata().put(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), 
String.valueOf(_numSegmentsMatched));
+    dataTable.getMetadata().put(MetadataKey.NUM_RESIZES.getName(), 
String.valueOf(_numResizes));
+    dataTable.getMetadata().put(MetadataKey.RESIZE_TIME_MS.getName(), 
String.valueOf(_resizeTimeMs));
 
-    dataTable.getMetadata().put(DataTable.TOTAL_DOCS_METADATA_KEY, 
String.valueOf(_numTotalDocs));
+    dataTable.getMetadata().put(MetadataKey.TOTAL_DOCS.getName(), 
String.valueOf(_numTotalDocs));
     if (_numGroupsLimitReached) {
-      dataTable.getMetadata().put(DataTable.NUM_GROUPS_LIMIT_REACHED_KEY, 
"true");
+      
dataTable.getMetadata().put(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), 
"true");
     }
     if (_processingExceptions != null && _processingExceptions.size() > 0) {
       for (ProcessingException exception : _processingExceptions) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 1718456..9a8ad4d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -38,6 +38,7 @@ import 
org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.common.request.context.FunctionContext;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.common.datatable.DataTableUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -232,7 +233,7 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
       }
       if (enableTrace) {
         if (dataTable != null) {
-          dataTable.getMetadata().put(DataTable.TRACE_INFO_METADATA_KEY, 
TraceContext.getTraceInfo());
+          dataTable.getMetadata().put(MetadataKey.TRACE_INFO.getName(), 
TraceContext.getTraceInfo());
         }
         TraceContext.unregister();
       }
@@ -240,14 +241,14 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
 
     queryProcessingTimer.stopAndRecord();
     long queryProcessingTime = queryProcessingTimer.getDurationMs();
-    dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_QUERIED, 
Integer.toString(numSegmentsQueried));
-    dataTable.getMetadata().put(DataTable.TIME_USED_MS_METADATA_KEY, 
Long.toString(queryProcessingTime));
+    dataTable.getMetadata().put(MetadataKey.NUM_SEGMENTS_QUERIED.getName(), 
Integer.toString(numSegmentsQueried));
+    dataTable.getMetadata().put(MetadataKey.TIME_USED_MS.getName(), 
Long.toString(queryProcessingTime));
 
     if (numConsumingSegmentsProcessed > 0) {
       dataTable.getMetadata()
-          .put(DataTable.NUM_CONSUMING_SEGMENTS_PROCESSED, 
Integer.toString(numConsumingSegmentsProcessed));
+          .put(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), 
Integer.toString(numConsumingSegmentsProcessed));
       dataTable.getMetadata()
-          .put(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS, 
Long.toString(minConsumingFreshnessTimeMs));
+          .put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), 
Long.toString(minConsumingFreshnessTimeMs));
     }
 
     LOGGER.debug("Query processing time for request Id - {}: {}", requestId, 
queryProcessingTime);
@@ -277,12 +278,12 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
       DataTable dataTable =
           enableStreaming ? DataTableBuilder.getEmptyDataTable() : 
DataTableUtils.buildEmptyDataTable(queryContext);
       Map<String, String> metadata = dataTable.getMetadata();
-      metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY, 
String.valueOf(numTotalDocs));
-      metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0");
-      metadata.put(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY, "0");
-      metadata.put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY, 
"0");
-      metadata.put(DataTable.NUM_SEGMENTS_PROCESSED, "0");
-      metadata.put(DataTable.NUM_SEGMENTS_MATCHED, "0");
+      metadata.put(MetadataKey.TOTAL_DOCS.getName(), 
String.valueOf(numTotalDocs));
+      metadata.put(MetadataKey.NUM_DOCS_SCANNED.getName(), "0");
+      metadata.put(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), "0");
+      metadata.put(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), "0");
+      metadata.put(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), "0");
+      metadata.put(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), "0");
       return dataTable;
     } else {
       TimerContext.Timer planBuildTimer = 
timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
@@ -296,7 +297,7 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
       planExecTimer.stopAndRecord();
 
       // Update the total docs in the metadata based on the un-pruned segments
-      dataTable.getMetadata().put(DataTable.TOTAL_DOCS_METADATA_KEY, 
Long.toString(numTotalDocs));
+      dataTable.getMetadata().put(MetadataKey.TOTAL_DOCS.getName(), 
Long.toString(numTotalDocs));
 
       return dataTable;
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 1d5f132..d63b8f8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -39,6 +39,7 @@ import 
org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
@@ -145,7 +146,7 @@ public class BrokerReduceService {
       // Reduce on trace info.
       if (brokerRequest.isEnableTrace()) {
         brokerResponseNative.getTraceInfo()
-            .put(entry.getKey().getHostname(), 
metadata.get(DataTable.TRACE_INFO_METADATA_KEY));
+            .put(entry.getKey().getHostname(), 
metadata.get(MetadataKey.TRACE_INFO.getName()));
       }
 
       // Reduce on exceptions.
@@ -155,44 +156,44 @@ public class BrokerReduceService {
       }
 
       // Reduce on execution statistics.
-      String numDocsScannedString = 
metadata.get(DataTable.NUM_DOCS_SCANNED_METADATA_KEY);
+      String numDocsScannedString = 
metadata.get(MetadataKey.NUM_DOCS_SCANNED.getName());
       if (numDocsScannedString != null) {
         numDocsScanned += Long.parseLong(numDocsScannedString);
       }
-      String numEntriesScannedInFilterString = 
metadata.get(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY);
+      String numEntriesScannedInFilterString = 
metadata.get(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName());
       if (numEntriesScannedInFilterString != null) {
         numEntriesScannedInFilter += 
Long.parseLong(numEntriesScannedInFilterString);
       }
-      String numEntriesScannedPostFilterString = 
metadata.get(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY);
+      String numEntriesScannedPostFilterString = 
metadata.get(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName());
       if (numEntriesScannedPostFilterString != null) {
         numEntriesScannedPostFilter += 
Long.parseLong(numEntriesScannedPostFilterString);
       }
-      String numSegmentsQueriedString = 
metadata.get(DataTable.NUM_SEGMENTS_QUERIED);
+      String numSegmentsQueriedString = 
metadata.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName());
       if (numSegmentsQueriedString != null) {
         numSegmentsQueried += Long.parseLong(numSegmentsQueriedString);
       }
 
-      String numSegmentsProcessedString = 
metadata.get(DataTable.NUM_SEGMENTS_PROCESSED);
+      String numSegmentsProcessedString = 
metadata.get(MetadataKey.NUM_SEGMENTS_PROCESSED.getName());
       if (numSegmentsProcessedString != null) {
         numSegmentsProcessed += Long.parseLong(numSegmentsProcessedString);
       }
-      String numSegmentsMatchedString = 
metadata.get(DataTable.NUM_SEGMENTS_MATCHED);
+      String numSegmentsMatchedString = 
metadata.get(MetadataKey.NUM_SEGMENTS_MATCHED.getName());
       if (numSegmentsMatchedString != null) {
         numSegmentsMatched += Long.parseLong(numSegmentsMatchedString);
       }
 
-      String numConsumingString = 
metadata.get(DataTable.NUM_CONSUMING_SEGMENTS_PROCESSED);
+      String numConsumingString = 
metadata.get(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName());
       if (numConsumingString != null) {
         numConsumingSegmentsProcessed += Long.parseLong(numConsumingString);
       }
 
-      String minConsumingFreshnessTimeMsString = 
metadata.get(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS);
+      String minConsumingFreshnessTimeMsString = 
metadata.get(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName());
       if (minConsumingFreshnessTimeMsString != null) {
         minConsumingFreshnessTimeMs =
             Math.min(Long.parseLong(minConsumingFreshnessTimeMsString), 
minConsumingFreshnessTimeMs);
       }
 
-      String threadCpuTimeNsString = 
metadata.get(DataTable.MetadataKey.THREAD_CPU_TIME_NS.getName());
+      String threadCpuTimeNsString = 
metadata.get(MetadataKey.THREAD_CPU_TIME_NS.getName());
       if (threadCpuTimeNsString != null) {
         if (entry.getKey().getTableType() == TableType.OFFLINE) {
           offlineThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
@@ -201,11 +202,11 @@ public class BrokerReduceService {
         }
       }
 
-      String numTotalDocsString = 
metadata.get(DataTable.TOTAL_DOCS_METADATA_KEY);
+      String numTotalDocsString = 
metadata.get(MetadataKey.TOTAL_DOCS.getName());
       if (numTotalDocsString != null) {
         numTotalDocs += Long.parseLong(numTotalDocsString);
       }
-      numGroupsLimitReached |= 
Boolean.parseBoolean(metadata.get(DataTable.NUM_GROUPS_LIMIT_REACHED_KEY));
+      numGroupsLimitReached |= 
Boolean.parseBoolean(metadata.get(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
 
       // After processing the metadata, remove data tables without data rows 
inside.
       DataSchema dataSchema = dataTable.getDataSchema();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index dd65945..8411ac7 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -163,30 +163,30 @@ public abstract class QueryScheduler {
     }
     long requestId = queryRequest.getRequestId();
     Map<String, String> dataTableMetadata = dataTable.getMetadata();
-    dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY, 
Long.toString(requestId));
+    dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
 
     byte[] responseBytes = serializeDataTable(queryRequest, dataTable);
 
     // Log the statistics
     String tableNameWithType = queryRequest.getTableNameWithType();
     long numDocsScanned =
-        
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_DOCS_SCANNED_METADATA_KEY,
 INVALID_NUM_SCANNED));
+        
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.NUM_DOCS_SCANNED.getName(),
 INVALID_NUM_SCANNED));
     long numEntriesScannedInFilter = Long.parseLong(
-        
dataTableMetadata.getOrDefault(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY,
 INVALID_NUM_SCANNED));
+        
dataTableMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(),
 INVALID_NUM_SCANNED));
     long numEntriesScannedPostFilter = Long.parseLong(
-        
dataTableMetadata.getOrDefault(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY,
 INVALID_NUM_SCANNED));
-    long numSegmentsProcessed =
-        
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_PROCESSED, 
INVALID_SEGMENTS_COUNT));
-    long numSegmentsMatched =
-        
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_MATCHED, 
INVALID_SEGMENTS_COUNT));
+        
dataTableMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(),
 INVALID_NUM_SCANNED));
+    long numSegmentsProcessed = Long.parseLong(
+        
dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), 
INVALID_SEGMENTS_COUNT));
+    long numSegmentsMatched = Long.parseLong(
+        
dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), 
INVALID_SEGMENTS_COUNT));
     long numSegmentsConsuming = Long.parseLong(
-        
dataTableMetadata.getOrDefault(DataTable.NUM_CONSUMING_SEGMENTS_PROCESSED, 
INVALID_SEGMENTS_COUNT));
-    long minConsumingFreshnessMs =
-        
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS,
 INVALID_FRESHNESS_MS));
+        
dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
 INVALID_SEGMENTS_COUNT));
+    long minConsumingFreshnessMs = Long.parseLong(
+        
dataTableMetadata.getOrDefault(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
 INVALID_FRESHNESS_MS));
     int numResizes =
-        
Integer.parseInt(dataTableMetadata.getOrDefault(DataTable.NUM_RESIZES_METADATA_KEY,
 INVALID_NUM_RESIZES));
+        
Integer.parseInt(dataTableMetadata.getOrDefault(MetadataKey.NUM_RESIZES.getName(),
 INVALID_NUM_RESIZES));
     long resizeTimeMs =
-        
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.RESIZE_TIME_MS_METADATA_KEY,
 INVALID_RESIZE_TIME_MS));
+        
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.RESIZE_TIME_MS.getName(),
 INVALID_RESIZE_TIME_MS));
     long threadCpuTimeNs =
         
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.THREAD_CPU_TIME_NS.getName(),
 "0"));
 
@@ -311,7 +311,7 @@ public abstract class QueryScheduler {
     DataTable result = DataTableBuilder.getEmptyDataTable();
 
     Map<String, String> dataTableMetadata = result.getMetadata();
-    dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY, 
Long.toString(queryRequest.getRequestId()));
+    dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(queryRequest.getRequestId()));
 
     result.addException(error);
     return Futures.immediateFuture(serializeDataTable(queryRequest, result));
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index 2ae6567..f0837f0 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -35,6 +35,7 @@ import org.apache.pinot.common.metrics.ServerQueryPhase;
 import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
@@ -153,7 +154,7 @@ public class InstanceRequestHandler extends 
SimpleChannelInboundHandler<ByteBuf>
       DataTable dataTable, Exception e) {
     try {
       Map<String, String> dataTableMetadata = dataTable.getMetadata();
-      dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY, 
Long.toString(requestId));
+      dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
       
dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
       byte[] serializedDataTable = dataTable.toBytes();
       sendResponse(ctx, queryArrivalTimeMs, serializedDataTable);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index 1c550e3..2e77b82 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.spi.config.table.TableType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,8 +75,8 @@ public class QueryRouter {
     _brokerId = brokerId;
     _brokerMetrics = brokerMetrics;
     _serverChannels = new ServerChannels(this, brokerMetrics);
-    _serverChannelsTls = Optional.ofNullable(tlsConfig)
-        .map(conf -> new ServerChannels(this, brokerMetrics, 
conf)).orElse(null);
+    _serverChannelsTls =
+        Optional.ofNullable(tlsConfig).map(conf -> new ServerChannels(this, 
brokerMetrics, conf)).orElse(null);
   }
 
   public AsyncQueryResponse submitQuery(long requestId, String rawTableName,
@@ -92,7 +93,8 @@ public class QueryRouter {
     if (offlineBrokerRequest != null) {
       assert offlineRoutingTable != null;
       for (Map.Entry<ServerInstance, List<String>> entry : 
offlineRoutingTable.entrySet()) {
-        ServerRoutingInstance serverRoutingInstance = 
entry.getKey().toServerRoutingInstance(TableType.OFFLINE, preferTls);
+        ServerRoutingInstance serverRoutingInstance =
+            entry.getKey().toServerRoutingInstance(TableType.OFFLINE, 
preferTls);
         InstanceRequest instanceRequest = getInstanceRequest(requestId, 
offlineBrokerRequest, entry.getValue());
         requestMap.put(serverRoutingInstance, instanceRequest);
       }
@@ -100,7 +102,8 @@ public class QueryRouter {
     if (realtimeBrokerRequest != null) {
       assert realtimeRoutingTable != null;
       for (Map.Entry<ServerInstance, List<String>> entry : 
realtimeRoutingTable.entrySet()) {
-        ServerRoutingInstance serverRoutingInstance = 
entry.getKey().toServerRoutingInstance(TableType.REALTIME, preferTls);
+        ServerRoutingInstance serverRoutingInstance =
+            entry.getKey().toServerRoutingInstance(TableType.REALTIME, 
preferTls);
         InstanceRequest instanceRequest = getInstanceRequest(requestId, 
realtimeBrokerRequest, entry.getValue());
         requestMap.put(serverRoutingInstance, instanceRequest);
       }
@@ -135,7 +138,7 @@ public class QueryRouter {
 
   void receiveDataTable(ServerRoutingInstance serverRoutingInstance, DataTable 
dataTable, int responseSize,
       int deserializationTimeMs) {
-    long requestId = 
Long.parseLong(dataTable.getMetadata().get(DataTable.REQUEST_ID_METADATA_KEY));
+    long requestId = 
Long.parseLong(dataTable.getMetadata().get(MetadataKey.REQUEST_ID.getName()));
     AsyncQueryResponse asyncQueryResponse = 
_asyncQueryResponseMap.get(requestId);
 
     // Query future might be null if the query is already done (maybe due to 
failure)
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index c92cc73..f22e660 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
 import org.apache.pinot.pql.parsers.Pql2Compiler;
@@ -80,7 +81,7 @@ public class QueryRoutingTest {
       throws Exception {
     long requestId = 123;
     DataTable dataTable = DataTableBuilder.getEmptyDataTable();
-    dataTable.getMetadata().put(DataTable.REQUEST_ID_METADATA_KEY, 
Long.toString(requestId));
+    dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
 
     // Start the server
@@ -157,7 +158,7 @@ public class QueryRoutingTest {
       throws Exception {
     long requestId = 123;
     DataTable dataTable = DataTableBuilder.getEmptyDataTable();
-    dataTable.getMetadata().put(DataTable.REQUEST_ID_METADATA_KEY, 
Long.toString(requestId));
+    dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
 
     // Start the server
@@ -187,7 +188,7 @@ public class QueryRoutingTest {
       throws Exception {
     long requestId = 123;
     DataTable dataTable = DataTableBuilder.getEmptyDataTable();
-    dataTable.getMetadata().put(DataTable.REQUEST_ID_METADATA_KEY, 
Long.toString(requestId));
+    dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
 
     // Start the server
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 23dc1b9..993bd47 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -40,6 +40,7 @@ import 
org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
 import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
@@ -1543,7 +1544,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     assertNotNull(dataTable.getDataSchema());
     assertEquals(dataTable.getNumberOfRows(), expectedNumDocs);
     Map<String, String> metadata = dataTable.getMetadata();
-    assertEquals(metadata.get(DataTable.NUM_DOCS_SCANNED_METADATA_KEY), 
Integer.toString(expectedNumDocs));
+    assertEquals(metadata.get(MetadataKey.NUM_DOCS_SCANNED.getName()), 
Integer.toString(expectedNumDocs));
   }
 
   private void testStreamingRequest(Iterator<Server.ServerResponse> 
streamingResponses)
@@ -1558,7 +1559,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
       if 
(responseType.equals(CommonConstants.Query.Response.ResponseType.DATA)) {
         // verify the returned data table metadata only contains 
"threadCpuTimeNs".
         Map<String, String> metadata = dataTable.getMetadata();
-        assertTrue(metadata.size() == 1 && 
metadata.containsKey(DataTable.MetadataKey.THREAD_CPU_TIME_NS.getName()));
+        assertTrue(metadata.size() == 1 && 
metadata.containsKey(MetadataKey.THREAD_CPU_TIME_NS.getName()));
         assertNotNull(dataTable.getDataSchema());
         numTotalDocs += dataTable.getNumberOfRows();
       } else {
@@ -1568,7 +1569,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
         assertNull(dataTable.getDataSchema());
         assertEquals(dataTable.getNumberOfRows(), 0);
         Map<String, String> metadata = dataTable.getMetadata();
-        assertEquals(metadata.get(DataTable.NUM_DOCS_SCANNED_METADATA_KEY), 
Integer.toString(expectedNumDocs));
+        assertEquals(metadata.get(MetadataKey.NUM_DOCS_SCANNED.getName()), 
Integer.toString(expectedNumDocs));
       }
     }
   }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
index e21bb2d..b40e43e 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
@@ -30,7 +30,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
-import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
@@ -187,8 +187,8 @@ public class 
SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
       JsonNode responseToCompare = postQuery(queryToCompare);
 
       // Should only query the segments for partition 0
-      assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 2);
-      
assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 4);
+      
assertEquals(response.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), 
2);
+      
assertEquals(responseToCompare.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(),
 4);
 
       
assertEquals(response.get("aggregationResults").get(0).get("value").asInt(),
           
responseToCompare.get("aggregationResults").get(0).get("value").asInt());
@@ -203,8 +203,8 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
       JsonNode responseToCompare = postQuery(queryToCompare);
 
       // Should only query the segments for partition 1
-      assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 2);
-      assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), 4);
+      assertEquals(response.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), 2);
+      assertEquals(responseToCompare.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), 4);
 
       assertEquals(response.get("aggregationResults").get(0).get("value").asInt(),
           responseToCompare.get("aggregationResults").get(0).get("value").asInt());
@@ -271,8 +271,8 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
       JsonNode responseToCompare = postQuery(queryToCompare);
 
       // Should skip the first completed segments and the consuming segment for partition 1
-      assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), numSegments - 2);
-      assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), numSegments);
+      assertEquals(response.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), numSegments - 2);
+      assertEquals(responseToCompare.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), numSegments);
 
       // The result won't match because the consuming segment for partition 1 is pruned out
     }
@@ -286,8 +286,8 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
       JsonNode responseToCompare = postQuery(queryToCompare);
 
       // Should skip the first completed segments and the consuming segment for partition 0
-      assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), numSegments - 2);
-      assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), numSegments);
+      assertEquals(response.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), numSegments - 2);
+      assertEquals(responseToCompare.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), numSegments);
 
       // The result won't match because the consuming segment for partition 0 is pruned out
     }
@@ -350,8 +350,8 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
       JsonNode responseToCompare = postQuery(queryToCompare);
 
       // Should skip 2 completed segments and the consuming segment for partition 1
-      assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), numSegments - 3);
-      assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), numSegments);
+      assertEquals(response.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), numSegments - 3);
+      assertEquals(responseToCompare.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), numSegments);
 
       // The result should match again after all the segments with the non-partitioning records are committed
       assertEquals(response.get("aggregationResults").get(0).get("value").asInt(),
@@ -367,8 +367,8 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
       JsonNode responseToCompare = postQuery(queryToCompare);
 
       // Should skip 2 completed segments and the consuming segment for partition 0
-      assertEquals(response.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), numSegments - 3);
-      assertEquals(responseToCompare.get(DataTable.NUM_SEGMENTS_QUERIED).asInt(), numSegments);
+      assertEquals(response.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), numSegments - 3);
+      assertEquals(responseToCompare.get(MetadataKey.NUM_SEGMENTS_QUERIED.getName()).asInt(), numSegments);
 
       // The result should match again after all the segments with the non-partitioning records are committed
       assertEquals(response.get("aggregationResults").get(0).get("value").asInt(),

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to