>From Preetham Poluparthi <[email protected]>:

Preetham Poluparthi has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21196?usp=email )


Change subject: [ASTERIXDB-3713][COMP] Improve adding replicate operator
......................................................................

[ASTERIXDB-3713][COMP] Improve adding replicate operator

- user model changes: no
- storage format changes: no
- interface changes: no

Ext-ref: MB-71172
Change-Id: I686a87ac763c53339f2418b9aa9696fc5cc9e2c5
---
M 
asterixdb/asterix-app/src/test/resources/runtimets/results/tpch/query-plans/query-plans.04.plan
M 
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
2 files changed, 154 insertions(+), 117 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/96/21196/1

diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/tpch/query-plans/query-plans.04.plan
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/tpch/query-plans/query-plans.04.plan
index 09609e4..4c35d7b 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/tpch/query-plans/query-plans.04.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/tpch/query-plans/query-plans.04.plan
@@ -130,33 +130,33 @@
                                                                                
             join (eq($$211, $$210)) [cardinality: 0.0, doc-size: 0.0, op-cost: 
0.0, total-cost: 0.0]
                                                                                
             -- HYBRID_HASH_JOIN [$$210][$$211]  |PARTITIONED|
                                                                                
               exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, 
total-cost: 0.0]
-                                                                               
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                               
                 replicate [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, 
total-cost: 0.0]
-                                                                               
                 -- REPLICATE  |PARTITIONED|
+                                                                               
               -- HASH_PARTITION_EXCHANGE [$$210]  |PARTITIONED|
+                                                                               
                 project ([$$208, $$209, $$210]) [cardinality: 0.0, doc-size: 
0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                               
                 -- STREAM_PROJECT  |PARTITIONED|
                                                                                
                   exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, 
total-cost: 0.0]
-                                                                               
                   -- HASH_PARTITION_EXCHANGE [$$210]  |PARTITIONED|
-                                                                               
                     project ([$$208, $$209, $$210]) [cardinality: 0.0, 
doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                                               
                     -- STREAM_PROJECT  |PARTITIONED|
+                                                                               
                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
                     data-scan []<-[$$208, $$209, $$210, $$ps2] <- 
tpch.partsupp [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                               
                     -- DATASOURCE_SCAN  |PARTITIONED|
                                                                                
                       exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, 
total-cost: 0.0]
                                                                                
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                               
                         data-scan []<-[$$208, $$209, $$210, $$ps2] <- 
tpch.partsupp [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                                               
                         -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                               
                           exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 
0.0, total-cost: 0.0]
-                                                                               
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                               
                             empty-tuple-source [cardinality: 0.0, doc-size: 
0.0, op-cost: 0.0, total-cost: 0.0]
-                                                                               
                             -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                               
                         empty-tuple-source [cardinality: 0.0, doc-size: 0.0, 
op-cost: 0.0, total-cost: 0.0]
+                                                                               
                         -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                                                                
               exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, 
total-cost: 0.0]
                                                                                
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                
                 assign [$$223] <- [$$s2.getField(3)] project: [$$223, $$211] 
[cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                                                
                 -- ASSIGN  |PARTITIONED|
                                                                                
                   exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, 
total-cost: 0.0]
                                                                                
                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                               
                     data-scan []<-[$$211, $$s2] <- tpch.supplier [cardinality: 
0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                                               
                     -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                               
                     replicate [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, 
total-cost: 0.0]
+                                                                               
                     -- REPLICATE  |PARTITIONED|
                                                                                
                       exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, 
total-cost: 0.0]
                                                                                
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                               
                         empty-tuple-source [cardinality: 0.0, doc-size: 0.0, 
op-cost: 0.0, total-cost: 0.0]
-                                                                               
                         -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                               
                         data-scan []<-[$$211, $$s2] <- tpch.supplier 
[cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                               
                         -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                               
                           exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 
0.0, total-cost: 0.0]
+                                                                               
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
                             empty-tuple-source [cardinality: 0.0, doc-size: 
0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                               
                             -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                                                                
       exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                                                
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                
         assign [$$218] <- [$$n2.getField(2)] project: [$$218, $$212] 
[cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
@@ -175,20 +175,16 @@
                                                                                
                     -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                                                       exchange 
[cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                                       -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        
replicate [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                                        -- 
REPLICATE  |PARTITIONED|
+                                                                        select 
(eq($$r2.getField(1), "EUROPE")) project: [$$213] [cardinality: 0.0, doc-size: 
0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                        -- 
STREAM_SELECT  |PARTITIONED|
                                                                           
exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                                           -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                            
select (eq($$r2.getField(1), "EUROPE")) project: [$$213] [cardinality: 0.0, 
doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                                            -- 
STREAM_SELECT  |PARTITIONED|
+                                                                            
data-scan []<-[$$213, $$r2] <- tpch.region [cardinality: 0.0, doc-size: 0.0, 
op-cost: 0.0, total-cost: 0.0]
+                                                                            -- 
DATASOURCE_SCAN  |PARTITIONED|
                                                                               
exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                                               
-- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                               
 data-scan []<-[$$213, $$r2] <- tpch.region [cardinality: 0.0, doc-size: 0.0, 
op-cost: 0.0, total-cost: 0.0]
-                                                                               
 -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                               
   exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                                               
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                               
     empty-tuple-source [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, 
total-cost: 0.0]
-                                                                               
     -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                               
 empty-tuple-source [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 
0.0]
+                                                                               
 -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                         exchange [cardinality: 0.0, doc-size: 
0.0, op-cost: 0.0, total-cost: 0.0]
                                         -- HASH_PARTITION_EXCHANGE [$$203]  
|PARTITIONED|
                                           project ([$$237, $$239, $$245, 
$$246, $$247, $$225, $$203, $$202]) [cardinality: 0.0, doc-size: 0.0, op-cost: 
0.0, total-cost: 0.0]
@@ -201,34 +197,32 @@
                                                 -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
                                                   assign [$$247, $$246, $$245, 
$$237, $$239, $$225] <- [$$s.getField(6), $$s.getField(4), $$s.getField(2), 
$$s.getField(5), $$s.getField(1), $$s.getField(3)] project: [$$237, $$239, 
$$245, $$246, $$247, $$225, $$201] [cardinality: 0.0, doc-size: 0.0, op-cost: 
0.0, total-cost: 0.0]
                                                   -- ASSIGN  |PARTITIONED|
+                                                    assign [$$201, $$s] <- 
[$$211, $$s2] project: [$$201, $$s] [cardinality: 0.0, doc-size: 0.0, op-cost: 
0.0, total-cost: 0.0]
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      exchange [cardinality: 
0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                      -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
+                                                        replicate 
[cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                        -- REPLICATE  
|PARTITIONED|
+                                                          exchange 
[cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            data-scan 
[]<-[$$211, $$s2] <- tpch.supplier [cardinality: 0.0, doc-size: 0.0, op-cost: 
0.0, total-cost: 0.0]
+                                                            -- DATASOURCE_SCAN 
 |PARTITIONED|
+                                                              exchange 
[cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                              -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                
empty-tuple-source [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 
0.0]
+                                                                -- 
EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                exchange [cardinality: 0.0, 
doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- HASH_PARTITION_EXCHANGE 
[$$204]  |PARTITIONED|
+                                                  project ([$$203, $$202, 
$$204]) [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- STREAM_PROJECT  
|PARTITIONED|
                                                     exchange [cardinality: 
0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                     -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                      data-scan []<-[$$201, 
$$s] <- tpch.supplier [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, 
total-cost: 0.0]
+                                                      data-scan []<-[$$202, 
$$203, $$204, $$ps] <- tpch.partsupp [cardinality: 0.0, doc-size: 0.0, op-cost: 
0.0, total-cost: 0.0]
                                                       -- DATASOURCE_SCAN  
|PARTITIONED|
                                                         exchange [cardinality: 
0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                         -- ONE_TO_ONE_EXCHANGE 
 |PARTITIONED|
                                                           empty-tuple-source 
[cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                           -- 
EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                exchange [cardinality: 0.0, 
doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                  assign [$$202, $$203, $$204] 
<- [$$208, $$209, $$210] project: [$$202, $$203, $$204] [cardinality: 0.0, 
doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                  -- ASSIGN  |PARTITIONED|
-                                                    exchange [cardinality: 
0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                    -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                      replicate [cardinality: 
0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                      -- REPLICATE  
|PARTITIONED|
-                                                        exchange [cardinality: 
0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                        -- 
HASH_PARTITION_EXCHANGE [$$210]  |PARTITIONED|
-                                                          project ([$$208, 
$$209, $$210]) [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                          -- STREAM_PROJECT  
|PARTITIONED|
-                                                            exchange 
[cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                            -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              data-scan 
[]<-[$$208, $$209, $$210, $$ps2] <- tpch.partsupp [cardinality: 0.0, doc-size: 
0.0, op-cost: 0.0, total-cost: 0.0]
-                                                              -- 
DATASOURCE_SCAN  |PARTITIONED|
-                                                                exchange 
[cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                                -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  
empty-tuple-source [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 
0.0]
-                                                                  -- 
EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                 exchange [cardinality: 0.0, doc-size: 0.0, 
op-cost: 0.0, total-cost: 0.0]
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                   assign [$$238, $$221] <- [$$n.getField(1), 
$$n.getField(2)] project: [$$238, $$221, $$205] [cardinality: 0.0, doc-size: 
0.0, op-cost: 0.0, total-cost: 0.0]
@@ -249,21 +243,13 @@
                                                 -- EMPTY_TUPLE_SOURCE  
|PARTITIONED|
                         exchange [cardinality: 0.0, doc-size: 0.0, op-cost: 
0.0, total-cost: 0.0]
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          assign [$$206] <- [$$213] project: [$$206] 
[cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
-                          -- ASSIGN  |PARTITIONED|
+                          select (eq($$r.getField(1), "EUROPE")) project: 
[$$206] [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- STREAM_SELECT  |PARTITIONED|
                             exchange [cardinality: 0.0, doc-size: 0.0, 
op-cost: 0.0, total-cost: 0.0]
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              replicate [cardinality: 0.0, doc-size: 0.0, 
op-cost: 0.0, total-cost: 0.0]
-                              -- REPLICATE  |PARTITIONED|
+                              data-scan []<-[$$206, $$r] <- tpch.region 
[cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- DATASOURCE_SCAN  |PARTITIONED|
                                 exchange [cardinality: 0.0, doc-size: 0.0, 
op-cost: 0.0, total-cost: 0.0]
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  select (eq($$r2.getField(1), "EUROPE")) 
project: [$$213] [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 
0.0]
-                                  -- STREAM_SELECT  |PARTITIONED|
-                                    exchange [cardinality: 0.0, doc-size: 0.0, 
op-cost: 0.0, total-cost: 0.0]
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      data-scan []<-[$$213, $$r2] <- 
tpch.region [cardinality: 0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                      -- DATASOURCE_SCAN  |PARTITIONED|
-                                        exchange [cardinality: 0.0, doc-size: 
0.0, op-cost: 0.0, total-cost: 0.0]
-                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          empty-tuple-source [cardinality: 
0.0, doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                  empty-tuple-source [cardinality: 0.0, 
doc-size: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 88cfaad..cda57ff 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -57,6 +57,11 @@
 /**
  * Pre-conditions:
  *      FixReplicateOperatorOutputsRule should be fired
+ *
+ *     This rule identifies isomorphic (structurally identical) subtrees 
across multiple query plan
+ *     branches and merges them into a single shared subtree using a 
ReplicateOperator.
+ *     This converts a tree-shaped plan into a DAG (Directed Acyclic Graph), 
eliminating redundant computation.
+ *
  */
 public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {

@@ -412,19 +417,21 @@
         }
     }

+    /**
+     * Grows equivalence classes upward level-by-level.
+     * For each equivalence class, it looks at parent operators.
+     * If all inputs of a parent are in candidate sets, the parent can "grow" 
into the next level.
+     * Uses prune to check isomorphism at each level and split/merge 
equivalence classes.
+    */
     private void genCandidates() throws AlgebricksException {
-        List<List<Mutable<ILogicalOperator>>> previousEquivalenceClasses = new 
ArrayList<>();
         boolean grown = true;
-        while (equivalenceClasses.size() > 0 && grown) {
-            previousEquivalenceClasses.clear();
-            for (List<Mutable<ILogicalOperator>> candidates : 
equivalenceClasses) {
-                List<Mutable<ILogicalOperator>> candidatesCopy = new 
ArrayList<>(candidates);
-                previousEquivalenceClasses.add(candidatesCopy);
-            }
-            List<Mutable<ILogicalOperator>> currentLevelOpRefs = new 
ArrayList<>();
+        while (!equivalenceClasses.isEmpty() && grown) {
             grown = false;
+            List<List<Mutable<ILogicalOperator>>> grownCandidates = new 
ArrayList<>();
+            List<List<Mutable<ILogicalOperator>>> remaining = new 
ArrayList<>();
+            List<Mutable<ILogicalOperator>> currentLevelOpRefs = new 
ArrayList<>();
             for (List<Mutable<ILogicalOperator>> candidates : 
equivalenceClasses) {
-                if (candidates.size() > 0) {
+                if (!candidates.isEmpty()) {
                     for (Mutable<ILogicalOperator> opRef : candidates) {
                         List<Mutable<ILogicalOperator>> refs = 
childrenToParents.get(opRef);
                         if (refs != null) {
@@ -432,15 +439,36 @@
                         }
                     }
                 }
-                if (currentLevelOpRefs.size() == 0) {
+                if (currentLevelOpRefs.isEmpty()) {
                     continue;
                 }
-                grown |= candidatesGrow(currentLevelOpRefs, candidates);
-            }
-            if (currentLevelOpRefs.size() == 0) {
-                break;
+                Pair<List<Mutable<ILogicalOperator>>, 
List<Mutable<ILogicalOperator>>> grownCandidatePair =
+                        candidatesGrow(currentLevelOpRefs, candidates);
+                if (grownCandidatePair.getFirst().size() == 1) {
+                    // if there is only one grown candidate, it means multiple 
candidates grow into the same parent,
+                    // we can directly put the candidates as the equivalence 
class for next level.
+                    // Common case for self-join
+                    equivalenceClasses.clear();
+                    equivalenceClasses.add(candidates);
+                    return;
+                }
+                if (!grownCandidatePair.getFirst().isEmpty()) {
+                    grownCandidates.add(grownCandidatePair.getFirst());
+                    grown = true;
+                }
+                if (!grownCandidatePair.getSecond().isEmpty()) {
+                    // put all the children candidates into remaining since 
they cannot grow anymore.
+                    remaining.add(grownCandidatePair.getSecond());
+                }
             }

+            Pair<Boolean, List<List<Mutable<ILogicalOperator>>>> pruned = 
prune(grownCandidates);
+            equivalenceClasses.clear();
+            equivalenceClasses.addAll(pruned.getSecond());
+            if (pruned.getFirst()) {
+                return;
+            }
+            equivalenceClasses.addAll(remaining);
             for (int i = equivalenceClasses.size() - 1; i >= 0; i--) {
                 for (int j = i - 1; j >= 0; j--) {
                     if 
(equivalenceClasses.get(i).equals(equivalenceClasses.get(j))) {
@@ -449,11 +477,6 @@
                     }
                 }
             }
-            prune();
-        }
-        if (equivalenceClasses.size() < 1 && previousEquivalenceClasses.size() 
> 0) {
-            equivalenceClasses.addAll(previousEquivalenceClasses);
-            prune();
         }
     }

@@ -484,19 +507,30 @@
         }
     }

-    private boolean candidatesGrow(List<Mutable<ILogicalOperator>> opList, 
List<Mutable<ILogicalOperator>> candidates) {
-        List<Mutable<ILogicalOperator>> previousCandidates = new 
ArrayList<>(candidates);
-        candidates.clear();
-        for (Mutable<ILogicalOperator> op : opList) {
-            List<Mutable<ILogicalOperator>> inputs = op.getValue().getInputs();
+    /**
+     * For each candidate, finds its parent operator.
+     * A parent is "valid" (can grow) if all of its inputs are candidates 
(tracked via opToCandidateInputs BitSet).
+     * Returns a pair: (grown parents, remaining children that couldn't grow).
+     * */
+    private Pair<List<Mutable<ILogicalOperator>>, 
List<Mutable<ILogicalOperator>>> candidatesGrow(
+            List<Mutable<ILogicalOperator>> opList, 
List<Mutable<ILogicalOperator>> candidates) {
+        List<Mutable<ILogicalOperator>> parents = new ArrayList<>();
+        List<Mutable<ILogicalOperator>> children = new ArrayList<>();
+
+        for (Mutable<ILogicalOperator> candidate : candidates) {
             boolean validCandidate = false;
-            for (int i = 0; i < inputs.size(); i++) {
-                Mutable<ILogicalOperator> inputRef = inputs.get(i);
-                for (Mutable<ILogicalOperator> candidate : previousCandidates) 
{
-                    // if current input is in candidates
+            Mutable<ILogicalOperator> parent = null;
+            for (Mutable<ILogicalOperator> op : opList) {
+                if (validCandidate) {
+                    break;
+                }
+                List<Mutable<ILogicalOperator>> inputs = 
op.getValue().getInputs();
+                for (int i = 0; i < inputs.size(); i++) {
+                    Mutable<ILogicalOperator> inputRef = inputs.get(i);
                     if (inputRef.getValue().equals(candidate.getValue())) {
                         if (inputs.size() == 1) {
                             validCandidate = true;
+                            parent = op;
                         } else {
                             BitSet candidateInputBitMap = 
opToCandidateInputs.get(op);
                             if (candidateInputBitMap == null) {
@@ -506,63 +540,80 @@
                             candidateInputBitMap.set(i);
                             if (candidateInputBitMap.cardinality() == 
inputs.size()) {
                                 validCandidate = true;
+                                parent = op;
                             }
+
                         }
                         break;
                     }
                 }
+
             }
-            if (!validCandidate) {
-                continue;
-            }
-            if (!candidates.contains(op)) {
-                candidates.add(op);
+            if (validCandidate) {
+                parents.add(parent);
+            } else {
+                children.add(candidate);
             }
         }
-        if (candidates.isEmpty()) {
-            candidates.addAll(previousCandidates);
-            return false;
-        }
-        return true;
+        return new Pair<>(parents, children);
     }

-    private void prune() throws AlgebricksException {
-        List<List<Mutable<ILogicalOperator>>> previousEquivalenceClasses = new 
ArrayList<>();
-        for (List<Mutable<ILogicalOperator>> candidates : equivalenceClasses) {
-            List<Mutable<ILogicalOperator>> candidatesCopy = new 
ArrayList<>(candidates);
-            previousEquivalenceClasses.add(candidatesCopy);
-        }
-        equivalenceClasses.clear();
-        for (List<Mutable<ILogicalOperator>> candidates : 
previousEquivalenceClasses) {
-            boolean[] reserved = new boolean[candidates.size()];
-            for (int i = candidates.size() - 1; i >= 0; i--) {
+    /**
+     * Checks isomorphism among grown candidates.
+     * Groups isomorphic operators into equivalence classes.
+     * If no two operators are isomorphic, returns true, and
+     * it falls back to their children as the equivalence class, this is the 
"divergence" point.
+     * */
+    private Pair<Boolean, List<List<Mutable<ILogicalOperator>>>> prune(
+            List<List<Mutable<ILogicalOperator>>> grownCandidates) throws 
AlgebricksException {
+        List<List<Mutable<ILogicalOperator>>> eqClasses = new ArrayList<>();
+        for (List<Mutable<ILogicalOperator>> parents : grownCandidates) {
+            boolean[] reserved = new boolean[parents.size()];
+            for (int i = parents.size() - 1; i >= 0; i--) {
                 if (!reserved[i]) {
                     List<Mutable<ILogicalOperator>> equivalentClass = new 
ArrayList<>();
-                    ILogicalOperator candidate = candidates.get(i).getValue();
-                    equivalentClass.add(candidates.get(i));
+                    ILogicalOperator candidate = parents.get(i).getValue();
+                    equivalentClass.add(parents.get(i));
                     for (int j = i - 1; j >= 0; j--) {
-                        ILogicalOperator peer = candidates.get(j).getValue();
+                        ILogicalOperator peer = parents.get(j).getValue();
                         boolean isomorphic = candidate.getInputs().size() > 1
                                 ? 
IsomorphismUtilities.isOperatorIsomorphicPlanSegment(candidate, peer)
                                 : 
IsomorphismUtilities.isOperatorIsomorphic(candidate, peer);
                         if (isomorphic) {
                             reserved[i] = true;
                             reserved[j] = true;
-                            equivalentClass.add(candidates.get(j));
+                            equivalentClass.add(parents.get(j));
                         }
                     }
                     if (equivalentClass.size() > 1) {
-                        equivalenceClasses.add(equivalentClass);
+                        eqClasses.add(equivalentClass);
                         Collections.reverse(equivalentClass);
                     }
                 }
             }
-            for (int i = candidates.size() - 1; i >= 0; i--) {
-                if (!reserved[i]) {
-                    candidates.remove(i);
+            boolean isPruned = true;
+            for (int i = parents.size() - 1; i >= 0; i--) {
+                if (reserved[i]) {
+                    isPruned = false;
+                    break;
                 }
             }
+            if (isPruned) {
+                List<List<Mutable<ILogicalOperator>>> children = new 
ArrayList<>();
+                for (Mutable<ILogicalOperator> parent : parents) {
+                    int size = parent.get().getInputs().size();
+                    for (int i = 0; i < size; i++) {
+                        Mutable<ILogicalOperator> child = 
parent.get().getInputs().get(i);
+                        if (children.size() == i) {
+                            children.add(new ArrayList<>());
+                        }
+                        children.get(i).add(child);
+                    }
+                }
+                return new Pair<>(true, children);
+            }
         }
+        return new Pair<>(false, eqClasses);
     }

     private boolean[] 
computeMaterilizationFlags(List<Mutable<ILogicalOperator>> group) {

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21196?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I686a87ac763c53339f2418b9aa9696fc5cc9e2c5
Gerrit-Change-Number: 21196
Gerrit-PatchSet: 1
Gerrit-Owner: Preetham Poluparthi <[email protected]>

Reply via email to