[ https://issues.apache.org/jira/browse/ASTERIXDB-3582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17944619#comment-17944619 ]
ASF subversion and git services commented on ASTERIXDB-3582:
------------------------------------------------------------
Commit 571353167b8a8527fe84b29d9637d63cc3ba277b in asterixdb's branch
refs/heads/master from Ritik Raj
[ https://gitbox.apache.org/repos/asf?p=asterixdb.git;h=571353167b ]
[ASTERIXDB-3582][COMP] Fix Concurrent Modification in Filter Pushdown
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
The issue occurs due to the presence of nested subplans. While iterating
over the subplan map to identify filters that can be pushed down, we
encounter new subplans that consume the output of the current subplan.
To account for these new subplans, we attempt to add them to the map
during iteration.
However, since the current implementation uses a regular HashMap, which
does not allow modifications while iterating, this results in a
ConcurrentModificationException.
Ext-ref: MB-65792
Change-Id: I636ed79ffd1de8ce8fd0729cf22867835527ff9a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19528
Integration-Tests: Jenkins <[email protected]>
Tested-by: Ritik <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
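
The failure mode described in the commit details can be reproduced in isolation. The following is a minimal sketch, not AsterixDB code: the class name, the map keys, and the values are hypothetical stand-ins for the subplan map. It relies only on the documented fail-fast behaviour of java.util.HashMap iterators.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the subplan map; not taken from AsterixDB.
final class SubplanMapCmeSketch {

    // Returns true if adding entries during iteration raised a
    // ConcurrentModificationException.
    static boolean addWhileIterating() {
        Map<String, List<String>> subplans = new HashMap<>();
        subplans.put("subplan-1", List.of("select-a"));
        subplans.put("subplan-2", List.of("select-b"));
        try {
            for (Map.Entry<String, List<String>> entry : subplans.entrySet()) {
                // Structural modification: a new key is inserted while the
                // entry iterator is still active.
                subplans.put("nested-of-" + entry.getKey(), List.of());
            }
            return false;
        } catch (ConcurrentModificationException e) {
            // The iterator notices the changed modification count when it
            // advances to the next entry.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(addWhileIterating());
    }
}
```

The map starts with two entries so that the iterator has to advance at least once after the first insertion, which is the point where the check fires.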
> Issues in values and filter pushdown for column collections
> -----------------------------------------------------------
>
> Key: ASTERIXDB-3582
> URL: https://issues.apache.org/jira/browse/ASTERIXDB-3582
> Project: Apache AsterixDB
> Issue Type: Bug
> Components: COMP - Compiler, STO - Storage
> Reporter: Ritik Raj
> Assignee: Ritik Raj
> Priority: Critical
> Labels: triaged
> Fix For: 0.9.10
>
>
> A few issues have been identified with value and filter pushdown for
> column collections.
> 1. For the following query:
> {code:java}
> USE commerce.marketing;
> CREATE FUNCTION sent(txt) {
> LET pos = ["bomb","needs"], neg=["shrinks","shrunk","smaller"], exp =
> split(txt, " ")
> SELECT CASE
> WHEN (
> (SOME w IN pos SATISFIES (w IN exp))
> AND
> (EVERY w IN neg SATISFIES (w NOT IN exp))
> )
> THEN "positive"
> WHEN (
> (SOME w IN neg SATISFIES (w IN exp))
> AND
> (EVERY w IN pos SATISFIES (w NOT IN exp))
> )
> THEN "negative"
> ELSE "neutral"
> END
> };
> USE commerce.marketing;
> SELECT r.*, sent(r.text) FROM reviews r;{code}
> The query fails with an Internal Error and the following trace:
> {code:java}
> 2025-03-17T17:13:50.133+00:00 INFO CBAS.translator.QueryTranslator [QueryTranslator:4ba4e252-4fde-4a6e-8b97-039b11366919] null
> java.util.ConcurrentModificationException: null
> at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1597) ~[?:?]
> at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1630) ~[?:?]
> at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1628) ~[?:?]
> at org.apache.asterix.optimizer.rules.pushdown.processor.AbstractFilterPushdownProcessor.putPotentialSelects(AbstractFilterPushdownProcessor.java:194) ~[asterix-algebra-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.asterix.optimizer.rules.pushdown.processor.AbstractFilterPushdownProcessor.process(AbstractFilterPushdownProcessor.java:79) ~[asterix-algebra-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.asterix.optimizer.rules.pushdown.PushdownProcessorsExecutor.execute(PushdownProcessorsExecutor.java:63) ~[asterix-algebra-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.asterix.optimizer.rules.PushValueAccessAndFilterDownRule.rewritePre(PushValueAccessAndFilterDownRule.java:102) ~[asterix-algebra-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController.rewriteOperatorRef(AbstractRuleController.java:79) ~[algebricks-core-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController.rewriteWithRuleCollection(SequentialOnceRuleController.java:43) ~[algebricks-compiler-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer.runOptimizationSets(HeuristicOptimizer.java:92) ~[algebricks-core-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer.runPhysicalOptimizationSets(HeuristicOptimizer.java:122) ~[algebricks-core-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer.optimize(HeuristicOptimizer.java:66) ~[algebricks-core-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder$CompilerImpl.optimize(HeuristicCompilerFactoryBuilder.java:165) ~[algebricks-compiler-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.asterix.api.common.APIFramework.compileQuery(APIFramework.java:289) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.asterix.app.translator.QueryTranslator.rewriteCompileQuery(QueryTranslator.java:4322) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.asterix.app.translator.QueryTranslator.lambda$handleQuery$3(QueryTranslator.java:5280) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.asterix.app.translator.QueryTranslator.createAndRunJob(QueryTranslator.java:5433) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.asterix.app.translator.QueryTranslator.deliverResult(QueryTranslator.java:5326) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.asterix.app.translator.QueryTranslator.handleQuery(QueryTranslator.java:5296) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.asterix.app.translator.QueryTranslator.compileAndExecute(QueryTranslator.java:534) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.asterix.app.message.ExecuteStatementRequestMessage.handle(ExecuteStatementRequestMessage.java:181) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.asterix.messaging.CCMessageBroker.receivedMessage(CCMessageBroker.java:64) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
> at org.apache.hyracks.control.cc.work.ApplicationMessageWork.lambda$notifyMessageBroker$0(ApplicationMessageWork.java:74) ~[hyracks-control-cc-1.0.3-2467.jar:1.0.3-2467]
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
> at java.base/java.lang.Thread.run(Thread.java:840) [?:?] {code}
> The failure occurs because we modify the map that tracks the subplan
> operators while iterating over it. Consider the plan below:
> {code:java}
> distribute result [$$163] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
> -- DISTRIBUTE_RESULT |PARTITIONED|
>   project ([$$163]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>   -- STREAM_PROJECT |PARTITIONED|
>     assign [$$163] <- [{"$1": $$162}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>     -- ASSIGN |PARTITIONED|
>       project ([$$162]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- |UNPARTITIONED|
>         subplan {
>           aggregate [$$162] <- [listify($$161)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>           -- AGGREGATE |LOCAL|
>             assign [$$161] <- [{"$2": switch-case(true, and($$140, $$146), "positive", and($$153, $$159), "negative", "neutral")}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>             -- ASSIGN |LOCAL|
>               subplan {
>                 aggregate [$$159] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                 -- AGGREGATE |LOCAL|
>                   select (not(if-missing-or-null($$158, false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                   -- STREAM_SELECT |LOCAL|
>                     subplan {
>                       aggregate [$$158] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                       -- AGGREGATE |LOCAL|
>                         select (not(if-missing-or-null(neq($$w, $#6), false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                         -- STREAM_SELECT |LOCAL|
>                           unnest $#6 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                           -- UNNEST |LOCAL|
>                             nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                             -- NESTED_TUPLE_SOURCE |LOCAL|
>                     } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                     -- SUBPLAN |LOCAL|
>                       unnest $$w <- scan-collection(array: [ "bomb", "needs" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                       -- UNNEST |LOCAL|
>                         nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                         -- NESTED_TUPLE_SOURCE |LOCAL|
>               } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>               -- SUBPLAN |LOCAL|
>                 subplan {
>                   aggregate [$$153] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                   -- AGGREGATE |LOCAL|
>                     select ($$152) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                     -- STREAM_SELECT |LOCAL|
>                       subplan {
>                         aggregate [$$152] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                         -- AGGREGATE |LOCAL|
>                           select (eq($$w, $#5)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                           -- STREAM_SELECT |LOCAL|
>                             unnest $#5 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                             -- UNNEST |LOCAL|
>                               nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                               -- NESTED_TUPLE_SOURCE |LOCAL|
>                       } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                       -- SUBPLAN |LOCAL|
>                         unnest $$w <- scan-collection(array: [ "shrinks", "shrunk", "smaller" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                         -- UNNEST |LOCAL|
>                           nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                           -- NESTED_TUPLE_SOURCE |LOCAL|
>                 } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                 -- SUBPLAN |LOCAL|
>                   subplan {
>                     aggregate [$$146] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                     -- AGGREGATE |LOCAL|
>                       select (not(if-missing-or-null($$145, false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                       -- STREAM_SELECT |LOCAL|
>                         subplan {
>                           aggregate [$$145] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                           -- AGGREGATE |LOCAL|
>                             select (not(if-missing-or-null(neq($$w, $#4), false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                             -- STREAM_SELECT |LOCAL|
>                               unnest $#4 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                               -- UNNEST |LOCAL|
>                                 nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                                 -- NESTED_TUPLE_SOURCE |LOCAL|
>                         } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                         -- SUBPLAN |LOCAL|
>                           unnest $$w <- scan-collection(array: [ "shrinks", "shrunk", "smaller" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                           -- UNNEST |LOCAL|
>                             nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                             -- NESTED_TUPLE_SOURCE |LOCAL|
>                   } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                   -- SUBPLAN |LOCAL|
>                     subplan {
>                       aggregate [$$140] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                       -- AGGREGATE |LOCAL|
>                         select ($$139) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                         -- STREAM_SELECT |LOCAL|
>                           subplan {
>                             aggregate [$$139] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                             -- AGGREGATE |LOCAL|
>                               select (eq($$w, $#3)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                               -- STREAM_SELECT |LOCAL|
>                                 unnest $#3 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                                 -- UNNEST |LOCAL|
>                                   nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                                   -- NESTED_TUPLE_SOURCE |LOCAL|
>                           } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                           -- SUBPLAN |LOCAL|
>                             unnest $$w <- scan-collection(array: [ "bomb", "needs" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                             -- UNNEST |LOCAL|
>                               nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                               -- NESTED_TUPLE_SOURCE |LOCAL|
>                     } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                     -- SUBPLAN |LOCAL|
>                       nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                       -- NESTED_TUPLE_SOURCE |LOCAL|
>         } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>         -- SUBPLAN |PARTITIONED|
>           project ([$$166]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>           -- STREAM_PROJECT |PARTITIONED|
>             assign [$$166] <- [$$r.getField("text")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>             -- ASSIGN |PARTITIONED|
>               project ([$$r]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- |UNPARTITIONED|
>                 data-scan []<-[$$164, $$165, $$r] <- marketing.reviews [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                 -- DATASOURCE_SCAN |PARTITIONED|
>                   empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
>                   -- EMPTY_TUPLE_SOURCE |PARTITIONED| {code}
>
> The issue arises because the plan contains nested subplans. While iterating
> over the subplan map to identify filters that can be pushed down, we
> encounter new subplans that consume the output of the current subplan. To
> account for these newly discovered subplans, we attempt to add them to the
> map during iteration. However, the map is a regular HashMap, whose iterators
> are fail-fast: adding an entry during iteration makes the next iterator
> step throw a ConcurrentModificationException.
>
>
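
One common way to avoid this class of failure is to queue newly discovered items in a separate worklist instead of inserting them into the collection being iterated. The sketch below illustrates that general technique only. It is not the change made in the commit above, and the class name, the CONSUMERS table, and the subplan labels are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of a worklist traversal; not AsterixDB code.
final class SubplanWorklistSketch {

    // Assumed toy relation: which subplans consume the output of a subplan.
    static final Map<String, List<String>> CONSUMERS = Map.of(
            "subplan-A", List.of("subplan-B"),
            "subplan-B", List.of("subplan-C"));

    // Visits the roots and every subplan discovered along the way, without
    // ever modifying a collection that is currently being iterated.
    static List<String> visitAll(List<String> roots) {
        Set<String> visited = new LinkedHashSet<>();
        Deque<String> pending = new ArrayDeque<>(roots);
        while (!pending.isEmpty()) {
            String current = pending.poll();
            if (!visited.add(current)) {
                continue; // already processed
            }
            // Newly discovered subplans go to the worklist, not to a map
            // under iteration.
            pending.addAll(CONSUMERS.getOrDefault(current, List.of()));
        }
        return new ArrayList<>(visited);
    }

    public static void main(String[] args) {
        System.out.println(visitAll(List.of("subplan-A")));
    }
}
```

The loop polls the deque rather than using an iterator over it, so appending to the deque inside the loop is safe.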
--
This message was sent by Atlassian Jira
(v8.20.10#820010)