From Ritik <[email protected]>:

Ritik has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19528 )

(5 is the latest approved patch-set. No files were changed between the latest approved patch-set and the submitted one.)

Change subject: [ASTERIXDB-3582][COMP] Fix Concurrent Modification in Filter Pushdown
......................................................................

[ASTERIXDB-3582][COMP] Fix Concurrent Modification in Filter Pushdown

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
The issue occurs due to the presence of nested subplans. While iterating
over the subplan map to identify filters that can be pushed down, we
encounter new subplans that consume the output of the current subplan.
To account for these new subplans, we attempt to add them to the map
during iteration.

However, since the current implementation uses a regular HashMap, which
does not allow modifications while iterating, this results in a
ConcurrentModificationException.
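
The failure mode and the queue-based fix described above can be sketched in
isolation. This is a minimal illustration, not the AsterixDB code: the class
name WorklistSketch, the string keys, and the CONSUMERS table are all
hypothetical stand-ins for subplan operators and for "subplans that consume
the output of the current subplan".

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;

final class WorklistSketch {
    // Hypothetical: which subplans consume the output of a given subplan.
    static final Map<String, List<String>> CONSUMERS = Map.of(
            "outerA", List.of("nestedA"),
            "outerB", List.of("nestedB"),
            "nestedA", List.of("innermost"));

    static Map<String, List<String>> initialSelects() {
        Map<String, List<String>> selects = new HashMap<>();
        selects.put("outerA", new ArrayList<>());
        selects.put("outerB", new ArrayList<>());
        return selects;
    }

    // Failing pattern: new keys are inserted while the key set is being iterated.
    static boolean iterateAndInsertThrows() {
        Map<String, List<String>> selects = initialSelects();
        try {
            for (String subplan : selects.keySet()) {
                for (String consumer : CONSUMERS.getOrDefault(subplan, List.of())) {
                    selects.computeIfAbsent(consumer, k -> new ArrayList<>());
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Fixed pattern: iterate over a separate queue; newly discovered keys are
    // added to both the map and the queue, so no map iterator is ever live.
    static List<String> drainWithQueue() {
        Map<String, List<String>> selects = initialSelects();
        Queue<String> queue = new LinkedList<>(selects.keySet());
        List<String> visited = new ArrayList<>();
        while (!queue.isEmpty()) {
            String subplan = queue.poll();
            visited.add(subplan);
            for (String consumer : CONSUMERS.getOrDefault(subplan, List.of())) {
                if (!selects.containsKey(consumer)) {
                    selects.put(consumer, new ArrayList<>());
                    queue.add(consumer);
                }
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println("iteration throws: " + iterateAndInsertThrows());
        System.out.println("visited with queue: " + drainWithQueue());
    }
}
```

The sketch starts with two keys on purpose: HashMap iterators only detect the
modification on the next call to next(), so inserting while visiting the last
remaining entry can go unnoticed.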

Ext-ref: MB-65792
Change-Id: I636ed79ffd1de8ce8fd0729cf22867835527ff9a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19528
Integration-Tests: Jenkins <[email protected]>
Tested-by: Ritik <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
A asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.002.plan
M asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.001.ddl.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.002.query.sqlpp
5 files changed, 236 insertions(+), 4 deletions(-)

Approvals:
  Ali Alsuliman: Looks good to me, approved
  Ritik: Verified
  Anon. E. Moose #1000171:
  Jenkins: Verified




diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
index cad9315..7d4f7a5 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
@@ -29,8 +29,10 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;

 import org.apache.asterix.om.base.IAObject;
@@ -55,6 +57,7 @@
     private final Map<ILogicalOperator, List<UseDescriptor>> subplanSelects;
     private final List<UseDescriptor> scanCandidateFilters;
     private final Set<LogicalVariable> subplanProducedVariables;
+    private final Queue<ILogicalOperator> subplanOpsQueue;

     public AbstractFilterPushdownProcessor(PushdownContext pushdownContext, IOptimizationContext context) {
         super(pushdownContext, context);
@@ -62,6 +65,7 @@
         subplanSelects = new HashMap<>();
         scanCandidateFilters = new ArrayList<>();
         subplanProducedVariables = new HashSet<>();
+        subplanOpsQueue = new LinkedList<>();
     }

     @Override
@@ -191,8 +195,10 @@
      * @param scanDescriptor data-scan descriptor
      */
     private void putPotentialSelects(ScanDefineDescriptor scanDescriptor) throws AlgebricksException {
-        for (Map.Entry<ILogicalOperator, List<UseDescriptor>> selects : subplanSelects.entrySet()) {
-            ILogicalOperator subplan = selects.getKey();
+        subplanOpsQueue.clear();
+        subplanOpsQueue.addAll(subplanSelects.keySet());
+        while (!subplanOpsQueue.isEmpty()) {
+            ILogicalOperator subplan = subplanOpsQueue.poll();
             subplanProducedVariables.clear();
             VariableUtilities.getProducedVariables(subplan, subplanProducedVariables);
             for (LogicalVariable producedVar : subplanProducedVariables) {
@@ -236,8 +242,13 @@
         boolean inSubplan = useDescriptor.inSubplan();
         if (inSubplan && useOperator.getOperatorTag() == LogicalOperatorTag.SELECT) {
             ILogicalOperator subplanOp = useDescriptor.getSubplanOperator();
-            List<UseDescriptor> selects = subplanSelects.computeIfAbsent(subplanOp, k -> new ArrayList<>());
-            selects.add(useDescriptor);
+            List<UseDescriptor> selects = subplanSelects.get(subplanOp);
+            if (selects == null) {
+                subplanOpsQueue.add(subplanOp);
+                subplanSelects.computeIfAbsent(subplanOp, k -> new ArrayList<>()).add(useDescriptor);
+            } else {
+                selects.add(useDescriptor);
+            }
         }

         // Finally, push down if not in subplan
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.001.ddl.sqlpp
new file mode 100644
index 0000000..4a22988
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.001.ddl.sqlpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE DATASET reviews
+PRIMARY KEY (itemno: int, custid: string) WITH {
+    "storage-format": { "format": "column" }
+};
+
+CREATE FUNCTION sent(txt) {
+LET pos = ["bomb","needs"], neg=["shrinks","shrunk","smaller"], exp = split(txt, " ")
+SELECT CASE
+         WHEN (
+               (SOME w IN pos SATISFIES (w IN exp))
+               AND
+               (EVERY w IN neg SATISFIES (w NOT IN exp))
+              )
+         THEN "positive"
+         WHEN (
+               (SOME w IN neg SATISFIES (w IN exp))
+               AND
+               (EVERY w IN pos SATISFIES (w NOT IN exp))
+              )
+         THEN "negative"
+         ELSE "neutral"
+       END
+};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.002.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.002.query.sqlpp
new file mode 100644
index 0000000..412c9a7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.002.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+USE test;
+
+EXPLAIN SELECT sent(r.text) FROM reviews r;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.002.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.002.plan
new file mode 100644
index 0000000..6228382
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/ASTERIXDB-3582/ASTERIXDB-3582.002.plan
@@ -0,0 +1,117 @@
+distribute result [$$163] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    assign [$$163] <- [{"$1": $$162}] project: [$$163] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- ASSIGN  |PARTITIONED|
+      project ([$$162]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- STREAM_PROJECT  |PARTITIONED|
+        subplan {
+                  aggregate [$$162] <- [listify($$161)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- AGGREGATE  |LOCAL|
+                    assign [$$161] <- [{"$2": switch-case(true, and($$140, $$146), "positive", and($$153, $$159), "negative", "neutral")}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ASSIGN  |LOCAL|
+                      subplan {
+                                aggregate [$$159] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- AGGREGATE  |LOCAL|
+                                  select (not(if-missing-or-null($$158, false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- STREAM_SELECT  |LOCAL|
+                                    subplan {
+                                              aggregate [$$158] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- AGGREGATE  |LOCAL|
+                                                select (not(if-missing-or-null(neq($$w, $#6), false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- STREAM_SELECT  |LOCAL|
+                                                  unnest $#6 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- UNNEST  |LOCAL|
+                                                    nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                           } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- SUBPLAN  |LOCAL|
+                                      unnest $$w <- scan-collection(array: [ "bomb", "needs" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- UNNEST  |LOCAL|
+                                        nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                             } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- SUBPLAN  |LOCAL|
+                        subplan {
+                                  aggregate [$$153] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- AGGREGATE  |LOCAL|
+                                    select ($$152) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- STREAM_SELECT  |LOCAL|
+                                      subplan {
+                                                aggregate [$$152] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- AGGREGATE  |LOCAL|
+                                                  select (eq($$w, $#5)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- STREAM_SELECT  |LOCAL|
+                                                    unnest $#5 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- UNNEST  |LOCAL|
+                                                      nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                             } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- SUBPLAN  |LOCAL|
+                                        unnest $$w <- scan-collection(array: [ "shrinks", "shrunk", "smaller" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- UNNEST  |LOCAL|
+                                          nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                               } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- SUBPLAN  |LOCAL|
+                          subplan {
+                                    aggregate [$$146] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- AGGREGATE  |LOCAL|
+                                      select (not(if-missing-or-null($$145, false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- STREAM_SELECT  |LOCAL|
+                                        subplan {
+                                                  aggregate [$$145] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- AGGREGATE  |LOCAL|
+                                                    select (not(if-missing-or-null(neq($$w, $#4), false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- STREAM_SELECT  |LOCAL|
+                                                      unnest $#4 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                      -- UNNEST  |LOCAL|
+                                                        nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                               } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- SUBPLAN  |LOCAL|
+                                          unnest $$w <- scan-collection(array: [ "shrinks", "shrunk", "smaller" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- UNNEST  |LOCAL|
+                                            nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                 } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- SUBPLAN  |LOCAL|
+                            subplan {
+                                      aggregate [$$140] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- AGGREGATE  |LOCAL|
+                                        select ($$139) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- STREAM_SELECT  |LOCAL|
+                                          subplan {
+                                                    aggregate [$$139] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- AGGREGATE  |LOCAL|
+                                                      select (eq($$w, $#3)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                      -- STREAM_SELECT  |LOCAL|
+                                                        unnest $#3 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                        -- UNNEST  |LOCAL|
+                                                          nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                 } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- SUBPLAN  |LOCAL|
+                                            unnest $$w <- scan-collection(array: [ "bomb", "needs" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- UNNEST  |LOCAL|
+                                              nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                   } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- SUBPLAN  |LOCAL|
+                              nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+               } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- SUBPLAN  |PARTITIONED|
+          assign [$$166] <- [$$r.getField("text")] project: [$$166] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- ASSIGN  |PARTITIONED|
+            project ([$$r]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- STREAM_PROJECT  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                data-scan []<-[$$164, $$165, $$r] <- test.reviews project ({text:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- DATASOURCE_SCAN  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
index c112b42..93c28bb 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
@@ -16536,6 +16536,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="column">
+      <compilation-unit name="filter/ASTERIXDB-3582">
+        <output-dir compare="Text">filter/ASTERIXDB-3582</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="column">
       <compilation-unit name="filter/ASTERIXDB-3582-2">
         <output-dir compare="Text">filter/ASTERIXDB-3582-2</output-dir>
       </compilation-unit>

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19528
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: ionic
Gerrit-Change-Id: I636ed79ffd1de8ce8fd0729cf22867835527ff9a
Gerrit-Change-Number: 19528
Gerrit-PatchSet: 7
Gerrit-Owner: Ritik <[email protected]>
Gerrit-Reviewer: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Ritik <[email protected]>
Gerrit-MessageType: merged
