This is an automated email from the ASF dual-hosted git repository. timoninmaxim pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 67dd34efe32 IGNITE-22299 Drop GridCacheVersion#otherClusterVersion for cache resending (#11360) 67dd34efe32 is described below commit 67dd34efe329ed11cb252f0f9d3a5598287b818d Author: Maksim Timonin <timoninma...@apache.org> AuthorDate: Mon May 27 22:07:50 2024 +0300 IGNITE-22299 Drop GridCacheVersion#otherClusterVersion for cache resending (#11360) --- .../apache/ignite/util/CdcResendCommandTest.java | 182 +++++++++++++++++++++ .../management/cdc/CdcCacheDataResendTask.java | 10 +- 2 files changed, 191 insertions(+), 1 deletion(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java index 81cb9864e6c..c333fd3ed0d 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java @@ -17,12 +17,41 @@ package org.apache.ignite.util; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.ignite.binary.BinaryType; import org.apache.ignite.cdc.AbstractCdcTest; +import org.apache.ignite.cdc.CdcCacheEvent; +import org.apache.ignite.cdc.CdcConfiguration; +import org.apache.ignite.cdc.CdcConsumer; +import org.apache.ignite.cdc.CdcEvent; +import org.apache.ignite.cdc.TypeMapping; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.cdc.CdcMain; +import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectImpl; +import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; +import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; +import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; +import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; +import org.apache.ignite.metric.MetricRegistry; +import org.apache.ignite.plugin.AbstractTestPluginProvider; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; import static org.apache.ignite.cdc.AbstractCdcTest.KEYS_CNT; @@ -53,9 +82,27 @@ public class CdcResendCommandTest extends GridCommandHandlerAbstractTest { cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME) .setBackups(1)); + cfg.setPluginProviders(new AbstractTestPluginProvider() { + @Override public String name() { + return "ConflictResolverProvider"; + } + + @Override public <T> T createComponent(PluginContext ctx, Class<T> cls) { + if (cls != CacheConflictResolutionManager.class) + return null; + + return (T)new AlwaysNewResolutionManager<>(); + } + }); + return cfg; } + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + cleanPersistenceDir(); + } + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); @@ -94,4 +141,139 @@ public class CdcResendCommandTest extends GridCommandHandlerAbstractTest { assertEquals(KEYS_CNT, ign.cache(DEFAULT_CACHE_NAME).size()); } + + /** */ + @Test + public void testResendConflictVersion() throws Exception { + IgniteEx ign = startGrid(0); + + ign.cluster().state(ACTIVE); + + enableCheckpoints(ign, false); + + IgniteInternalCache<Integer, Integer> cachex = ign.cachex(DEFAULT_CACHE_NAME); + + // Put data. + cachex.put(0, 0); + + // Override data from clusterId=2. + KeyCacheObject key = new KeyCacheObjectImpl(0, null, cachex.affinity().partition(0)); + CacheObject val = new CacheObjectImpl(1, null); + val.prepareMarshal(cachex.context().cacheObjectContext()); + + GridCacheVersion conflict = new GridCacheVersion(1, 0, 1, (byte)2); + + Map<KeyCacheObject, GridCacheDrInfo> drMap = new HashMap<>(); + drMap.put(key, new GridCacheDrInfo(val, conflict)); + + cachex.putAllConflict(drMap); + + // Resend data. + executeCommand(EXIT_CODE_OK, CDC, RESEND, CACHES, DEFAULT_CACHE_NAME); + + TestCdcConsumer cnsmr = new TestCdcConsumer(); + + CdcConfiguration cfg = new CdcConfiguration(); + cfg.setConsumer(cnsmr); + + CdcMain cdc = new CdcMain(ign.configuration(), null, cfg); + GridTestUtils.runAsync(cdc); + + assertTrue(GridTestUtils.waitForCondition(() -> cnsmr.events().size() == 3, 10_000, 100)); + + CdcEvent ev0 = cnsmr.events().get(0); + assertEquals(0, ev0.key()); + assertEquals(0, ev0.value()); + assertNull(ev0.version().otherClusterVersion()); + + CdcEvent ev1 = cnsmr.events().get(1); + assertEquals(0, ev1.key()); + assertEquals(1, ev1.value()); + assertEquals(conflict, ev1.version().otherClusterVersion()); + + CdcEvent ev2 = cnsmr.events().get(2); + assertEquals(0, ev2.key()); + assertEquals(1, ev2.value()); + assertNull(ev2.version().otherClusterVersion()); + } + + /** */ + private static class TestCdcConsumer implements CdcConsumer { + /** */ + private final List<CdcEvent> events = new ArrayList<>(); + + /** {@inheritDoc} */ + @Override public boolean onEvents(Iterator<CdcEvent> events) { + synchronized (this) { + events.forEachRemaining(this.events::add); + } + + return false; + } + + /** */ + synchronized List<CdcEvent> events() { + return events; + } + + /** {@inheritDoc} */ + @Override public void start(MetricRegistry mreg) { + // No-op + } + + /** {@inheritDoc} */ + @Override public void onTypes(Iterator<BinaryType> types) { + types.forEachRemaining(t -> {}); + } + + /** {@inheritDoc} */ + @Override public void onMappings(Iterator<TypeMapping> mappings) { + mappings.forEachRemaining(t -> {}); + } + + /** {@inheritDoc} */ + @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) { + cacheEvents.forEachRemaining(t -> {}); + } + + /** {@inheritDoc} */ + @Override public void onCacheDestroy(Iterator<Integer> caches) { + // No-op + } + + /** {@inheritDoc} */ + @Override public void stop() { + // No-op + } + } + + /** */ + private static class AlwaysNewResolutionManager<K, V> + extends GridCacheManagerAdapter<K, V> implements CacheConflictResolutionManager<K, V> { + /** */ + private final CacheVersionConflictResolver rslv; + + /** */ + AlwaysNewResolutionManager() { + rslv = new CacheVersionConflictResolver() { + @Override public <K1, V1> GridCacheVersionConflictContext<K1, V1> resolve( + CacheObjectValueContext ctx, + GridCacheVersionedEntryEx<K1, V1> oldEntry, + GridCacheVersionedEntryEx<K1, V1> newEntry, + boolean atomicVerComparator + ) { + GridCacheVersionConflictContext<K1, V1> res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry); + + res.useNew(); + + return res; + } + }; + } + + /** {@inheritDoc} */ + @Override public CacheVersionConflictResolver conflictResolver() { + return rslv; + } + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcCacheDataResendTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcCacheDataResendTask.java index 9e206493bc1..c38a94df8cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcCacheDataResendTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcCacheDataResendTask.java @@ -39,6 +39,8 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.typedef.F; @@ -195,13 +197,19 @@ public class CdcCacheDataResendTask extends VisorMultiNodeTask<CdcResendCommandA if (log.isTraceEnabled()) log.trace("Resend key: " + key); + GridCacheVersion ver = row.version(); + + // Entries must not hold otherClusterVersion to be inserted into a receiver cluster. + if (ver instanceof GridCacheVersionEx) + ver = new GridCacheVersion(ver.topologyVersion(), ver.order(), ver.nodeOrder(), ver.clusterId()); + CdcDataRecord rec = new CdcDataRecord(new DataEntry( cctx.cacheId(), key, row.value(), GridCacheOperation.CREATE, null, - row.version(), + ver, row.expireTime(), key.partition(), -1,