This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch ignite-2.16
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 0a200d5006aa5069442028aa27737a4a5fd346ec
Author: Nikolay <nizhi...@apache.org>
AuthorDate: Mon Nov 27 14:29:50 2023 +0300

    IGNITE-20917 Add entry version to DumpEntry (#11069)
    
    (cherry picked from commit b7a8f021f8caf22ad51977be0323dc4085d095e3)
---
 .../java/org/apache/ignite/dump/DumpEntry.java     |   9 +-
 .../snapshot/SnapshotPartitionsVerifyHandler.java  |   2 +-
 .../snapshot/dump/CreateDumpFutureTask.java        |  11 +-
 .../snapshot/dump/DumpEntrySerializer.java         |  63 +++++++-
 .../processors/cache/verify/IdleVerifyUtility.java |   4 +-
 .../snapshot/dump/AbstractCacheDumpTest.java       |   4 +
 .../snapshot/dump/IgniteCacheDumpSelf2Test.java    | 170 ++++++++++++++++++++-
 7 files changed, 247 insertions(+), 16 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/dump/DumpEntry.java 
b/modules/core/src/main/java/org/apache/ignite/dump/DumpEntry.java
index ca34f3cbace..6fa8b3c58a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/dump/DumpEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/dump/DumpEntry.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.dump;
 
+import java.util.Collection;
 import java.util.Iterator;
+import org.apache.ignite.cache.CacheEntryVersion;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump;
 import org.apache.ignite.lang.IgniteExperimental;
 
@@ -26,7 +28,7 @@ import org.apache.ignite.lang.IgniteExperimental;
  *
  * @see Dump#iterator(String, int, int)
  * @see DumpConsumer#onPartition(int, int, Iterator)
- * @see org.apache.ignite.IgniteSnapshot#createDump(String)
+ * @see org.apache.ignite.IgniteSnapshot#createDump(String, Collection)
  */
 @IgniteExperimental
 public interface DumpEntry {
@@ -36,6 +38,11 @@ public interface DumpEntry {
     /** @return Expiration time. */
     public long expireTime();
 
+    /**
+     * @return Version of the entry.
+     */
+    public CacheEntryVersion version();
+
     /** @return Key. */
     public Object key();
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
index 3847b19b3a9..71cafd15828 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
@@ -409,7 +409,7 @@ public class SnapshotPartitionsVerifyHandler implements 
SnapshotHandler<Map<Part
                 while (iter.hasNext()) {
                     DumpEntry e = iter.next();
 
-                    ctx.update((KeyCacheObject)e.key(), 
(CacheObject)e.value(), null);
+                    ctx.update((KeyCacheObject)e.key(), 
(CacheObject)e.value(), e.version());
 
                     size++;
                 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
index 11314e9d5e5..7e80a8dc64d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
@@ -488,7 +488,7 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
                     else if (val == null)
                         reasonToSkip = "newly created or already removed"; // 
Previous value is null. Entry created after dump start, skip.
                     else {
-                        write(cache, expireTime, key, val);
+                        write(cache, expireTime, key, val, ver);
 
                         changedCnt.increment();
                     }
@@ -532,7 +532,7 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
             else if (changed.get(cache).contains(key))
                 written = false;
             else
-                write(cache, expireTime, key, val);
+                write(cache, expireTime, key, val, ver);
 
             if (log.isTraceEnabled()) {
                 log.trace("Iterator [" +
@@ -540,17 +540,18 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
                     ", cache=" + cache +
                     ", part=" + part +
                     ", key=" + key +
-                    ", written=" + written + ']');
+                    ", written=" + written +
+                    ", ver=" + ver + ']');
             }
 
             return written;
         }
 
         /** */
-        private void write(int cache, long expireTime, KeyCacheObject key, 
CacheObject val) {
+        private void write(int cache, long expireTime, KeyCacheObject key, 
CacheObject val, GridCacheVersion ver) {
             synchronized (serializer) { // Prevent concurrent access to the 
dump file.
                 try {
-                    ByteBuffer buf = serializer.writeToBuffer(cache, 
expireTime, key, val, cctx.cacheObjectContext(cache));
+                    ByteBuffer buf = serializer.writeToBuffer(cache, 
expireTime, key, val, ver, cctx.cacheObjectContext(cache));
 
                     if (file.writeFully(buf) != buf.limit())
                         throw new IgniteException("Can't write row");
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java
index 27146ec3644..4a1fd7f1c29 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java
@@ -24,6 +24,7 @@ import java.util.function.Function;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheEntryVersion;
 import org.apache.ignite.dump.DumpEntry;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.pagemem.wal.record.UnwrapDataEntry;
@@ -32,6 +33,8 @@ import 
org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 
 /**
@@ -88,11 +91,11 @@ public class DumpEntrySerializer {
     /**
      * Dump entry structure:
      * <pre>
-     * +---------+-----------+----------+-----------------+-----+-------+
-     * | 4 bytes | 4 bytes   | 4 bytes  | 8 bytes         |     |       |
-     * +---------+-----------+----------+-----------------+-----+-------+
-     * | CRC     | Data size | cache ID | expiration time | key | value |
-     * +---------+-----------+----------+-----------------+-----+-------+
+     * +---------+-----------+----------+-----------------+-----+-------------+
+     * | 4 bytes | 4 bytes   | 4 bytes  | 8 bytes         |     |     |       |
+     * +---------+-----------+----------+-----------------+-----+-------------+
+     * | CRC     | Data size | cache ID | expiration time | ver | key | value |
+     * +---------+-----------+----------+-----------------+-----+-------------+
      * </pre>
      *
      * @param cache Cache id.
@@ -108,11 +111,22 @@ public class DumpEntrySerializer {
         long expireTime,
         KeyCacheObject key,
         CacheObject val,
+        GridCacheVersion ver,
         CacheObjectContext coCtx
     ) throws IgniteCheckedException {
+        int verSz = Integer.BYTES/*topVer*/ + Long.BYTES/*order*/ + 
Integer.BYTES/*nodeOrderDrId*/;
+
+        boolean hasConflictVer = ver.otherClusterVersion() != null;
+
+        if (hasConflictVer)
+            verSz *= 2 /*GridCacheVersion otherClusterVersion*/;
+
+        assert ver.fieldsCount() == (hasConflictVer ? 4 : 3);
+
         int keySz = key.valueBytesLength(coCtx);
         int valSz = val.valueBytesLength(coCtx);
-        int dataSz = /*cache ID*/Integer.BYTES + /*expire time*/Long.BYTES + 
/*key*/keySz + /*value*/valSz;
+        int dataSz = /*cache ID*/Integer.BYTES + /*expire time*/Long.BYTES
+            + /* hasConflictVersion */1 + /*version*/verSz + /*key*/keySz + 
/*value*/valSz;
 
         int fullSz = dataSz + /*extra bytes for row size*/Integer.BYTES + 
/*CRC*/Integer.BYTES;
 
@@ -128,6 +142,19 @@ public class DumpEntrySerializer {
         buf.putInt(cache);
         buf.putLong(expireTime);
 
+        buf.put((byte)(hasConflictVer ? 1 : 0));
+        buf.putInt(ver.topologyVersion());
+        buf.putLong(ver.order());
+        buf.putInt(ver.nodeOrderAndDrIdRaw());
+
+        if (hasConflictVer) {
+            GridCacheVersion ver0 = 
(GridCacheVersion)ver.otherClusterVersion();
+
+            buf.putInt(ver0.topologyVersion());
+            buf.putLong(ver0.order());
+            buf.putInt(ver0.nodeOrderAndDrIdRaw());
+        }
+
         if (!key.putValue(buf))
             throw new IgniteCheckedException("Can't write key");
 
@@ -195,6 +222,8 @@ public class DumpEntrySerializer {
         int cache = buf.getInt();
         long expireTime = buf.getLong();
 
+        GridCacheVersion ver = readVersion(buf);
+
         int keySz = buf.getInt();
 
         byte keyType = buf.get();
@@ -225,6 +254,10 @@ public class DumpEntrySerializer {
                 return expireTime;
             }
 
+            @Override public CacheEntryVersion version() {
+                return ver;
+            }
+
             @Override public Object key() {
                 return raw ? key : UnwrapDataEntry.unwrapKey(key, keepBinary, 
fakeCacheObjCtx);
             }
@@ -235,6 +268,24 @@ public class DumpEntrySerializer {
         };
     }
 
+    /** @return Written entry version. */
+    private static GridCacheVersion readVersion(ByteBuffer buf) {
+        boolean hasConflictVer = buf.get() == 1;
+
+        int topVer = buf.getInt();
+        long order = buf.getLong();
+        int nodeOrderDrId = buf.getInt();
+
+        if (!hasConflictVer)
+            return new GridCacheVersion(topVer, nodeOrderDrId, order);
+
+        int topVer0 = buf.getInt();
+        long order0 = buf.getLong();
+        int nodeOrderDrId0 = buf.getInt();
+
+        return new GridCacheVersionEx(topVer, nodeOrderDrId, order, new 
GridCacheVersion(topVer0, nodeOrderDrId0, order0));
+    }
+
     /** @return Thread local buffer. */
     private ByteBuffer threadLocalBuffer() {
         return thLocBufs.computeIfAbsent(Thread.currentThread().getId(), 
DFLT_BUF_ALLOC);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
index 290dc0352e6..e2d22e61a0d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.function.BiConsumer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryVersion;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.binary.BinaryObjectEx;
 import org.apache.ignite.internal.management.cache.PartitionKeyV2;
@@ -46,7 +47,6 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotVerificationTask.HashHolder;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.lang.IgniteThrowableSupplier;
 import org.apache.ignite.internal.util.typedef.F;
@@ -392,7 +392,7 @@ public class IdleVerifyUtility {
         public void update(
             KeyCacheObject key,
             CacheObject val,
-            @Nullable GridCacheVersion ver
+            CacheEntryVersion ver
         ) throws IgniteCheckedException {
             partHash += key.hashCode();
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
index fdb5207b4a6..a61a6c8b2d4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java
@@ -386,6 +386,8 @@ public abstract class AbstractCacheDumpTest extends 
GridCommonAbstractTest {
                         DumpEntry e = iter.next();
 
                         assertNotNull(e);
+                        assertNotNull(e.version());
+                        assertNull(e.version().otherClusterVersion());
 
                         if (e.cacheId() == CU.cacheId(CACHE_0))
                             assertEquals(USER_FACTORY.apply((Integer)e.key()), 
e.value());
@@ -430,6 +432,8 @@ public abstract class AbstractCacheDumpTest extends 
GridCommonAbstractTest {
         Integer key = (Integer)e.key();
 
         assertEquals(key, e.value());
+        assertNotNull(e.version());
+        assertNull(e.version().otherClusterVersion());
     }
 
     /** */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java
index 429db9f2bd4..ff3b959073b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java
@@ -24,38 +24,67 @@ import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryVersion;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.dump.DumpEntry;
+import org.apache.ignite.dump.DumpReader;
+import org.apache.ignite.dump.DumpReaderConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.TestDumpConsumer;
+import 
org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
+import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
 import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectImpl;
 import org.apache.ignite.internal.processors.dr.GridDrType;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.platform.model.User;
+import org.apache.ignite.plugin.AbstractCachePluginProvider;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
 import org.apache.ignite.testframework.ListeningTestLogger;
 import org.apache.ignite.testframework.LogListener;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
 import static java.nio.file.StandardOpenOption.READ;
 import static java.nio.file.StandardOpenOption.WRITE;
 import static 
org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_DIRECTORY;
+import static org.apache.ignite.dump.DumpReaderConfiguration.DFLT_THREAD_CNT;
+import static org.apache.ignite.dump.DumpReaderConfiguration.DFLT_TIMEOUT;
 import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
 import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
 import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
@@ -305,13 +334,41 @@ public class IgniteCacheDumpSelf2Test extends 
GridCommonAbstractTest {
 
         ign.snapshot().createDump(DMP_NAME, null).get();
 
+        String out = invokeCheckCommand(ign, DMP_NAME);
+
         assertContains(
             null,
-            invokeCheckCommand(ign, DMP_NAME),
+            out,
             "Conflict partition: PartitionKeyV2 [grpId=" + 
CU.cacheId(DEFAULT_CACHE_NAME) +
                 ", grpName=" + DEFAULT_CACHE_NAME +
                 ", partId=" + corruptedPart + "]"
         );
+
+        String verPattern = "partVerHash=(-)?[0-9]+";
+        String hashPattern = "partHash=(-)?[0-9]+";
+
+        Matcher m = Pattern.compile(verPattern).matcher(out);
+
+        assertTrue(m.find());
+        String ver0 = out.substring(m.start(), m.end());
+
+        assertTrue(m.find());
+        String ver1 = out.substring(m.start(), m.end());
+
+        assertFalse(m.find());
+
+        m = Pattern.compile(hashPattern).matcher(out);
+
+        assertTrue(m.find());
+        String hash0 = out.substring(m.start(), m.end());
+
+        assertTrue(m.find());
+        String hash1 = out.substring(m.start(), m.end());
+
+        assertFalse(m.find());
+
+        assertFalse(Objects.equals(ver0, ver1));
+        assertFalse(Objects.equals(hash0, hash1));
     }
 
     /** */
@@ -465,4 +522,115 @@ public class IgniteCacheDumpSelf2Test extends 
GridCommonAbstractTest {
             )
         );
     }
+
+    /** */
+    @Test
+    public void testDumpEntryConflictVersion() throws Exception {
+        IgniteConfiguration cfg = 
getConfiguration("test").setPluginProviders(new AbstractTestPluginProvider() {
+            @Override public String name() {
+                return "ConflictResolverProvider";
+            }
+
+            @Override public CachePluginProvider 
createCacheProvider(CachePluginContext ctx) {
+                if 
(!ctx.igniteCacheConfiguration().getName().equals(DEFAULT_CACHE_NAME))
+                    return null;
+
+                return new AbstractCachePluginProvider() {
+                    @Override public @Nullable Object createComponent(Class 
cls) {
+                        if (cls != CacheConflictResolutionManager.class)
+                            return null;
+
+                        return new TestCacheConflictResolutionManager();
+                    }
+                };
+            }
+        });
+
+        IgniteEx ign = startGrid(cfg);
+
+        IgniteCache<Integer, Integer> cache = ign.createCache(new 
CacheConfiguration<Integer, Integer>()
+            .setName(DEFAULT_CACHE_NAME)
+            .setAffinity(new RendezvousAffinityFunction().setPartitions(3))
+        );
+
+        int topVer = 42;
+        int dataCenterId = 31;
+        int nodeOrder = 13;
+
+        Map<KeyCacheObject, GridCacheDrInfo> drMap = new HashMap<>();
+
+        IgniteInternalCache<Integer, Integer> intCache = 
ign.cachex(cache.getName());
+
+        for (int i = 0; i < KEYS_CNT; i++) {
+            KeyCacheObject key = new KeyCacheObjectImpl(i, null, 
intCache.affinity().partition(i));
+            CacheObject val = new CacheObjectImpl(i, null);
+
+            val.prepareMarshal(intCache.context().cacheObjectContext());
+
+            drMap.put(key, new GridCacheDrInfo(val, new 
GridCacheVersion(topVer, i, nodeOrder, dataCenterId)));
+        }
+
+        intCache.putAllConflict(drMap);
+
+        ign.snapshot().createDump(DMP_NAME, null).get(getTestTimeout());
+
+        TestDumpConsumer cnsmr = new TestDumpConsumer() {
+            @Override public void onPartition(int grp, int part, 
Iterator<DumpEntry> data) {
+                data.forEachRemaining(e -> {
+                    int key = (int)e.key();
+                    int val = (int)e.value();
+
+                    assertNotNull(e.version());
+
+                    CacheEntryVersion conflictVer = 
e.version().otherClusterVersion();
+
+                    assertNotNull(conflictVer);
+                    assertEquals(topVer, conflictVer.topologyVersion());
+                    assertEquals(nodeOrder, conflictVer.nodeOrder());
+                    assertEquals(dataCenterId, conflictVer.clusterId());
+                    assertEquals(key, val);
+                    assertEquals(key, conflictVer.order());
+                });
+            }
+        };
+
+        new DumpReader(
+            new DumpReaderConfiguration(
+                dumpDirectory(ign, DMP_NAME),
+                cnsmr,
+                DFLT_THREAD_CNT,
+                DFLT_TIMEOUT,
+                true,
+                false,
+                null,
+                false
+            ),
+            log
+        ).run();
+
+        cnsmr.check();
+    }
+
+    /** */
+    public class TestCacheConflictResolutionManager<K, V> extends 
GridCacheManagerAdapter<K, V>
+        implements CacheConflictResolutionManager<K, V> {
+
+        /** {@inheritDoc} */
+        @Override public CacheVersionConflictResolver conflictResolver() {
+            return new CacheVersionConflictResolver() {
+                @Override public <K, V> GridCacheVersionConflictContext<K, V> 
resolve(
+                    CacheObjectValueContext ctx,
+                    GridCacheVersionedEntryEx<K, V> oldEntry,
+                    GridCacheVersionedEntryEx<K, V> newEntry,
+                    boolean atomicVerComparator
+                ) {
+                    GridCacheVersionConflictContext res = new 
GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);
+
+                    res.useNew();
+
+                    return res;
+                }
+            };
+        }
+    }
 }

Reply via email to