This is an automated email from the ASF dual-hosted git repository. nizhikov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new b7a8f021f8c IGNITE-20917 Add entry version to DumpEntry (#11069) b7a8f021f8c is described below commit b7a8f021f8caf22ad51977be0323dc4085d095e3 Author: Nikolay <nizhi...@apache.org> AuthorDate: Mon Nov 27 14:29:50 2023 +0300 IGNITE-20917 Add entry version to DumpEntry (#11069) --- .../java/org/apache/ignite/dump/DumpEntry.java | 9 +- .../snapshot/SnapshotPartitionsVerifyHandler.java | 2 +- .../snapshot/dump/CreateDumpFutureTask.java | 11 +- .../snapshot/dump/DumpEntrySerializer.java | 63 +++++++- .../processors/cache/verify/IdleVerifyUtility.java | 4 +- .../snapshot/dump/AbstractCacheDumpTest.java | 4 + .../snapshot/dump/IgniteCacheDumpSelf2Test.java | 170 ++++++++++++++++++++- 7 files changed, 247 insertions(+), 16 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/dump/DumpEntry.java b/modules/core/src/main/java/org/apache/ignite/dump/DumpEntry.java index ca34f3cbace..6fa8b3c58a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/dump/DumpEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/dump/DumpEntry.java @@ -17,7 +17,9 @@ package org.apache.ignite.dump; +import java.util.Collection; import java.util.Iterator; +import org.apache.ignite.cache.CacheEntryVersion; import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump; import org.apache.ignite.lang.IgniteExperimental; @@ -26,7 +28,7 @@ import org.apache.ignite.lang.IgniteExperimental; * * @see Dump#iterator(String, int, int) * @see DumpConsumer#onPartition(int, int, Iterator) - * @see org.apache.ignite.IgniteSnapshot#createDump(String) + * @see org.apache.ignite.IgniteSnapshot#createDump(String, Collection) */ @IgniteExperimental public interface DumpEntry { @@ -36,6 +38,11 @@ public interface DumpEntry { /** @return Expiration time. */ public long expireTime(); + /** + * @return Version of the entry. + */ + public CacheEntryVersion version(); + /** @return Key. */ public Object key(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java index 3847b19b3a9..71cafd15828 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java @@ -409,7 +409,7 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part while (iter.hasNext()) { DumpEntry e = iter.next(); - ctx.update((KeyCacheObject)e.key(), (CacheObject)e.value(), null); + ctx.update((KeyCacheObject)e.key(), (CacheObject)e.value(), e.version()); size++; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java index 11314e9d5e5..7e80a8dc64d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java @@ -488,7 +488,7 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple else if (val == null) reasonToSkip = "newly created or already removed"; // Previous value is null. Entry created after dump start, skip. else { - write(cache, expireTime, key, val); + write(cache, expireTime, key, val, ver); changedCnt.increment(); } @@ -532,7 +532,7 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple else if (changed.get(cache).contains(key)) written = false; else - write(cache, expireTime, key, val); + write(cache, expireTime, key, val, ver); if (log.isTraceEnabled()) { log.trace("Iterator [" + @@ -540,17 +540,18 @@ public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask imple ", cache=" + cache + ", part=" + part + ", key=" + key + - ", written=" + written + ']'); + ", written=" + written + + ", ver=" + ver + ']'); } return written; } /** */ - private void write(int cache, long expireTime, KeyCacheObject key, CacheObject val) { + private void write(int cache, long expireTime, KeyCacheObject key, CacheObject val, GridCacheVersion ver) { synchronized (serializer) { // Prevent concurrent access to the dump file. try { - ByteBuffer buf = serializer.writeToBuffer(cache, expireTime, key, val, cctx.cacheObjectContext(cache)); + ByteBuffer buf = serializer.writeToBuffer(cache, expireTime, key, val, ver, cctx.cacheObjectContext(cache)); if (file.writeFully(buf) != buf.limit()) throw new IgniteException("Can't write row"); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java index 27146ec3644..4a1fd7f1c29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpEntrySerializer.java @@ -24,6 +24,7 @@ import java.util.function.Function; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheEntryVersion; import org.apache.ignite.dump.DumpEntry; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.pagemem.wal.record.UnwrapDataEntry; @@ -32,6 +33,8 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; /** @@ -88,11 +91,11 @@ public class DumpEntrySerializer { /** * Dump entry structure: * <pre> - * +---------+-----------+----------+-----------------+-----+-------+ - * | 4 bytes | 4 bytes | 4 bytes | 8 bytes | | | - * +---------+-----------+----------+-----------------+-----+-------+ - * | CRC | Data size | cache ID | expiration time | key | value | - * +---------+-----------+----------+-----------------+-----+-------+ + * +---------+-----------+----------+-----------------+-----+-------------+ + * | 4 bytes | 4 bytes | 4 bytes | 8 bytes | | | | + * +---------+-----------+----------+-----------------+-----+-------------+ + * | CRC | Data size | cache ID | expiration time | ver | key | value | + * +---------+-----------+----------+-----------------+-----+-------------+ * </pre> * * @param cache Cache id. @@ -108,11 +111,22 @@ public class DumpEntrySerializer { long expireTime, KeyCacheObject key, CacheObject val, + GridCacheVersion ver, CacheObjectContext coCtx ) throws IgniteCheckedException { + int verSz = Integer.BYTES/*topVer*/ + Long.BYTES/*order*/ + Integer.BYTES/*nodeOrderDrId*/; + + boolean hasConflictVer = ver.otherClusterVersion() != null; + + if (hasConflictVer) + verSz *= 2 /*GridCacheVersion otherClusterVersion*/; + + assert ver.fieldsCount() == (hasConflictVer ? 4 : 3); + int keySz = key.valueBytesLength(coCtx); int valSz = val.valueBytesLength(coCtx); - int dataSz = /*cache ID*/Integer.BYTES + /*expire time*/Long.BYTES + /*key*/keySz + /*value*/valSz; + int dataSz = /*cache ID*/Integer.BYTES + /*expire time*/Long.BYTES + + /* hasConflictVersion */1 + /*version*/verSz + /*key*/keySz + /*value*/valSz; int fullSz = dataSz + /*extra bytes for row size*/Integer.BYTES + /*CRC*/Integer.BYTES; @@ -128,6 +142,19 @@ public class DumpEntrySerializer { buf.putInt(cache); buf.putLong(expireTime); + buf.put((byte)(hasConflictVer ? 1 : 0)); + buf.putInt(ver.topologyVersion()); + buf.putLong(ver.order()); + buf.putInt(ver.nodeOrderAndDrIdRaw()); + + if (hasConflictVer) { + GridCacheVersion ver0 = (GridCacheVersion)ver.otherClusterVersion(); + + buf.putInt(ver0.topologyVersion()); + buf.putLong(ver0.order()); + buf.putInt(ver0.nodeOrderAndDrIdRaw()); + } + if (!key.putValue(buf)) throw new IgniteCheckedException("Can't write key"); @@ -195,6 +222,8 @@ public class DumpEntrySerializer { int cache = buf.getInt(); long expireTime = buf.getLong(); + GridCacheVersion ver = readVersion(buf); + int keySz = buf.getInt(); byte keyType = buf.get(); @@ -225,6 +254,10 @@ public class DumpEntrySerializer { return expireTime; } + @Override public CacheEntryVersion version() { + return ver; + } + @Override public Object key() { return raw ? key : UnwrapDataEntry.unwrapKey(key, keepBinary, fakeCacheObjCtx); } @@ -235,6 +268,24 @@ public class DumpEntrySerializer { }; } + /** @return Written entry version. */ + private static GridCacheVersion readVersion(ByteBuffer buf) { + boolean hasConflictVer = buf.get() == 1; + + int topVer = buf.getInt(); + long order = buf.getLong(); + int nodeOrderDrId = buf.getInt(); + + if (!hasConflictVer) + return new GridCacheVersion(topVer, nodeOrderDrId, order); + + int topVer0 = buf.getInt(); + long order0 = buf.getLong(); + int nodeOrderDrId0 = buf.getInt(); + + return new GridCacheVersionEx(topVer, nodeOrderDrId, order, new GridCacheVersion(topVer0, nodeOrderDrId0, order0)); + } + /** @return Thread local buffer. */ private ByteBuffer threadLocalBuffer() { return thLocBufs.computeIfAbsent(Thread.currentThread().getId(), DFLT_BUF_ALLOC); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java index 290dc0352e6..e2d22e61a0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.function.BiConsumer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheEntryVersion; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.binary.BinaryObjectEx; import org.apache.ignite.internal.management.cache.PartitionKeyV2; @@ -46,7 +47,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotVerificationTask.HashHolder; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.lang.IgniteThrowableSupplier; import org.apache.ignite.internal.util.typedef.F; @@ -392,7 +392,7 @@ public class IdleVerifyUtility { public void update( KeyCacheObject key, CacheObject val, - @Nullable GridCacheVersion ver + CacheEntryVersion ver ) throws IgniteCheckedException { partHash += key.hashCode(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java index fdb5207b4a6..a61a6c8b2d4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java @@ -386,6 +386,8 @@ public abstract class AbstractCacheDumpTest extends GridCommonAbstractTest { DumpEntry e = iter.next(); assertNotNull(e); + assertNotNull(e.version()); + assertNull(e.version().otherClusterVersion()); if (e.cacheId() == CU.cacheId(CACHE_0)) assertEquals(USER_FACTORY.apply((Integer)e.key()), e.value()); @@ -430,6 +432,8 @@ public abstract class AbstractCacheDumpTest extends GridCommonAbstractTest { Integer key = (Integer)e.key(); assertEquals(key, e.value()); + assertNotNull(e.version()); + assertNull(e.version().otherClusterVersion()); } /** */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java index 429db9f2bd4..ff3b959073b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java @@ -24,38 +24,67 @@ import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryVersion; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.dump.DumpEntry; +import org.apache.ignite.dump.DumpReader; +import org.apache.ignite.dump.DumpReaderConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectImpl; +import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; +import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.TestDumpConsumer; +import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectImpl; import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.platform.model.User; +import org.apache.ignite.plugin.AbstractCachePluginProvider; +import org.apache.ignite.plugin.AbstractTestPluginProvider; +import org.apache.ignite.plugin.CachePluginContext; +import org.apache.ignite.plugin.CachePluginProvider; import org.apache.ignite.testframework.ListeningTestLogger; import org.apache.ignite.testframework.LogListener; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; import org.junit.Test; import static java.nio.file.StandardOpenOption.READ; import static java.nio.file.StandardOpenOption.WRITE; import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_DIRECTORY; +import static org.apache.ignite.dump.DumpReaderConfiguration.DFLT_THREAD_CNT; +import static org.apache.ignite.dump.DumpReaderConfiguration.DFLT_TIMEOUT; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX; @@ -305,13 +334,41 @@ public class IgniteCacheDumpSelf2Test extends GridCommonAbstractTest { ign.snapshot().createDump(DMP_NAME, null).get(); + String out = invokeCheckCommand(ign, DMP_NAME); + assertContains( null, - invokeCheckCommand(ign, DMP_NAME), + out, "Conflict partition: PartitionKeyV2 [grpId=" + CU.cacheId(DEFAULT_CACHE_NAME) + ", grpName=" + DEFAULT_CACHE_NAME + ", partId=" + corruptedPart + "]" ); + + String verPattern = "partVerHash=(-)?[0-9]+"; + String hashPattern = "partHash=(-)?[0-9]+"; + + Matcher m = Pattern.compile(verPattern).matcher(out); + + assertTrue(m.find()); + String ver0 = out.substring(m.start(), m.end()); + + assertTrue(m.find()); + String ver1 = out.substring(m.start(), m.end()); + + assertFalse(m.find()); + + m = Pattern.compile(hashPattern).matcher(out); + + assertTrue(m.find()); + String hash0 = out.substring(m.start(), m.end()); + + assertTrue(m.find()); + String hash1 = out.substring(m.start(), m.end()); + + assertFalse(m.find()); + + assertFalse(Objects.equals(ver0, ver1)); + assertFalse(Objects.equals(hash0, hash1)); } /** */ @@ -465,4 +522,115 @@ public class IgniteCacheDumpSelf2Test extends GridCommonAbstractTest { ) ); } + + /** */ + @Test + public void testDumpEntryConflictVersion() throws Exception { + IgniteConfiguration cfg = getConfiguration("test").setPluginProviders(new AbstractTestPluginProvider() { + @Override public String name() { + return "ConflictResolverProvider"; + } + + @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) { + if (!ctx.igniteCacheConfiguration().getName().equals(DEFAULT_CACHE_NAME)) + return null; + + return new AbstractCachePluginProvider() { + @Override public @Nullable Object createComponent(Class cls) { + if (cls != CacheConflictResolutionManager.class) + return null; + + return new TestCacheConflictResolutionManager(); + } + }; + } + }); + + IgniteEx ign = startGrid(cfg); + + IgniteCache<Integer, Integer> cache = ign.createCache(new CacheConfiguration<Integer, Integer>() + .setName(DEFAULT_CACHE_NAME) + .setAffinity(new RendezvousAffinityFunction().setPartitions(3)) + ); + + int topVer = 42; + int dataCenterId = 31; + int nodeOrder = 13; + + Map<KeyCacheObject, GridCacheDrInfo> drMap = new HashMap<>(); + + IgniteInternalCache<Integer, Integer> intCache = ign.cachex(cache.getName()); + + for (int i = 0; i < KEYS_CNT; i++) { + KeyCacheObject key = new KeyCacheObjectImpl(i, null, intCache.affinity().partition(i)); + CacheObject val = new CacheObjectImpl(i, null); + + val.prepareMarshal(intCache.context().cacheObjectContext()); + + drMap.put(key, new GridCacheDrInfo(val, new GridCacheVersion(topVer, i, nodeOrder, dataCenterId))); + } + + intCache.putAllConflict(drMap); + + ign.snapshot().createDump(DMP_NAME, null).get(getTestTimeout()); + + TestDumpConsumer cnsmr = new TestDumpConsumer() { + @Override public void onPartition(int grp, int part, Iterator<DumpEntry> data) { + data.forEachRemaining(e -> { + int key = (int)e.key(); + int val = (int)e.value(); + + assertNotNull(e.version()); + + CacheEntryVersion conflictVer = e.version().otherClusterVersion(); + + assertNotNull(conflictVer); + assertEquals(topVer, conflictVer.topologyVersion()); + assertEquals(nodeOrder, conflictVer.nodeOrder()); + assertEquals(dataCenterId, conflictVer.clusterId()); + assertEquals(key, val); + assertEquals(key, conflictVer.order()); + }); + } + }; + + new DumpReader( + new DumpReaderConfiguration( + dumpDirectory(ign, DMP_NAME), + cnsmr, + DFLT_THREAD_CNT, + DFLT_TIMEOUT, + true, + false, + null, + false + ), + log + ).run(); + + cnsmr.check(); + } + + /** */ + public class TestCacheConflictResolutionManager<K, V> extends GridCacheManagerAdapter<K, V> + implements CacheConflictResolutionManager<K, V> { + + /** {@inheritDoc} */ + @Override public CacheVersionConflictResolver conflictResolver() { + return new CacheVersionConflictResolver() { + @Override public <K, V> GridCacheVersionConflictContext<K, V> resolve( + CacheObjectValueContext ctx, + GridCacheVersionedEntryEx<K, V> oldEntry, + GridCacheVersionedEntryEx<K, V> newEntry, + boolean atomicVerComparator + ) { + GridCacheVersionConflictContext res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry); + + res.useNew(); + + return res; + } + }; + } + } }