songwdfu commented on code in PR #16123:
URL: https://github.com/apache/pinot/pull/16123#discussion_r2257917798


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java:
##########
@@ -181,6 +182,74 @@ public Void visitJoin(JoinNode node, 
ServerPlanRequestContext context) {
     return null;
   }
 
+  @Override
+  public Void visitEnrichedJoin(EnrichedJoinNode node, 
ServerPlanRequestContext context) {
+    // We can reach here for dynamic broadcast SEMI join and lookup join.
+    List<PlanNode> inputs = node.getInputs();
+    PlanNode left = inputs.get(0);
+    PlanNode right = inputs.get(1);
+
+    if (right instanceof MailboxReceiveNode
+        && ((MailboxReceiveNode) right).getExchangeType() == 
PinotRelExchangeType.PIPELINE_BREAKER) {
+      // For dynamic broadcast SEMI join, right child should be a 
PIPELINE_BREAKER exchange. Visit the left child and
+      // attach the dynamic filter to the query.
+      if (visit(left, context)) {
+        // semi join to dynamic filter logic
+        PipelineBreakerResult pipelineBreakerResult = 
context.getPipelineBreakerResult();
+        int resultMapId = pipelineBreakerResult.getNodeIdMap().get(right);
+        List<MseBlock> blocks = 
pipelineBreakerResult.getResultMap().getOrDefault(resultMapId, 
Collections.emptyList());
+        List<Object[]> resultDataContainer = new ArrayList<>();
+        DataSchema dataSchema = right.getDataSchema();
+        for (MseBlock block : blocks) {
+          if (block.isData()) {
+            resultDataContainer.addAll(((MseBlock.Data) 
block).asRowHeap().getRows());
+          }
+        }
+        // TODO: we should keep query stats here as well
+        ServerPlanRequestUtils.attachDynamicFilter(context.getPinotQuery(), 
node.getLeftKeys(), node.getRightKeys(),
+            resultDataContainer, dataSchema);
+
+        // TODO: check whether this, when multiple filter and projects are 
present, is correct

Review Comment:
   That has already been verified; I forgot to delete the comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to