[ https://issues.apache.org/jira/browse/IGNITE-28750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Oleg Valuyskiy updated IGNITE-28750:
------------------------------------
Description:
A local Continuous Query created with *ContinuousQuery#setLocal(true)* may fail
with *AssertionError* during transactional rollback after a node failure.
During rollback, *IgniteTxHandler#applyPartitionsUpdatesCounters* logs rollback
ranges and notifies the Continuous Query subsystem about skipped update
counters:
{code:java}
ctx0.continuousQueries().skipUpdateCounter(null, part.id(), start + cntr,
topVer, rollbackOnPrimary);{code}
This call enters {*}CacheContinuousQueryManager#skipUpdateCounter{*}:
{code:java}
@Nullable public CounterSkipContext skipUpdateCounter(
    @Nullable CounterSkipContext skipCtx,
    int part,
    long cntr,
    AffinityTopologyVersion topVer,
    boolean primary
) {
    for (CacheContinuousQueryListener lsnr : lsnrs.values())
        skipCtx = lsnr.skipUpdateCounter(cctx, skipCtx, part, cntr, topVer,
            primary);

    return skipCtx;
}
{code}
Currently this method invokes *skipUpdateCounter* on every Continuous Query
listener registered for the cache. There is no filtering by local-only mode,
primary-only mode, or listener type.
This is problematic for local Continuous Queries: for such queries the
*CacheContinuousQueryHandler* is registered only on the node where the query
was started. In this case the handler has the following state:
{code:java}
loc == true
locOnly == true{code}
where *loc* means that the current node is the node that initiated the
Continuous Query:
{code:java}
final boolean loc = nodeId.equals(ctx.localNodeId());{code}
and *locOnly* means that the query was created with
{*}ContinuousQuery#setLocal(true){*}.
This combination is valid for a local Continuous Query. However,
*CacheContinuousQueryHandler.CacheContinuousQueryListener#skipUpdateCounter*
currently rejects this state:
{code:java}
if (loc) {
    assert !locOnly;

    final Collection<CacheEntryEvent<? extends K, ? extends V>> evts =
        handleEvent(ctx, skipCtx.entry());
    ...
}
{code}
As a result, when rollback counter cleanup calls *skipUpdateCounter* for a
cache that has a local CQ listener, the local-only listener can enter the
*loc* branch, hit *assert !locOnly*, and fail with {*}AssertionError{*}.
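
The failure mode can be reproduced in a small standalone model. This is only a sketch: the *Listener* and *Manager* types below are illustrative stand-ins, not Ignite classes, and the assertion is written as an explicit throw so the demo does not depend on the -ea JVM flag.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the failure; these types are illustrative, not Ignite classes. */
class UnfilteredSkipDemo {
    /** Carries the two flags discussed in the description. */
    static class Listener {
        final boolean loc;
        final boolean locOnly;

        Listener(boolean loc, boolean locOnly) {
            this.loc = loc;
            this.locOnly = locOnly;
        }

        void skipUpdateCounter(int part, long cntr) {
            // Stands in for "assert !locOnly" inside the "if (loc)" branch.
            if (loc && locOnly)
                throw new AssertionError("local-only listener: part=" + part + ", cntr=" + cntr);
        }
    }

    /** Invokes every registered listener, with no filtering. */
    static class Manager {
        final List<Listener> lsnrs = new ArrayList<>();

        void skipUpdateCounter(int part, long cntr) {
            for (Listener lsnr : lsnrs)
                lsnr.skipUpdateCounter(part, cntr);
        }
    }

    /** Returns true if the unfiltered loop fails for the registered listeners. */
    static boolean fails(Manager mgr) {
        try {
            mgr.skipUpdateCounter(0, 11L);

            return false;
        }
        catch (AssertionError e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Manager mgr = new Manager();

        mgr.lsnrs.add(new Listener(true, false));
        System.out.println("distributed listener only: fails=" + fails(mgr)); // fails=false

        mgr.lsnrs.add(new Listener(true, true));
        System.out.println("plus local-only listener:  fails=" + fails(mgr)); // fails=true
    }
}
```
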
Skipped update counters are part of the distributed Continuous Query
recovery and ordering mechanism. For regular distributed Continuous
Queries, skipped counters are needed to close gaps in per-partition update
counter sequences. A skipped counter is represented as a synthetic
*CacheContinuousQueryEntry* marked as filtered:
{code:java}
entry = new CacheContinuousQueryEntry(..., part, cntr, topVer, ...);
entry.markFiltered();{code}
This synthetic entry is passed through the same recovery path as ordinary CQ
entries. It allows *CacheContinuousQueryPartitionRecovery* to advance the
counter sequence and release pending events that were waiting for a missing
counter.
For example:
{code:java}
counter 10 -> real event
counter 11 -> rolled back / filtered / skipped
counter 12 -> real event{code}
Without a skip marker for counter 11, the recovery logic may not be able to
safely release counter 12. Therefore, distributed CQ handlers process skipped
counters through *handleEvent(...)* / partition recovery.
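
The gap-closing behaviour in the 10 / 11 / 12 example can be sketched with a toy per-partition buffer. This is an assumption-laden model, not the actual *CacheContinuousQueryPartitionRecovery* implementation: the *Recovery* class, its method names, and the use of a null payload as the skip marker are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of in-order release per partition; not the Ignite implementation. */
class CounterGapDemo {
    static class Recovery {
        /** Highest counter already released (or skipped) in sequence. */
        private long lastFired;

        /** Buffered counters; a null payload marks a skipped (filtered) counter. */
        private final TreeMap<Long, String> pending = new TreeMap<>();

        Recovery(long lastFired) {
            this.lastFired = lastFired;
        }

        List<String> onEvent(long cntr, String payload) {
            return accept(cntr, payload);
        }

        List<String> onSkip(long cntr) {
            return accept(cntr, null);
        }

        /** Buffers the counter, then releases everything that is now contiguous. */
        private List<String> accept(long cntr, String payload) {
            List<String> out = new ArrayList<>();

            if (cntr <= lastFired)
                return out; // Already passed this counter.

            pending.put(cntr, payload);

            while (!pending.isEmpty() && pending.firstKey() == lastFired + 1) {
                Map.Entry<Long, String> e = pending.pollFirstEntry();

                lastFired = e.getKey();

                if (e.getValue() != null)
                    out.add(e.getValue()); // Skip markers advance the sequence but emit nothing.
            }

            return out;
        }
    }

    public static void main(String[] args) {
        Recovery rec = new Recovery(9L);

        System.out.println(rec.onEvent(10L, "e10")); // [e10]
        System.out.println(rec.onEvent(12L, "e12")); // [] : waits for counter 11
        System.out.println(rec.onSkip(11L));         // [e12] : gap closed by the skip marker
    }
}
```
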
For local-only Continuous Queries, regular update events *are NOT* delivered
through the distributed partition recovery path. In
{*}CacheContinuousQueryHandler#onEntryUpdated{*}, local-only events are
delivered directly to the local listener:
{code:java}
if (loc) {
    if (!locOnly) {
        Collection<CacheEntryEvent<? extends K, ? extends V>> evts =
            handleEvent(ctx, entry);

        notifyLocalListener(evts, trans);
        ...
    }
    else if (!entry.isFiltered())
        notifyLocalListener(F.asList(evt), trans);
}
{code}
So for locOnly == true, regular user events do not go through
{*}handleEvent(...){*}, {*}CacheContinuousQueryPartitionRecovery{*}, or the
skipped-counter recovery mechanism. They are delivered directly to the local
listener.
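
The branching above can be summarised as a small decision function. This is a sketch only: the *Path* enum and *pathFor* are hypothetical names, and the non-*loc* branch is not shown in the snippet above, so it is labelled NOT_SHOWN rather than guessed.

```java
/** Classifies which delivery path an entry takes, following the snippet from onEntryUpdated. */
class DeliveryPathDemo {
    enum Path {
        /** handleEvent(...) and partition recovery. */
        RECOVERY,
        /** Straight to the local listener. */
        DIRECT,
        /** Filtered entry on a local-only query: nothing is delivered. */
        DROPPED,
        /** The non-loc branch, which the description does not show. */
        NOT_SHOWN
    }

    static Path pathFor(boolean loc, boolean locOnly, boolean filtered) {
        if (loc) {
            if (!locOnly)
                return Path.RECOVERY;

            return filtered ? Path.DROPPED : Path.DIRECT;
        }

        return Path.NOT_SHOWN;
    }

    public static void main(String[] args) {
        System.out.println(pathFor(true, false, false)); // RECOVERY
        System.out.println(pathFor(true, true, false));  // DIRECT
        System.out.println(pathFor(true, true, true));   // DROPPED
    }
}
```

A synthetic skipped-counter entry is always marked as filtered, so in this model a local-only query would have nothing to do with it.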
Because of that, local-only CQ listeners should not participate in
*skipUpdateCounter* processing. A skipped counter is not a user-visible event,
and for local-only CQ there is no distributed pending-event chain that needs to
be advanced by a synthetic filtered entry.
The current implementation violates this invariant because
*CacheContinuousQueryManager#skipUpdateCounter* calls every registered
listener unconditionally, including local-only listeners.
The fix is to explicitly identify local-only CQ listeners and skip them during
skipped-counter processing.
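
One possible shape of such a filter, as a standalone sketch rather than the actual patch: the *isLocalOnly()* accessor is a hypothetical name (the description does not say how the listener exposes local-only mode), and the *Listener* type is a stand-in that only counts calls.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of a manager-side loop that skips local-only listeners; not the actual patch. */
class FilteredSkipDemo {
    static class Listener {
        final boolean locOnly;

        /** Number of times skipUpdateCounter reached this listener. */
        int skipCalls;

        Listener(boolean locOnly) {
            this.locOnly = locOnly;
        }

        /** Hypothetical accessor; the real listener API may expose this differently. */
        boolean isLocalOnly() {
            return locOnly;
        }

        void skipUpdateCounter(int part, long cntr) {
            skipCalls++;
        }
    }

    /** Notifies only listeners that take part in distributed counter recovery. */
    static void skipUpdateCounter(List<Listener> lsnrs, int part, long cntr) {
        for (Listener lsnr : lsnrs) {
            if (lsnr.isLocalOnly())
                continue; // No pending-event chain to advance for a local-only query.

            lsnr.skipUpdateCounter(part, cntr);
        }
    }

    public static void main(String[] args) {
        Listener distributed = new Listener(false);
        Listener localOnly = new Listener(true);

        List<Listener> lsnrs = new ArrayList<>();
        lsnrs.add(distributed);
        lsnrs.add(localOnly);

        skipUpdateCounter(lsnrs, 0, 11L);

        System.out.println("distributed calls: " + distributed.skipCalls); // 1
        System.out.println("local-only calls:  " + localOnly.skipCalls);   // 0
    }
}
```
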
It is important NOT to use *CacheContinuousQueryListener#isPrimaryOnly()* for
this filtering. *isPrimaryOnly()* is not equivalent to local-only. It is
derived as:
{code:java}
return locOnly && !skipPrimaryCheck;{code}
For PARTITIONED caches with local CQ:
{code:java}
locOnly == true
skipPrimaryCheck == false
isPrimaryOnly() == true{code}
So filtering by *isPrimaryOnly()* would happen to work in this case.
However, for REPLICATED caches a local CQ sets {*}skipPrimaryCheck{*} on affinity nodes:
{code:java}
boolean skipPrimaryCheck =
    loc &&
    cctx.config().getCacheMode() == CacheMode.REPLICATED &&
    cctx.affinityNode();{code}
In this case:
{code:java}
locOnly == true
skipPrimaryCheck == true
isPrimaryOnly() == false{code}
The query is still local-only, but it is not primary-only. Therefore,
filtering by *isPrimaryOnly()* would not exclude local CQ listeners on
replicated caches, and *skipUpdateCounter* could still be reached with
*loc == true && locOnly == true*.
The proper condition is local-only mode itself, not primary-only mode.
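
The difference between the two conditions can be checked with a few lines that evaluate only the boolean expressions quoted above. The *Mode* enum and the method wrappers are illustrative stand-ins for the Ignite types.

```java
/** Evaluates the quoted expressions for a local CQ on both cache modes. */
class PrimaryOnlyVsLocalOnly {
    /** Stand-in for the two CacheMode values discussed above. */
    enum Mode { PARTITIONED, REPLICATED }

    static boolean skipPrimaryCheck(boolean loc, Mode mode, boolean affinityNode) {
        return loc && mode == Mode.REPLICATED && affinityNode;
    }

    static boolean isPrimaryOnly(boolean locOnly, boolean skipPrimaryCheck) {
        return locOnly && !skipPrimaryCheck;
    }

    public static void main(String[] args) {
        boolean loc = true;
        boolean locOnly = true;

        for (Mode mode : Mode.values()) {
            boolean skip = skipPrimaryCheck(loc, mode, true);

            System.out.println(mode + ": locOnly=" + locOnly +
                ", skipPrimaryCheck=" + skip +
                ", isPrimaryOnly=" + isPrimaryOnly(locOnly, skip));
        }
        // PARTITIONED: locOnly=true, skipPrimaryCheck=false, isPrimaryOnly=true
        // REPLICATED: locOnly=true, skipPrimaryCheck=true, isPrimaryOnly=false
    }
}
```

In both rows *locOnly* stays true, while *isPrimaryOnly()* flips, which is why only the former is a safe filter.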
> AssertionError in local continuous query during rollback update counter skip
> ----------------------------------------------------------------------------
>
> Key: IGNITE-28750
> URL: https://issues.apache.org/jira/browse/IGNITE-28750
> Project: Ignite
> Issue Type: Task
> Reporter: Oleg Valuyskiy
> Assignee: Oleg Valuyskiy
> Priority: Major
> Labels: ise
> Time Spent: 10m
> Remaining Estimate: 0h