This is an automated email from the ASF dual-hosted git repository. namelchev pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push: new ee9488de IGNITE-21285 Data entries looped in CDC if ExpirePolicy is set (#255) ee9488de is described below commit ee9488de0bd0b0c5e221f0d06b2db3811ddd0b5b Author: Nikita Amelchev <nsamelc...@gmail.com> AuthorDate: Wed Feb 21 13:33:03 2024 +0300 IGNITE-21285 Data entries looped in CDC if ExpirePolicy is set (#255) --- .../CacheVersionConflictResolverImpl.java | 38 ++++++++-------------- .../apache/ignite/cdc/AbstractReplicationTest.java | 38 ++++++++++++++++++++++ 2 files changed, 51 insertions(+), 25 deletions(-) diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java index ce1a17fc..85315e28 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java @@ -24,7 +24,6 @@ import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictR import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -89,30 +88,9 @@ public class CacheVersionConflictResolverImpl implements CacheVersionConflictRes ) { GridCacheVersionConflictContext<K, V> res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry); - boolean expireExists = oldEntry.ttl() != CU.TTL_ETERNAL - || newEntry.ttl() != CU.TTL_ETERNAL - || oldEntry.expireTime() != CU.EXPIRE_TIME_ETERNAL - || newEntry.expireTime() != CU.EXPIRE_TIME_ETERNAL; - boolean useNew = isUseNew(ctx, oldEntry, newEntry); - if (expireExists) { - if (newEntry.expireTime() > oldEntry.expireTime()) { - res.merge( - useNew ? newEntry.value(ctx) : oldEntry.value(ctx), - newEntry.ttl(), - newEntry.expireTime() - ); - } - else { - res.merge( - useNew ? newEntry.value(ctx) : oldEntry.value(ctx), - oldEntry.ttl(), - oldEntry.expireTime() - ); - } - } - else if (useNew) + if (useNew) res.useNew(); else res.useOld(); @@ -140,8 +118,18 @@ public class CacheVersionConflictResolverImpl implements CacheVersionConflictRes if (oldEntry.isStartVersion()) // Entry absent (new entry). return true; - if (oldEntry.dataCenterId() == newEntry.dataCenterId()) - return newEntry.version().compareTo(oldEntry.version()) > 0; // New version from the same cluster. + if (oldEntry.dataCenterId() == newEntry.dataCenterId()) { + int cmp = newEntry.version().compareTo(oldEntry.version()); + + // Ignite sets the expire time to zero on backups for transaction caches. + // If CDC is running in onlyPrimary=false mode, then the updates from backups may be applied first. + // In this case, a new entry from the primary node should be used to set the expiration time. + // See GridDistributedTxRemoteAdapter#commitIfLocked + if (cmp == 0) + return newEntry.expireTime() > oldEntry.expireTime(); + + return cmp > 0; // New version from the same cluster. + } if (conflictResolveFieldEnabled) { Object oldVal = oldEntry.value(ctx); diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java index 84ccd3c8..6b7b2788 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java @@ -35,6 +35,7 @@ import javax.cache.expiry.Duration; import javax.cache.expiry.ExpiryPolicy; import javax.management.DynamicMBean; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; @@ -49,11 +50,18 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cdc.CdcMain; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.record.DataEntry; +import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -258,6 +266,8 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { + checkNoLocalUpdatesOnPassiveCluster(); + stopAllGrids(); cleanPersistenceDir(); @@ -573,6 +583,34 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest { return node.context().query().querySqlFields(new SqlFieldsQuery(sqlText).setArgs(args), true).getAll(); } + /** */ + private void checkNoLocalUpdatesOnPassiveCluster() throws IgniteCheckedException { + if (srcCluster[0].cache(ACTIVE_PASSIVE_CACHE) == null) + return; + + assertTrue(hasLocalUpdates(srcCluster)); + assertFalse(hasLocalUpdates(destCluster)); + } + + /** @return {@code True} if cluster has local updates. */ + private boolean hasLocalUpdates(IgniteEx[] cluster) throws IgniteCheckedException { + for (IgniteEx srv : cluster) { + WALIterator iter = srv.context().cache().context().wal().replay(null, + (type, ptr) -> type == WALRecord.RecordType.DATA_RECORD_V2); + + for (IgniteBiTuple<WALPointer, WALRecord> t : iter) { + Collection<DataEntry> locUpdates = F.view(((DataRecord)t.get2()).writeEntries(), + e -> e.cacheId() == CU.cacheId(ACTIVE_PASSIVE_CACHE), + e -> !(e.writeVersion() instanceof GridCacheVersionEx)); + + if (!locUpdates.isEmpty()) + return true; + } + } + + return false; + } + /** @return Destination cluster host addresses. */ protected String[] hostAddresses(IgniteEx[] dest) { String[] addrs = new String[dest.length];