[ 
https://issues.apache.org/jira/browse/IGNITE-28750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18086708#comment-18086708
 ] 

Ignite TC Bot commented on IGNITE-28750:
----------------------------------------

{panel:title=Branch: [pull/13216/head] Base: [master] : No blockers 
found!|borderStyle=dashed|borderColor=#ccc|titleBGColor=#D6F7C1}{panel}
{panel:title=Branch: [pull/13216/head] Base: [master] : New Tests 
(10)|borderStyle=dashed|borderColor=#ccc|titleBGColor=#D6F7C1}
{color:#00008b}Continuous Query 4{color} [[tests 
2|https://ci2.ignite.apache.org/viewLog.html?buildId=9112042]]
* {color:#013220}IgniteCacheQuerySelfTestSuite6: 
CacheContinuousQueryLocalTxRollbackTest.checkLocalAndDistributedCqAfterNodeFail[cacheMode=REPLICATED]
 - PASSED{color}
* {color:#013220}IgniteCacheQuerySelfTestSuite6: 
CacheContinuousQueryLocalTxRollbackTest.checkLocalAndDistributedCqAfterNodeFail[cacheMode=PARTITIONED]
 - PASSED{color}

{color:#00008b}Cache 6{color} [[tests 
4|https://ci2.ignite.apache.org/viewLog.html?buildId=9112017]]
* {color:#013220}IgniteCacheTestSuite6: 
TxSavepointPessimisticTest.testReleaseSavepointReleasesNestedSavepoints - 
PASSED{color}
* {color:#013220}IgniteCacheTestSuite6: 
TxSavepointPessimisticTest.testDuplicateSavepointWithoutOverwriteThrows - 
PASSED{color}
* {color:#013220}IgniteCacheTestSuite6: 
TxSavepointPessimisticTest.testSavepointRejectedForOptimisticTx - PASSED{color}
* {color:#013220}IgniteCacheTestSuite6: 
TxSavepointPessimisticTest.testRollabackSeveralSavepoints - PASSED{color}

{color:#00008b}Calcite SQL 1{color} [[tests 
4|https://ci2.ignite.apache.org/viewLog.html?buildId=9112033]]
* {color:#013220}IgniteCalciteTestSuite: 
SqlTransactionsSavepointTest.testSavepointCommandsInSqlScript - PASSED{color}
* {color:#013220}IgniteCalciteTestSuite: 
SqlTransactionsSavepointTest.testSavepointCommandsRequireTransaction - 
PASSED{color}
* {color:#013220}IgniteCalciteTestSuite: 
SqlTransactionsSavepointTest.testSqlDmlChangesCanBeRolledBackToSavepoint - 
PASSED{color}
* {color:#013220}IgniteCalciteTestSuite: 
SqlTransactionsSavepointTest.testSqlDmlChangesCanBeRolledBackToSavepointUsingSqlCommands
 - PASSED{color}

{panel}
[TeamCity *--> Run :: All* 
Results|https://ci2.ignite.apache.org/viewLog.html?buildId=9112130&buildTypeId=IgniteTests24Java8_RunAll]
{color:#ffffff}tcbot-analysis-comment chainBuildId=9112130 
rerunBuildIds=9112291{color}

> AssertionError in local continuous query during rollback update counter skip
> ----------------------------------------------------------------------------
>
>                 Key: IGNITE-28750
>                 URL: https://issues.apache.org/jira/browse/IGNITE-28750
>             Project: Ignite
>          Issue Type: Task
>            Reporter: Oleg Valuyskiy
>            Assignee: Oleg Valuyskiy
>            Priority: Major
>              Labels: ise
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> A local Continuous Query created with *ContinuousQuery#setLocal(true)* may 
> fail with *AssertionError* during transactional rollback after a node failure.
> During rollback, *IgniteTxHandler#applyPartitionsUpdatesCounters* logs 
> rollback ranges and notifies the Continuous Query subsystem about skipped 
> update counters:
> {code:java}
> ctx0.continuousQueries().skipUpdateCounter(null, part.id(), start + cntr,
>     topVer, rollbackOnPrimary);{code}
> This call enters {*}CacheContinuousQueryManager#skipUpdateCounter{*}:
> {code:java}
> @Nullable public CounterSkipContext skipUpdateCounter(
>     @Nullable CounterSkipContext skipCtx,
>     int part,
>     long cntr,
>     AffinityTopologyVersion topVer,
>     boolean primary
> ) {
>     for (CacheContinuousQueryListener lsnr : lsnrs.values())
>         skipCtx = lsnr.skipUpdateCounter(cctx, skipCtx, part, cntr, topVer, primary);
>
>     return skipCtx;
> }
> {code}
> Currently this method invokes *skipUpdateCounter* on every Continuous Query 
> listener registered for the cache. There is no filtering by local-only mode, 
> primary-only mode, or listener type.
> This is problematic for local Continuous Queries: for such queries the 
> *CacheContinuousQueryHandler* is registered only on the node where the query 
> was started. In this case the handler has the following state:
> {code:java}
> loc == true
> locOnly == true{code}
> where *loc* means that the current node is the node that initiated the 
> Continuous Query:
> {code:java}
> final boolean loc = nodeId.equals(ctx.localNodeId());{code}
> and *locOnly* means that the query was created with 
> {*}ContinuousQuery#setLocal(true){*}.
> This combination is valid for a local Continuous Query. However, 
> *CacheContinuousQueryHandler.CacheContinuousQueryListener#skipUpdateCounter* 
> currently rejects this state:
> {code:java}
> if (loc) {
>     assert !locOnly;
>     final Collection<CacheEntryEvent<? extends K, ? extends V>> evts =
>         handleEvent(ctx, skipCtx.entry());
>     ...
> }
> {code}
> As a result, when transaction rollback counter cleanup calls 
> *skipUpdateCounter* for a cache that has a local CQ listener, the local-only 
> listener may reach this unsupported path and fail with {*}AssertionError{*}.
> The important point is that skipped update counters are part of the 
> distributed Continuous Query recovery/order mechanism. For regular 
> distributed Continuous Queries, skipped counters are needed to close gaps in 
> per-partition update counter sequences. A skipped counter is represented as a 
> synthetic *CacheContinuousQueryEntry* marked as filtered:
> {code:java}
> entry = new CacheContinuousQueryEntry(..., part, cntr, topVer, ...);
> entry.markFiltered();{code}
> This synthetic entry is passed through the same recovery path as ordinary CQ 
> entries. It allows *CacheContinuousQueryPartitionRecovery* to advance the 
> counter sequence and release pending events that were waiting for a missing 
> counter.
> For example:
> {code:java}
> counter 10 -> real event
> counter 11 -> rolled back / filtered / skipped
> counter 12 -> real event{code}
> Without a skip marker for counter 11, the recovery logic may not be able to 
> safely release counter 12. Therefore, distributed CQ handlers process skipped 
> counters through *handleEvent(...)* / partition recovery.
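The counter 10/11/12 example above can be modelled with a small toy buffer. This is only a sketch of the gap-closing idea, not Ignite's *CacheContinuousQueryPartitionRecovery*: the class name, the `onEntry` method, and the use of a `null` value to stand for a filtered (skipped) entry are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy model of per-partition counter ordering (hypothetical, not Ignite code). */
final class CounterGapSketch {
    /** Buffered entries keyed by update counter; a null value models a skipped (filtered) counter. */
    private final TreeMap<Long, String> pending = new TreeMap<>();

    /** Highest counter that has already been processed in order. */
    private long lastProcessed;

    CounterGapSketch(long lastProcessed) {
        this.lastProcessed = lastProcessed;
    }

    /** Buffers an entry and returns the real events that can now be released in counter order. */
    List<String> onEntry(long cntr, String evtOrNull) {
        pending.put(cntr, evtOrNull);

        List<String> ready = new ArrayList<>();

        // Release entries only while the counter sequence has no gap.
        while (!pending.isEmpty() && pending.firstKey() == lastProcessed + 1) {
            String evt = pending.pollFirstEntry().getValue();

            lastProcessed++;

            // Skipped counters advance the sequence but produce no user event.
            if (evt != null)
                ready.add(evt);
        }

        return ready;
    }

    public static void main(String[] args) {
        CounterGapSketch rec = new CounterGapSketch(9);

        System.out.println(rec.onEntry(10, "evt-10")); // released immediately
        System.out.println(rec.onEntry(12, "evt-12")); // held back: counter 11 is missing
        System.out.println(rec.onEntry(11, null));     // skip marker closes the gap, evt-12 is released
    }
}
```

In this model the event for counter 12 stays buffered until the skip marker for counter 11 arrives, which is the role the synthetic filtered entry plays for distributed queries.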
> For local-only Continuous Queries, regular update events *are NOT* delivered 
> through the distributed partition recovery path. In 
> {*}CacheContinuousQueryHandler#onEntryUpdated{*}, local-only events are 
> delivered directly to the local listener:
> {code:java}
> if (loc) {
>     if (!locOnly) {
>         Collection<CacheEntryEvent<? extends K, ? extends V>> evts =
>             handleEvent(ctx, entry);
>         notifyLocalListener(evts, trans);
>         ...
>     }
>     else if (!entry.isFiltered())
>         notifyLocalListener(F.asList(evt), trans);
> }
> {code}
> So for {*}locOnly == true{*}, regular user events do not go through 
> {*}handleEvent(...){*}, {*}CacheContinuousQueryPartitionRecovery{*}, or the 
> skipped-counter recovery mechanism. They are delivered directly to the local 
> listener.
> The current implementation violates this invariant (local-only listeners 
> never take part in skipped-counter recovery) because 
> *CacheContinuousQueryManager#skipUpdateCounter* calls every registered 
> listener, including local-only listeners.
> The fix is to explicitly identify local-only CQ listeners and skip them 
> during skipped-counter processing.
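A minimal sketch of what such filtering could look like, using a toy listener model rather than the real Ignite classes. The `Listener` interface, the `isLocalOnly()` accessor, and `RecordingListener` are hypothetical names introduced here; the actual patch may expose the local-only flag differently.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of skipping local-only listeners (hypothetical, not the actual patch). */
final class SkipLocalOnlySketch {
    /** Stand-in for a CQ listener; isLocalOnly() is an assumed accessor. */
    interface Listener {
        boolean isLocalOnly();

        void skipUpdateCounter(int part, long cntr);
    }

    /** Records skipped counters and fails the way the real handler does for a local-only query. */
    static final class RecordingListener implements Listener {
        private final boolean locOnly;

        final List<Long> skipped = new ArrayList<>();

        RecordingListener(boolean locOnly) {
            this.locOnly = locOnly;
        }

        @Override public boolean isLocalOnly() {
            return locOnly;
        }

        @Override public void skipUpdateCounter(int part, long cntr) {
            // Mirrors "assert !locOnly" from the handler, thrown explicitly so it fires without -ea.
            if (locOnly)
                throw new AssertionError("local-only listener must not process skipped counters");

            skipped.add(cntr);
        }
    }

    /** Notifies only listeners that take part in skipped-counter recovery. */
    static void skipUpdateCounter(Iterable<? extends Listener> lsnrs, int part, long cntr) {
        for (Listener lsnr : lsnrs) {
            if (lsnr.isLocalOnly())
                continue; // Local-only queries deliver events directly and keep no counter sequence.

            lsnr.skipUpdateCounter(part, cntr);
        }
    }

    public static void main(String[] args) {
        RecordingListener distributed = new RecordingListener(false);
        RecordingListener localOnly = new RecordingListener(true);

        skipUpdateCounter(List.of(distributed, localOnly), 0, 11L);

        System.out.println("distributed skipped: " + distributed.skipped);
        System.out.println("local-only skipped: " + localOnly.skipped);
    }
}
```

Without the `isLocalOnly()` check the loop would reach the local-only listener and hit the assertion, which corresponds to the failure described in this issue.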
> It is important NOT to use *CacheContinuousQueryListener#isPrimaryOnly()* for 
> this filtering. *isPrimaryOnly()* is not equivalent to local-only. It is 
> derived as:
> {code:java}
> return locOnly && !skipPrimaryCheck;{code}
> For PARTITIONED caches with local CQ:
> {code:java}
> locOnly == true
> skipPrimaryCheck == false
> isPrimaryOnly() == true{code}
> So filtering by *isPrimaryOnly()* would happen to exclude the local-only 
> listener in this case.
> However, for REPLICATED caches local CQ uses {*}skipPrimaryCheck{*}:
> {code:java}
> boolean skipPrimaryCheck =
>     loc &&
>     cctx.config().getCacheMode() == CacheMode.REPLICATED &&
>     cctx.affinityNode();{code}
> In this case:
> {code:java}
> locOnly == true
> skipPrimaryCheck == true
> isPrimaryOnly() == false{code}
> The query is still local-only, but it is not primary-only. Therefore, 
> filtering by *isPrimaryOnly()* would not exclude local CQ listeners on 
> replicated caches, and the same invalid *loc == true && locOnly == true* 
> *skipUpdateCounter* path could still be reached.
> The proper condition is local-only mode itself, not primary-only mode.
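The two flag combinations quoted above can be checked with a small standalone sketch. It only re-evaluates the boolean expressions from the description; the class and method names are hypothetical, and the cache mode and affinity-node check are reduced to plain boolean parameters.

```java
/** Re-evaluates the flag expressions from the description (hypothetical helper, not Ignite code). */
final class PrimaryOnlyVsLocalOnlySketch {
    /** Same expression as quoted for isPrimaryOnly(). */
    static boolean isPrimaryOnly(boolean locOnly, boolean skipPrimaryCheck) {
        return locOnly && !skipPrimaryCheck;
    }

    /** Same shape as the quoted skipPrimaryCheck computation, with plain booleans as inputs. */
    static boolean skipPrimaryCheck(boolean loc, boolean replicated, boolean affinityNode) {
        return loc && replicated && affinityNode;
    }

    public static void main(String[] args) {
        boolean loc = true;
        boolean locOnly = true;

        for (boolean replicated : new boolean[] {false, true}) {
            boolean skip = skipPrimaryCheck(loc, replicated, true);

            System.out.println(
                (replicated ? "REPLICATED" : "PARTITIONED") +
                ": locOnly=" + locOnly +
                ", skipPrimaryCheck=" + skip +
                ", isPrimaryOnly=" + isPrimaryOnly(locOnly, skip));
        }
    }
}
```

In both rows `locOnly` stays `true` while `isPrimaryOnly` flips, which is why only the local-only flag is a reliable filter for both cache modes.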



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
