This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new ee9488de IGNITE-21285 Data entries looped in CDC if ExpirePolicy is set (#255)
ee9488de is described below

commit ee9488de0bd0b0c5e221f0d06b2db3811ddd0b5b
Author: Nikita Amelchev <nsamelc...@gmail.com>
AuthorDate: Wed Feb 21 13:33:03 2024 +0300

    IGNITE-21285 Data entries looped in CDC if ExpirePolicy is set (#255)
---
 .../CacheVersionConflictResolverImpl.java          | 38 ++++++++--------------
 .../apache/ignite/cdc/AbstractReplicationTest.java | 38 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 25 deletions(-)

diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
index ce1a17fc..85315e28 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
@@ -24,7 +24,6 @@ import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictR
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
@@ -89,30 +88,9 @@ public class CacheVersionConflictResolverImpl implements CacheVersionConflictRes
     ) {
         GridCacheVersionConflictContext<K, V> res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);
 
-        boolean expireExists = oldEntry.ttl() != CU.TTL_ETERNAL
-            || newEntry.ttl() != CU.TTL_ETERNAL
-            || oldEntry.expireTime() != CU.EXPIRE_TIME_ETERNAL
-            || newEntry.expireTime() != CU.EXPIRE_TIME_ETERNAL;
-
         boolean useNew = isUseNew(ctx, oldEntry, newEntry);
 
-        if (expireExists) {
-            if (newEntry.expireTime() > oldEntry.expireTime()) {
-                res.merge(
-                    useNew ? newEntry.value(ctx) : oldEntry.value(ctx),
-                    newEntry.ttl(),
-                    newEntry.expireTime()
-                );
-            }
-            else {
-                res.merge(
-                    useNew ? newEntry.value(ctx) : oldEntry.value(ctx),
-                    oldEntry.ttl(),
-                    oldEntry.expireTime()
-                );
-            }
-        }
-        else if (useNew)
+        if (useNew)
             res.useNew();
         else
             res.useOld();
@@ -140,8 +118,18 @@ public class CacheVersionConflictResolverImpl implements CacheVersionConflictRes
         if (oldEntry.isStartVersion()) // Entry absent (new entry).
             return true;
 
-        if (oldEntry.dataCenterId() == newEntry.dataCenterId())
-            return newEntry.version().compareTo(oldEntry.version()) > 0; // New version from the same cluster.
+        if (oldEntry.dataCenterId() == newEntry.dataCenterId()) {
+            int cmp = newEntry.version().compareTo(oldEntry.version());
+
+            // Ignite sets the expire time to zero on backups for transaction caches.
+            // If CDC is running in onlyPrimary=false mode, then the updates from backups may be applied first.
+            // In this case, a new entry from the primary node should be used to set the expiration time.
+            // See GridDistributedTxRemoteAdapter#commitIfLocked
+            if (cmp == 0)
+                return newEntry.expireTime() > oldEntry.expireTime();
+
+            return cmp > 0; // New version from the same cluster.
+        }
 
         if (conflictResolveFieldEnabled) {
             Object oldVal = oldEntry.value(ctx);
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
index 84ccd3c8..6b7b2788 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -35,6 +35,7 @@ import javax.cache.expiry.Duration;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.management.DynamicMBean;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
@@ -49,11 +50,18 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -258,6 +266,8 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
+        checkNoLocalUpdatesOnPassiveCluster();
+
         stopAllGrids();
 
         cleanPersistenceDir();
@@ -573,6 +583,34 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
         return node.context().query().querySqlFields(new SqlFieldsQuery(sqlText).setArgs(args), true).getAll();
     }
 
+    /** */
+    private void checkNoLocalUpdatesOnPassiveCluster() throws IgniteCheckedException {
+        if (srcCluster[0].cache(ACTIVE_PASSIVE_CACHE) == null)
+            return;
+
+        assertTrue(hasLocalUpdates(srcCluster));
+        assertFalse(hasLocalUpdates(destCluster));
+    }
+
+    /** @return {@code True} if cluster has local updates. */
+    private boolean hasLocalUpdates(IgniteEx[] cluster) throws IgniteCheckedException {
+        for (IgniteEx srv : cluster) {
+            WALIterator iter = srv.context().cache().context().wal().replay(null,
+                (type, ptr) -> type == WALRecord.RecordType.DATA_RECORD_V2);
+
+            for (IgniteBiTuple<WALPointer, WALRecord> t : iter) {
+                Collection<DataEntry> locUpdates = F.view(((DataRecord)t.get2()).writeEntries(),
+                    e -> e.cacheId() == CU.cacheId(ACTIVE_PASSIVE_CACHE),
+                    e -> !(e.writeVersion() instanceof GridCacheVersionEx));
+
+                if (!locUpdates.isEmpty())
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
     /** @return Destination cluster host addresses. */
     protected String[] hostAddresses(IgniteEx[] dest) {
         String[] addrs = new String[dest.length];

Reply via email to