This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 67dd34efe32 IGNITE-22299 Drop GridCacheVersion#otherClusterVersion for 
cache resending (#11360)
67dd34efe32 is described below

commit 67dd34efe329ed11cb252f0f9d3a5598287b818d
Author: Maksim Timonin <timoninma...@apache.org>
AuthorDate: Mon May 27 22:07:50 2024 +0300

    IGNITE-22299 Drop GridCacheVersion#otherClusterVersion for cache resending 
(#11360)
---
 .../apache/ignite/util/CdcResendCommandTest.java   | 182 +++++++++++++++++++++
 .../management/cdc/CdcCacheDataResendTask.java     |  10 +-
 2 files changed, 191 insertions(+), 1 deletion(-)

diff --git 
a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java
 
b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java
index 81cb9864e6c..c333fd3ed0d 100644
--- 
a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java
+++ 
b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java
@@ -17,12 +17,41 @@
 
 package org.apache.ignite.util;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.cdc.AbstractCdcTest;
+import org.apache.ignite.cdc.CdcCacheEvent;
+import org.apache.ignite.cdc.CdcConfiguration;
+import org.apache.ignite.cdc.CdcConsumer;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.cdc.TypeMapping;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cdc.CdcMain;
+import 
org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import 
org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
+import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
+import org.apache.ignite.metric.MetricRegistry;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
 import static org.apache.ignite.cdc.AbstractCdcTest.KEYS_CNT;
@@ -53,9 +82,27 @@ public class CdcResendCommandTest extends 
GridCommandHandlerAbstractTest {
         cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
             .setBackups(1));
 
+        cfg.setPluginProviders(new AbstractTestPluginProvider() {
+            @Override public String name() {
+                return "ConflictResolverProvider";
+            }
+
+            @Override public <T> T createComponent(PluginContext ctx, Class<T> 
cls) {
+                if (cls != CacheConflictResolutionManager.class)
+                    return null;
+
+                return (T)new AlwaysNewResolutionManager<>();
+            }
+        });
+
         return cfg;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        cleanPersistenceDir();
+    }
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();
@@ -94,4 +141,139 @@ public class CdcResendCommandTest extends 
GridCommandHandlerAbstractTest {
 
         assertEquals(KEYS_CNT, ign.cache(DEFAULT_CACHE_NAME).size());
     }
+
+    /** */
+    @Test
+    public void testResendConflictVersion() throws Exception {
+        IgniteEx ign = startGrid(0);
+
+        ign.cluster().state(ACTIVE);
+
+        enableCheckpoints(ign, false);
+
+        IgniteInternalCache<Integer, Integer> cachex = 
ign.cachex(DEFAULT_CACHE_NAME);
+
+        // Put data.
+        cachex.put(0, 0);
+
+        // Override data from clusterId=2.
+        KeyCacheObject key = new KeyCacheObjectImpl(0, null, 
cachex.affinity().partition(0));
+        CacheObject val = new CacheObjectImpl(1, null);
+        val.prepareMarshal(cachex.context().cacheObjectContext());
+
+        GridCacheVersion conflict = new GridCacheVersion(1, 0, 1, (byte)2);
+
+        Map<KeyCacheObject, GridCacheDrInfo> drMap = new HashMap<>();
+        drMap.put(key, new GridCacheDrInfo(val, conflict));
+
+        cachex.putAllConflict(drMap);
+
+        // Resend data.
+        executeCommand(EXIT_CODE_OK, CDC, RESEND, CACHES, DEFAULT_CACHE_NAME);
+
+        TestCdcConsumer cnsmr = new TestCdcConsumer();
+
+        CdcConfiguration cfg = new CdcConfiguration();
+        cfg.setConsumer(cnsmr);
+
+        CdcMain cdc = new CdcMain(ign.configuration(), null, cfg);
+        GridTestUtils.runAsync(cdc);
+
+        assertTrue(GridTestUtils.waitForCondition(() -> cnsmr.events().size() 
== 3, 10_000, 100));
+
+        CdcEvent ev0 = cnsmr.events().get(0);
+        assertEquals(0, ev0.key());
+        assertEquals(0, ev0.value());
+        assertNull(ev0.version().otherClusterVersion());
+
+        CdcEvent ev1 = cnsmr.events().get(1);
+        assertEquals(0, ev1.key());
+        assertEquals(1, ev1.value());
+        assertEquals(conflict, ev1.version().otherClusterVersion());
+
+        CdcEvent ev2 = cnsmr.events().get(2);
+        assertEquals(0, ev2.key());
+        assertEquals(1, ev2.value());
+        assertNull(ev2.version().otherClusterVersion());
+    }
+
+    /** */
+    private static class TestCdcConsumer implements CdcConsumer {
+        /** */
+        private final List<CdcEvent> events = new ArrayList<>();
+
+        /** {@inheritDoc} */
+        @Override public boolean onEvents(Iterator<CdcEvent> events) {
+            synchronized (this) {
+                events.forEachRemaining(this.events::add);
+            }
+
+            return false;
+        }
+
+        /** */
+        synchronized List<CdcEvent> events() {
+            return events;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void start(MetricRegistry mreg) {
+            // No-op
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTypes(Iterator<BinaryType> types) {
+            types.forEachRemaining(t -> {});
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMappings(Iterator<TypeMapping> mappings) {
+            mappings.forEachRemaining(t -> {});
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCacheChange(Iterator<CdcCacheEvent> 
cacheEvents) {
+            cacheEvents.forEachRemaining(t -> {});
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCacheDestroy(Iterator<Integer> caches) {
+            // No-op
+        }
+
+        /** {@inheritDoc} */
+        @Override public void stop() {
+            // No-op
+        }
+    }
+
+    /** */
+    private static class AlwaysNewResolutionManager<K, V>
+        extends GridCacheManagerAdapter<K, V> implements 
CacheConflictResolutionManager<K, V> {
+        /** */
+        private final CacheVersionConflictResolver rslv;
+
+        /** */
+        AlwaysNewResolutionManager() {
+            rslv = new CacheVersionConflictResolver() {
+                @Override public <K1, V1> GridCacheVersionConflictContext<K1, 
V1> resolve(
+                    CacheObjectValueContext ctx,
+                    GridCacheVersionedEntryEx<K1, V1> oldEntry,
+                    GridCacheVersionedEntryEx<K1, V1> newEntry,
+                    boolean atomicVerComparator
+                ) {
+                    GridCacheVersionConflictContext<K1, V1> res = new 
GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);
+
+                    res.useNew();
+
+                    return res;
+                }
+            };
+        }
+
+        /** {@inheritDoc} */
+        @Override public CacheVersionConflictResolver conflictResolver() {
+            return rslv;
+        }
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcCacheDataResendTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcCacheDataResendTask.java
index 9e206493bc1..c38a94df8cd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcCacheDataResendTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcCacheDataResendTask.java
@@ -39,6 +39,8 @@ import 
org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.typedef.F;
@@ -195,13 +197,19 @@ public class CdcCacheDataResendTask extends 
VisorMultiNodeTask<CdcResendCommandA
                 if (log.isTraceEnabled())
                     log.trace("Resend key: " + key);
 
+                GridCacheVersion ver = row.version();
+
+                // Entries must not hold otherClusterVersion to be inserted 
into a receiver cluster.
+                if (ver instanceof GridCacheVersionEx)
+                    ver = new GridCacheVersion(ver.topologyVersion(), 
ver.order(), ver.nodeOrder(), ver.clusterId());
+
                 CdcDataRecord rec = new CdcDataRecord(new DataEntry(
                     cctx.cacheId(),
                     key,
                     row.value(),
                     GridCacheOperation.CREATE,
                     null,
-                    row.version(),
+                    ver,
                     row.expireTime(),
                     key.partition(),
                     -1,

Reply via email to