This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 08f6155cad [multistage] Minor Changes to Support Physical Optimizer
Related Changes (#15562)
08f6155cad is described below
commit 08f6155cadf72a9174dfd536283841b8a5b21706
Author: Ankit Sultana <[email protected]>
AuthorDate: Thu Apr 24 06:32:39 2025 +0530
[multistage] Minor Changes to Support Physical Optimizer Related Changes
(#15562)
---
.../query/planner/logical/RelToPlanNodeConverter.java | 2 +-
.../pinot/query/planner/plannode/ExchangeNode.java | 13 +++++++++++--
.../apache/pinot/query/planner/plannode/SetOpNode.java | 12 ++++++------
.../org/apache/pinot/query/routing/WorkerManager.java | 16 ++++++++++++++++
.../runtime/operator/exchange/SingletonExchange.java | 5 +----
.../runtime/operator/exchange/SingletonExchangeTest.java | 10 ----------
6 files changed, 35 insertions(+), 23 deletions(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index d42f903ab9..06a3ea731a 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -186,7 +186,7 @@ public final class RelToPlanNodeConverter {
}
}
return new ExchangeNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()),
convertInputs(node.getInputs()),
- exchangeType, distributionType, keys, prePartitioned, collations,
sortOnSender, sortOnReceiver, null);
+ exchangeType, distributionType, keys, prePartitioned, collations,
sortOnSender, sortOnReceiver, null, null);
}
private SetOpNode convertLogicalSetOp(SetOp node) {
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
index 79b78d12ee..ea02ca6b7e 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.RelFieldCollation;
import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.PlanFragmenter;
+import org.apache.pinot.query.planner.physical.v2.ExchangeStrategy;
/**
@@ -45,11 +46,13 @@ public class ExchangeNode extends BasePlanNode {
private final boolean _sortOnReceiver;
// Table names should be set for SUB_PLAN exchange type.
private final Set<String> _tableNames;
+ @Nullable
+ private final ExchangeStrategy _exchangeStrategy;
public ExchangeNode(int stageId, DataSchema dataSchema, List<PlanNode>
inputs, PinotRelExchangeType exchangeType,
RelDistribution.Type distributionType, @Nullable List<Integer> keys,
boolean prePartitioned,
@Nullable List<RelFieldCollation> collations, boolean sortOnSender,
boolean sortOnReceiver,
- @Nullable Set<String> tableNames) {
+ @Nullable Set<String> tableNames, ExchangeStrategy exchangeStrategy) {
super(stageId, dataSchema, null, inputs);
_exchangeType = exchangeType;
_distributionType = distributionType;
@@ -59,6 +62,7 @@ public class ExchangeNode extends BasePlanNode {
_sortOnSender = sortOnSender;
_sortOnReceiver = sortOnReceiver;
_tableNames = tableNames;
+ _exchangeStrategy = exchangeStrategy;
}
public PinotRelExchangeType getExchangeType() {
@@ -96,6 +100,11 @@ public class ExchangeNode extends BasePlanNode {
return _tableNames;
}
+ @Nullable
+ public ExchangeStrategy getExchangeStrategy() {
+ return _exchangeStrategy;
+ }
+
@Override
public String explain() {
return "EXCHANGE";
@@ -109,7 +118,7 @@ public class ExchangeNode extends BasePlanNode {
@Override
public PlanNode withInputs(List<PlanNode> inputs) {
return new ExchangeNode(_stageId, _dataSchema, inputs, _exchangeType,
_distributionType, _keys, _prePartitioned,
- _collations, _sortOnSender, _sortOnReceiver, _tableNames);
+ _collations, _sortOnSender, _sortOnReceiver, _tableNames, null);
}
@Override
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SetOpNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SetOpNode.java
index 7c60731c3e..5a5d3e1656 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SetOpNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SetOpNode.java
@@ -20,10 +20,10 @@ package org.apache.pinot.query.planner.plannode;
import java.util.List;
import java.util.Objects;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.core.Minus;
import org.apache.calcite.rel.core.SetOp;
-import org.apache.calcite.rel.logical.LogicalIntersect;
-import org.apache.calcite.rel.logical.LogicalMinus;
-import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.core.Union;
import org.apache.pinot.common.utils.DataSchema;
@@ -88,13 +88,13 @@ public class SetOpNode extends BasePlanNode {
UNION, INTERSECT, MINUS;
public static SetOpType fromObject(SetOp setOp) {
- if (setOp instanceof LogicalUnion) {
+ if (setOp instanceof Union) {
return UNION;
}
- if (setOp instanceof LogicalIntersect) {
+ if (setOp instanceof Intersect) {
return INTERSECT;
}
- if (setOp instanceof LogicalMinus) {
+ if (setOp instanceof Minus) {
return MINUS;
}
throw new IllegalArgumentException("Unsupported set operation: " +
setOp.getClass());
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 6963b6f907..5adfadcffc 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -81,6 +81,22 @@ public class WorkerManager {
_routingManager = routingManager;
}
+ public String getInstanceId() {
+ return _instanceId;
+ }
+
+ public String getHostName() {
+ return _hostName;
+ }
+
+ public int getPort() {
+ return _port;
+ }
+
+ public RoutingManager getRoutingManager() {
+ return _routingManager;
+ }
+
public void assignWorkers(PlanFragment rootFragment, DispatchablePlanContext
context) {
// ROOT stage doesn't have a QueryServer as it is strictly only reducing
results, so here we simply assign the
// worker instance with identical server/mailbox port number.
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
index 16867b4f2c..f6fd85d954 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
-import org.apache.pinot.query.mailbox.InMemorySendingMailbox;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.MseBlock;
@@ -38,9 +37,7 @@ class SingletonExchange extends BlockExchange {
SingletonExchange(List<SendingMailbox> sendingMailboxes, BlockSplitter
splitter,
Function<List<SendingMailbox>, Integer> statsIndexChooser) {
super(sendingMailboxes, splitter, statsIndexChooser);
- Preconditions.checkArgument(
- sendingMailboxes.size() == 1 && sendingMailboxes.get(0) instanceof
InMemorySendingMailbox,
- "Expect single InMemorySendingMailbox for SingletonExchange");
+ Preconditions.checkArgument(sendingMailboxes.size() == 1, "Expect single
mailbox in Singleton Exchange");
}
SingletonExchange(List<SendingMailbox> sendingMailboxes, BlockSplitter
splitter) {
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
index 2e06461e1f..6c1cfcb101 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
@@ -77,16 +77,6 @@ public class SingletonExchangeTest {
Assert.assertEquals(captor.getValue(), _block);
}
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void shouldThrowWhenSingletonWithNonLocalMailbox()
- throws Exception {
- // Given:
- ImmutableList<SendingMailbox> destinations = ImmutableList.of(_mailbox2);
-
- // When:
- new SingletonExchange(destinations,
BlockSplitter.NO_OP).route(destinations, _block);
- }
-
@Test(expectedExceptions = IllegalArgumentException.class)
public void shouldThrowWhenSingletonWithMultipleMailboxes()
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]