This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3caf9ab0e6 [multistage] fix usage of metadata overrides (#11587)
3caf9ab0e6 is described below
commit 3caf9ab0e64c91b1e7100400c54905639a2aef21
Author: Rong Rong <[email protected]>
AuthorDate: Wed Sep 13 15:06:36 2023 -0700
[multistage] fix usage of metadata overrides (#11587)
---
.../apache/pinot/query/runtime/QueryRunner.java | 35 ++++++++++++++--------
.../query/runtime/operator/AggregateOperator.java | 7 ++---
.../query/runtime/operator/HashJoinOperator.java | 18 +++++------
.../LeafStageTransferableBlockOperator.java | 2 +-
.../operator/MultistageGroupByExecutor.java | 6 ++--
.../runtime/plan/OpChainExecutionContext.java | 13 ++++----
.../pinot/query/runtime/plan/StageMetadata.java | 3 +-
.../plan/pipeline/PipelineBreakerExecutor.java | 6 ++--
.../plan/server/ServerPlanRequestUtils.java | 4 +--
.../pinot/query/service/server/QueryServer.java | 3 +-
10 files changed, 51 insertions(+), 46 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 7a9abc1d45..1e27fb559c 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -152,14 +153,14 @@ public class QueryRunner {
public void processQuery(DistributedStagePlan distributedStagePlan,
Map<String, String> requestMetadata) {
long requestId =
Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
long timeoutMs =
Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
+ Map<String, String> opChainMetadata = consolidateMetadata(
+ distributedStagePlan.getStageMetadata().getCustomProperties(),
requestMetadata);
long deadlineMs = System.currentTimeMillis() + timeoutMs;
-
setStageCustomProperties(distributedStagePlan.getStageMetadata().getCustomProperties(),
requestMetadata);
-
// run pre-stage execution for all pipeline breakers
PipelineBreakerResult pipelineBreakerResult =
PipelineBreakerExecutor.executePipelineBreakers(_opChainScheduler,
_mailboxService, distributedStagePlan,
- requestMetadata, requestId, deadlineMs);
+ opChainMetadata, requestId, deadlineMs);
// Send error block to all the receivers if pipeline breaker fails
if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock()
!= null) {
@@ -188,7 +189,7 @@ public class QueryRunner {
// run OpChain
OpChainExecutionContext executionContext =
new OpChainExecutionContext(_mailboxService, requestId,
distributedStagePlan.getStageId(),
- distributedStagePlan.getServer(), deadlineMs, requestMetadata,
distributedStagePlan.getStageMetadata(),
+ distributedStagePlan.getServer(), deadlineMs, opChainMetadata,
distributedStagePlan.getStageMetadata(),
pipelineBreakerResult);
OpChain opChain;
if (DistributedStagePlan.isLeafStage(distributedStagePlan)) {
@@ -199,39 +200,47 @@ public class QueryRunner {
_opChainScheduler.register(opChain);
}
- private void setStageCustomProperties(Map<String, String> customProperties,
Map<String, String> requestMetadata) {
- Integer numGroupsLimit =
QueryOptionsUtils.getNumGroupsLimit(requestMetadata);
+ private Map<String, String> consolidateMetadata(Map<String, String>
customProperties,
+ Map<String, String> requestMetadata) {
+ Map<String, String> opChainMetadata = new HashMap<>();
+ // 1. put all request level metadata
+ opChainMetadata.putAll(requestMetadata);
+ // 2. put all stageMetadata.customProperties.
+ opChainMetadata.putAll(customProperties);
+ // 3. add all overrides from config if anything is still empty.
+ Integer numGroupsLimit =
QueryOptionsUtils.getNumGroupsLimit(opChainMetadata);
if (numGroupsLimit == null) {
numGroupsLimit = _numGroupsLimit;
}
if (numGroupsLimit != null) {
- customProperties.put(QueryOptionKey.NUM_GROUPS_LIMIT,
Integer.toString(numGroupsLimit));
+ opChainMetadata.put(QueryOptionKey.NUM_GROUPS_LIMIT,
Integer.toString(numGroupsLimit));
}
- Integer maxInitialResultHolderCapacity =
QueryOptionsUtils.getMaxInitialResultHolderCapacity(requestMetadata);
+ Integer maxInitialResultHolderCapacity =
QueryOptionsUtils.getMaxInitialResultHolderCapacity(opChainMetadata);
if (maxInitialResultHolderCapacity == null) {
maxInitialResultHolderCapacity = _maxInitialResultHolderCapacity;
}
if (maxInitialResultHolderCapacity != null) {
- customProperties.put(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+ opChainMetadata.put(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY,
Integer.toString(maxInitialResultHolderCapacity));
}
- Integer maxRowsInJoin =
QueryOptionsUtils.getMaxRowsInJoin(requestMetadata);
+ Integer maxRowsInJoin =
QueryOptionsUtils.getMaxRowsInJoin(opChainMetadata);
if (maxRowsInJoin == null) {
maxRowsInJoin = _maxRowsInJoin;
}
if (maxRowsInJoin != null) {
- customProperties.put(QueryOptionKey.MAX_ROWS_IN_JOIN,
Integer.toString(maxRowsInJoin));
+ opChainMetadata.put(QueryOptionKey.MAX_ROWS_IN_JOIN,
Integer.toString(maxRowsInJoin));
}
- JoinOverFlowMode joinOverflowMode =
QueryOptionsUtils.getJoinOverflowMode(requestMetadata);
+ JoinOverFlowMode joinOverflowMode =
QueryOptionsUtils.getJoinOverflowMode(opChainMetadata);
if (joinOverflowMode == null) {
joinOverflowMode = _joinOverflowMode;
}
if (joinOverflowMode != null) {
- customProperties.put(QueryOptionKey.JOIN_OVERFLOW_MODE,
joinOverflowMode.name());
+ opChainMetadata.put(QueryOptionKey.JOIN_OVERFLOW_MODE,
joinOverflowMode.name());
}
+ return opChainMetadata;
}
public void cancel(long requestId) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 2bac791ffb..a944f5280f 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -48,7 +48,6 @@ import
org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.block.DataBlockValSet;
import org.apache.pinot.query.runtime.operator.block.FilteredDataBlockValSet;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.spi.data.FieldSpec;
@@ -134,12 +133,10 @@ public class AggregateOperator extends MultiStageOperator
{
// Initialize the appropriate executor.
if (!groupSet.isEmpty()) {
_isGroupByAggregation = true;
- StageMetadata stageMetadata = context.getStageMetadata();
- Map<String, String> customProperties =
- stageMetadata != null ? stageMetadata.getCustomProperties() :
Collections.emptyMap();
+ Map<String, String> opChainMetadata = context.getOpChainMetadata();
_groupByExecutor =
new MultistageGroupByExecutor(groupByExpr, aggFunctions,
filterArgIndexArray, aggType, _colNameToIndexMap,
- _resultSchema, customProperties, nodeHint);
+ _resultSchema, opChainMetadata, nodeHint);
} else {
_isGroupByAggregation = false;
_aggregationExecutor =
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 7b192200c0..bed297706c 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -46,7 +45,6 @@ import
org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
import
org.apache.pinot.query.runtime.operator.operands.TransformOperandFactory;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.spi.utils.BooleanUtils;
import
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
@@ -145,14 +143,12 @@ public class HashJoinOperator extends MultiStageOperator {
} else {
_matchedRightRows = null;
}
- StageMetadata stageMetadata = context.getStageMetadata();
- Map<String, String> customProperties =
- stageMetadata != null ? stageMetadata.getCustomProperties() :
Collections.emptyMap();
- _maxRowsInHashTable = getMaxRowInJoin(customProperties,
node.getJoinHints());
- _joinOverflowMode = getJoinOverflowMode(customProperties,
node.getJoinHints());
+ Map<String, String> metadata = context.getOpChainMetadata();
+ _maxRowsInHashTable = getMaxRowInJoin(metadata, node.getJoinHints());
+ _joinOverflowMode = getJoinOverflowMode(metadata, node.getJoinHints());
}
- private int getMaxRowInJoin(Map<String, String> customProperties, @Nullable
AbstractPlanNode.NodeHint nodeHint) {
+ private int getMaxRowInJoin(Map<String, String> opChainMetadata, @Nullable
AbstractPlanNode.NodeHint nodeHint) {
if (nodeHint != null) {
Map<String, String> joinOptions =
nodeHint._hintOptions.get(PinotHintOptions.JOIN_HINT_OPTIONS);
if (joinOptions != null) {
@@ -162,11 +158,11 @@ public class HashJoinOperator extends MultiStageOperator {
}
}
}
- Integer maxRowsInJoin =
QueryOptionsUtils.getMaxRowsInJoin(customProperties);
+ Integer maxRowsInJoin =
QueryOptionsUtils.getMaxRowsInJoin(opChainMetadata);
return maxRowsInJoin != null ? maxRowsInJoin : DEFAULT_MAX_ROWS_IN_JOIN;
}
- private JoinOverFlowMode getJoinOverflowMode(Map<String, String>
customProperties,
+ private JoinOverFlowMode getJoinOverflowMode(Map<String, String>
contextMetadata,
@Nullable AbstractPlanNode.NodeHint nodeHint) {
if (nodeHint != null) {
Map<String, String> joinOptions =
nodeHint._hintOptions.get(PinotHintOptions.JOIN_HINT_OPTIONS);
@@ -177,7 +173,7 @@ public class HashJoinOperator extends MultiStageOperator {
}
}
}
- JoinOverFlowMode joinOverflowMode =
QueryOptionsUtils.getJoinOverflowMode(customProperties);
+ JoinOverFlowMode joinOverflowMode =
QueryOptionsUtils.getJoinOverflowMode(contextMetadata);
return joinOverflowMode != null ? joinOverflowMode :
DEFAULT_JOIN_OVERFLOW_MODE;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 30de6cfdfc..a81ceb8717 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -93,7 +93,7 @@ public class LeafStageTransferableBlockOperator extends
MultiStageOperator {
_dataSchema = dataSchema;
_queryExecutor = queryExecutor;
_executorService = executorService;
- Integer maxStreamingPendingBlocks =
QueryOptionsUtils.getMaxStreamingPendingBlocks(context.getRequestMetadata());
+ Integer maxStreamingPendingBlocks =
QueryOptionsUtils.getMaxStreamingPendingBlocks(context.getOpChainMetadata());
_blockingQueue = new ArrayBlockingQueue<>(maxStreamingPendingBlocks !=
null ? maxStreamingPendingBlocks
: QueryOptionValue.DEFAULT_MAX_STREAMING_PENDING_BLOCKS);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
index 994036b27a..133479aead 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
@@ -72,7 +72,7 @@ public class MultistageGroupByExecutor {
public MultistageGroupByExecutor(List<ExpressionContext> groupByExpr,
AggregationFunction[] aggFunctions,
@Nullable int[] filterArgIndices, AggType aggType, Map<String, Integer>
colNameToIndexMap,
- DataSchema resultSchema, Map<String, String> customProperties, @Nullable
AbstractPlanNode.NodeHint nodeHint) {
+ DataSchema resultSchema, Map<String, String> opChainMetadata, @Nullable
AbstractPlanNode.NodeHint nodeHint) {
_aggType = aggType;
_colNameToIndexMap = colNameToIndexMap;
_groupSet = groupByExpr;
@@ -85,8 +85,8 @@ public class MultistageGroupByExecutor {
_groupKeyToIdMap = new HashMap<>();
- _numGroupsLimit = getNumGroupsLimit(customProperties, nodeHint);
- _maxInitialResultHolderCapacity =
getMaxInitialResultHolderCapacity(customProperties, nodeHint);
+ _numGroupsLimit = getNumGroupsLimit(opChainMetadata, nodeHint);
+ _maxInitialResultHolderCapacity =
getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
for (int i = 0; i < _aggFunctions.length; i++) {
_aggregateResultHolders[i] =
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index cb4326e8b9..73d1b6ee05 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.runtime.plan;
+import java.util.Collections;
import java.util.Map;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.routing.VirtualServerAddress;
@@ -38,7 +39,7 @@ public class OpChainExecutionContext {
private final int _stageId;
private final VirtualServerAddress _server;
private final long _deadlineMs;
- private final Map<String, String> _requestMetadata;
+ private final Map<String, String> _opChainMetadata;
private final StageMetadata _stageMetadata;
private final OpChainId _id;
private final OpChainStats _stats;
@@ -46,14 +47,14 @@ public class OpChainExecutionContext {
private final boolean _traceEnabled;
public OpChainExecutionContext(MailboxService mailboxService, long
requestId, int stageId,
- VirtualServerAddress server, long deadlineMs, Map<String, String>
requestMetadata, StageMetadata stageMetadata,
+ VirtualServerAddress server, long deadlineMs, Map<String, String>
opChainMetadata, StageMetadata stageMetadata,
PipelineBreakerResult pipelineBreakerResult) {
_mailboxService = mailboxService;
_requestId = requestId;
_stageId = stageId;
_server = server;
_deadlineMs = deadlineMs;
- _requestMetadata = requestMetadata;
+ _opChainMetadata = Collections.unmodifiableMap(opChainMetadata);
_stageMetadata = stageMetadata;
_id = new OpChainId(requestId, server.workerId(), stageId);
_stats = new OpChainStats(_id.toString());
@@ -61,7 +62,7 @@ public class OpChainExecutionContext {
if (pipelineBreakerResult != null &&
pipelineBreakerResult.getOpChainStats() != null) {
_stats.getOperatorStatsMap().putAll(pipelineBreakerResult.getOpChainStats().getOperatorStatsMap());
}
- _traceEnabled =
Boolean.parseBoolean(requestMetadata.get(CommonConstants.Broker.Request.TRACE));
+ _traceEnabled =
Boolean.parseBoolean(opChainMetadata.get(CommonConstants.Broker.Request.TRACE));
}
public MailboxService getMailboxService() {
@@ -84,8 +85,8 @@ public class OpChainExecutionContext {
return _deadlineMs;
}
- public Map<String, String> getRequestMetadata() {
- return _requestMetadata;
+ public Map<String, String> getOpChainMetadata() {
+ return _opChainMetadata;
}
public StageMetadata getStageMetadata() {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
index 2ad9df403d..f2543a3363 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.runtime.plan;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -34,7 +35,7 @@ public class StageMetadata {
StageMetadata(List<WorkerMetadata> workerMetadataList, Map<String, String>
customProperties) {
_workerMetadataList = workerMetadataList;
- _customProperties = customProperties;
+ _customProperties = Collections.unmodifiableMap(customProperties);
}
public List<WorkerMetadata> getWorkerMetadataList() {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index 8972a89e18..3db86807d7 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -56,7 +56,7 @@ public class PipelineBreakerExecutor {
* @param scheduler scheduler service to run the pipeline breaker main
thread.
* @param mailboxService mailbox service to attach the {@link
MailboxReceiveNode} against.
* @param distributedStagePlan the distributed stage plan to run pipeline
breaker on.
- * @param requestMetadata request metadata, including query options
+ * @param opChainMetadata request metadata, including query options
* @param requestId request ID
* @param deadlineMs execution deadline
* @return pipeline breaker result;
@@ -65,7 +65,7 @@ public class PipelineBreakerExecutor {
*/
@Nullable
public static PipelineBreakerResult
executePipelineBreakers(OpChainSchedulerService scheduler,
- MailboxService mailboxService, DistributedStagePlan
distributedStagePlan, Map<String, String> requestMetadata,
+ MailboxService mailboxService, DistributedStagePlan
distributedStagePlan, Map<String, String> opChainMetadata,
long requestId, long deadlineMs) {
PipelineBreakerContext pipelineBreakerContext = new
PipelineBreakerContext();
PipelineBreakerVisitor.visitPlanRoot(distributedStagePlan.getStageRoot(),
pipelineBreakerContext);
@@ -76,7 +76,7 @@ public class PipelineBreakerExecutor {
// see also: MailboxIdUtils TODOs, de-couple mailbox id from query
information
OpChainExecutionContext opChainExecutionContext =
new OpChainExecutionContext(mailboxService, requestId,
distributedStagePlan.getStageId(),
- distributedStagePlan.getServer(), deadlineMs, requestMetadata,
distributedStagePlan.getStageMetadata(),
+ distributedStagePlan.getServer(), deadlineMs, opChainMetadata,
distributedStagePlan.getStageMetadata(),
null);
return execute(scheduler, pipelineBreakerContext,
opChainExecutionContext);
} catch (Exception e) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index 466b29af14..21e50c9993 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -122,7 +122,7 @@ public class ServerPlanRequestUtils {
long requestId = (executionContext.getRequestId() << 16) + ((long)
stagePlan.getStageId() << 8) + (
tableType == TableType.REALTIME ? 1 : 0);
PinotQuery pinotQuery = new PinotQuery();
- Integer leafNodeLimit =
QueryOptionsUtils.getMultiStageLeafLimit(executionContext.getRequestMetadata());
+ Integer leafNodeLimit =
QueryOptionsUtils.getMultiStageLeafLimit(executionContext.getOpChainMetadata());
if (leafNodeLimit != null) {
pinotQuery.setLimit(leafNodeLimit);
} else {
@@ -174,7 +174,7 @@ public class ServerPlanRequestUtils {
* Helper method to update query options.
*/
private static void updateQueryOptions(PinotQuery pinotQuery,
OpChainExecutionContext executionContext) {
- Map<String, String> queryOptions = new
HashMap<>(executionContext.getRequestMetadata());
+ Map<String, String> queryOptions = new
HashMap<>(executionContext.getOpChainMetadata());
queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
Long.toString(executionContext.getDeadlineMs() -
System.currentTimeMillis()));
pinotQuery.setQueryOptions(queryOptions);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index 3b1d18655d..a90418c742 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -22,6 +22,7 @@ import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -99,7 +100,7 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
// Deserialize the request
List<DistributedStagePlan> distributedStagePlans;
Map<String, String> requestMetadata;
- requestMetadata = request.getMetadataMap();
+ requestMetadata = Collections.unmodifiableMap(request.getMetadataMap());
long requestId =
Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
long timeoutMs =
Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
long deadlineMs = System.currentTimeMillis() + timeoutMs;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]