Repository: ignite Updated Branches: refs/heads/master d829b67e9 -> a1cb021c0
IGNITE-9830 Fixed race in binary metadata registration leading to exception on commit - Fixes #4996. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a1cb021c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a1cb021c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a1cb021c Branch: refs/heads/master Commit: a1cb021c06ffce5da460a8cd4ffe71de2c350b54 Parents: d829b67 Author: Aleksei Scherbakov <[email protected]> Authored: Wed Oct 17 13:28:20 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Oct 17 14:52:12 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 5 + .../internal/binary/BinaryReaderExImpl.java | 1 - .../internal/binary/BinarySchemaRegistry.java | 114 +++-- .../ignite/internal/binary/BinaryUtils.java | 28 +- .../cache/binary/BinaryMetadataTransport.java | 93 +++- .../binary/CacheObjectBinaryProcessorImpl.java | 219 ++++++++- .../spi/discovery/tcp/TcpDiscoverySpi.java | 5 +- .../CachePageWriteLockUnlockTest.java | 2 + .../transactions/TxRollbackOnTimeoutTest.java | 7 +- ...MetadataConcurrentUpdateWithIndexesTest.java | 439 +++++++++++++++++++ .../IgniteBinaryCacheQueryTestSuite.java | 3 + 11 files changed, 827 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 6d48adf..02ebb25 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -1008,6 +1008,11 @@ public final class IgniteSystemProperties { public static final String IGNITE_REUSE_MEMORY_ON_DEACTIVATE = "IGNITE_REUSE_MEMORY_ON_DEACTIVATE"; /** + * Timeout for waiting schema update if schema was not found for last accepted version. + */ + public static final String IGNITE_WAIT_SCHEMA_UPDATE = "IGNITE_WAIT_SCHEMA_UPDATE"; + + /** * System property to override {@link CacheConfiguration#rebalanceThrottle} configuration property for all caches. * {@code 0} by default, which means that override is disabled. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java index 38934f0..601141c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java @@ -2028,7 +2028,6 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina for (BinarySchema existingSchema : existingSchemas) existingSchemaIds.add(existingSchema.schemaId()); - throw new BinaryObjectException("Cannot find schema for object with compact footer" + " [typeName=" + type.typeName() + ", typeId=" + typeId + http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySchemaRegistry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySchemaRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySchemaRegistry.java index 
91f29b2..f22fc4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySchemaRegistry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySchemaRegistry.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.binary; +import java.util.ArrayList; +import java.util.List; import org.jetbrains.annotations.Nullable; import java.util.HashMap; @@ -98,75 +100,95 @@ public class BinarySchemaRegistry { * @param schemaId Schema ID. * @param schema Schema. */ - public void addSchema(int schemaId, BinarySchema schema) { - synchronized (this) { - if (inline) { - // Check if this is already known schema. - if (schemaId == schemaId1 || schemaId == schemaId2 || schemaId == schemaId3 || schemaId == schemaId4) - return; + public synchronized void addSchema(int schemaId, BinarySchema schema) { + if (inline) { + // Check if this is already known schema. + if (schemaId == schemaId1 || schemaId == schemaId2 || schemaId == schemaId3 || schemaId == schemaId4) + return; - // Try positioning new schema in inline mode. - if (schemaId1 == EMPTY) { - schemaId1 = schemaId; + // Try positioning new schema in inline mode. + if (schemaId1 == EMPTY) { + schemaId1 = schemaId; - schema1 = schema; + schema1 = schema; - inline = true; // Forcing HB edge just in case. + inline = true; // Forcing HB edge just in case. - return; - } + return; + } - if (schemaId2 == EMPTY) { - schemaId2 = schemaId; + if (schemaId2 == EMPTY) { + schemaId2 = schemaId; - schema2 = schema; + schema2 = schema; - inline = true; // Forcing HB edge just in case. + inline = true; // Forcing HB edge just in case. - return; - } + return; + } - if (schemaId3 == EMPTY) { - schemaId3 = schemaId; + if (schemaId3 == EMPTY) { + schemaId3 = schemaId; - schema3 = schema; + schema3 = schema; - inline = true; // Forcing HB edge just in case. + inline = true; // Forcing HB edge just in case. 
- return; - } + return; + } - if (schemaId4 == EMPTY) { - schemaId4 = schemaId; + if (schemaId4 == EMPTY) { + schemaId4 = schemaId; - schema4 = schema; + schema4 = schema; - inline = true; // Forcing HB edge just in case. + inline = true; // Forcing HB edge just in case. - return; - } + return; + } - // No luck, switching to hash map mode. - HashMap<Integer, BinarySchema> newSchemas = new HashMap<>(); + // No luck, switching to hash map mode. + HashMap<Integer, BinarySchema> newSchemas = new HashMap<>(); - newSchemas.put(schemaId1, schema1); - newSchemas.put(schemaId2, schema2); - newSchemas.put(schemaId3, schema3); - newSchemas.put(schemaId4, schema4); + newSchemas.put(schemaId1, schema1); + newSchemas.put(schemaId2, schema2); + newSchemas.put(schemaId3, schema3); + newSchemas.put(schemaId4, schema4); - newSchemas.put(schemaId, schema); + newSchemas.put(schemaId, schema); - schemas = newSchemas; + schemas = newSchemas; - inline = false; - } - else { - HashMap<Integer, BinarySchema> newSchemas = new HashMap<>(schemas); + inline = false; + } + else { + HashMap<Integer, BinarySchema> newSchemas = new HashMap<>(schemas); - newSchemas.put(schemaId, schema); + newSchemas.put(schemaId, schema); - schemas = newSchemas; - } + schemas = newSchemas; } } + + /** + * @return List of known schemas. 
+ */ + public synchronized List<BinarySchema> schemas() { + List<BinarySchema> res = new ArrayList<>(); + + if (inline) { + if (schemaId1 != EMPTY) + res.add(schema1); + if (schemaId2 != EMPTY) + res.add(schema2); + if (schemaId3 != EMPTY) + res.add(schema3); + if (schemaId4 != EMPTY) + res.add(schema4); + } + else + res.addAll(schemas.values()); + + return res; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java index 553d8e5..77dce56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java @@ -958,10 +958,30 @@ public class BinaryUtils { * @throws BinaryObjectException If merge failed due to metadata conflict. */ public static BinaryMetadata mergeMetadata(@Nullable BinaryMetadata oldMeta, BinaryMetadata newMeta) { + return mergeMetadata(oldMeta, newMeta, null); + } + + /** + * Merge old and new metas. + * + * @param oldMeta Old meta. + * @param newMeta New meta. + * @param changedSchemas Set for holding changed schemas. + * @return New meta if old meta was null, old meta if no changes detected, merged meta otherwise. + * @throws BinaryObjectException If merge failed due to metadata conflict. 
+ */ + public static BinaryMetadata mergeMetadata(@Nullable BinaryMetadata oldMeta, BinaryMetadata newMeta, + @Nullable Set<Integer> changedSchemas) { assert newMeta != null; - if (oldMeta == null) + if (oldMeta == null) { + if (changedSchemas != null) { + for (BinarySchema schema : newMeta.schemas()) + changedSchemas.add(schema.schemaId()); + } + return newMeta; + } else { assert oldMeta.typeId() == newMeta.typeId(); @@ -1036,8 +1056,12 @@ public class BinaryUtils { Collection<BinarySchema> mergedSchemas = new HashSet<>(oldMeta.schemas()); for (BinarySchema newSchema : newMeta.schemas()) { - if (mergedSchemas.add(newSchema)) + if (mergedSchemas.add(newSchema)) { changed = true; + + if (changedSchemas != null) + changedSchemas.add(newSchema.schemaId()); + } } // Return either old meta if no changes detected, or new merged meta. http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java index 38450df..1c2f6f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java @@ -16,8 +16,12 @@ */ package org.apache.ignite.internal.processors.cache.binary; +import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -42,6 +46,7 @@ import 
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.Nullable; @@ -87,6 +92,9 @@ final class BinaryMetadataTransport { private final ConcurrentMap<Integer, ClientMetadataRequestFuture> clientReqSyncMap = new ConcurrentHashMap<>(); /** */ + private final ConcurrentMap<SyncKey, GridFutureAdapter<?>> schemaWaitFuts = new ConcurrentHashMap<>(); + + /** */ private volatile boolean stopping; /** */ @@ -207,6 +215,21 @@ final class BinaryMetadataTransport { } /** + * Await specific schema update. + * @param typeId Type id. + * @param schemaId Schema id. + * @return Future which will be completed when schema is received. + */ + GridFutureAdapter<?> awaitSchemaUpdate(int typeId, int schemaId) { + GridFutureAdapter<Object> fut = new GridFutureAdapter<>(); + + // Use version for schemaId. + GridFutureAdapter<?> oldFut = schemaWaitFuts.putIfAbsent(new SyncKey(typeId, schemaId), fut); + + return oldFut == null ? fut : oldFut; + } + + /** * Allows client node to request latest version of binary metadata for a given typeId from the cluster * in case client is able to detect that it has obsolete metadata in its local cache. 
* @@ -259,6 +282,13 @@ final class BinaryMetadataTransport { /** {@inheritDoc} */ @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, MetadataUpdateProposedMessage msg) { + if (log.isDebugEnabled()) + log.debug("Received MetadataUpdateProposedListener [typeId=" + msg.typeId() + + ", typeName=" + msg.metadata().typeName() + + ", pendingVer=" + msg.pendingVersion() + + ", acceptedVer=" + msg.acceptedVersion() + + ", schemasCnt=" + msg.metadata().schemas().size() + ']'); + int typeId = msg.typeId(); BinaryMetadataHolder holder = metaLocCache.get(typeId); @@ -277,20 +307,23 @@ final class BinaryMetadataTransport { acceptedVer = 0; } - if (log.isDebugEnabled()) - log.debug("Versions are stamped on coordinator" + - " [typeId=" + typeId + - ", pendingVer=" + pendingVer + - ", acceptedVer=" + acceptedVer + "]" - ); - msg.pendingVersion(pendingVer); msg.acceptedVersion(acceptedVer); BinaryMetadata locMeta = holder != null ? holder.metadata() : null; try { - BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata()); + Set<Integer> changedSchemas = new LinkedHashSet<>(); + + BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata(), changedSchemas); + + if (log.isDebugEnabled()) + log.debug("Versions are stamped on coordinator" + + " [typeId=" + typeId + + ", changedSchemas=" + changedSchemas + + ", pendingVer=" + pendingVer + + ", acceptedVer=" + acceptedVer + "]" + ); msg.metadata(mergedMeta); } @@ -358,8 +391,10 @@ final class BinaryMetadataTransport { if (!msg.rejected()) { BinaryMetadata locMeta = holder != null ? 
holder.metadata() : null; + Set<Integer> changedSchemas = new LinkedHashSet<>(); + try { - BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata()); + BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata(), changedSchemas); BinaryMetadataHolder newHolder = new BinaryMetadataHolder(mergedMeta, pendingVer, acceptedVer); @@ -382,7 +417,8 @@ final class BinaryMetadataTransport { } else { if (log.isDebugEnabled()) - log.debug("Updated metadata on server node: " + newHolder); + log.debug("Updated metadata on server node [holder=" + newHolder + + ", changedSchemas=" + changedSchemas + ']'); metaLocCache.put(typeId, newHolder); } @@ -463,7 +499,7 @@ final class BinaryMetadataTransport { if (oldAcceptedVer >= newAcceptedVer) { if (log.isDebugEnabled()) log.debug("Marking ack as duplicate [holder=" + holder + - ", newAcceptedVer: " + newAcceptedVer + ']'); + ", newAcceptedVer=" + newAcceptedVer + ']'); //this is duplicate ack msg.duplicated(true); @@ -481,8 +517,26 @@ final class BinaryMetadataTransport { GridFutureAdapter<MetadataUpdateResult> fut = syncMap.get(new SyncKey(typeId, newAcceptedVer)); + holder = metaLocCache.get(typeId); + if (log.isDebugEnabled()) - log.debug("Completing future " + fut + " for " + metaLocCache.get(typeId)); + log.debug("Completing future " + fut + " for " + holder); + + if (!schemaWaitFuts.isEmpty()) { + Iterator<Map.Entry<SyncKey, GridFutureAdapter<?>>> iter = schemaWaitFuts.entrySet().iterator(); + + while (iter.hasNext()) { + Map.Entry<SyncKey, GridFutureAdapter<?>> entry = iter.next(); + + SyncKey key = entry.getKey(); + + if (key.typeId() == typeId && holder.metadata().hasSchema(key.version())) { + entry.getValue().onDone(); + + iter.remove(); + } + } + } if (fut != null) fut.onDone(MetadataUpdateResult.createSuccessfulResult()); @@ -527,6 +581,11 @@ final class BinaryMetadataTransport { void key(SyncKey key) { this.key = key; } + + /** {@inheritDoc} */ + @Override public String toString() { + 
return S.toString(MetadataUpdateResultFuture.class, this); + } } /** @@ -580,6 +639,11 @@ final class BinaryMetadataTransport { return (typeId == that.typeId) && (ver == that.ver); } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SyncKey.class, this); + } } /** @@ -615,7 +679,7 @@ final class BinaryMetadataTransport { binMetaBytes = U.marshal(ctx, metaHolder); } catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal binary metadata for [typeId: " + typeId + "]", e); + U.error(log, "Failed to marshal binary metadata for [typeId=" + typeId + ']', e); resp.markErrorOnRequest(); } @@ -670,7 +734,8 @@ final class BinaryMetadataTransport { do { oldHolder = metaLocCache.get(typeId); - if (oldHolder != null && obsoleteUpdate( + // typeId metadata cannot be removed after initialization. + if (obsoleteUpdate( oldHolder.pendingVersion(), oldHolder.acceptedVersion(), newHolder.pendingVersion(), http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 4c101b2..137db9f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -19,10 +19,14 @@ package org.apache.ignite.internal.processors.cache.binary; import java.io.File; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.LinkedHashSet; +import 
java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -30,6 +34,8 @@ import javax.cache.CacheException; import org.apache.ignite.IgniteBinary; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.binary.BinaryField; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObjectBuilder; @@ -39,8 +45,10 @@ import org.apache.ignite.binary.BinaryTypeConfiguration; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.BinaryConfiguration; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.UnregisteredBinaryTypeException; import org.apache.ignite.internal.binary.BinaryContext; @@ -65,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.IgniteUtils; @@ -76,7 +85,9 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T1; import org.apache.ignite.internal.util.typedef.T2; import 
org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiClosure; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; @@ -88,7 +99,11 @@ import org.apache.ignite.spi.discovery.IgniteDiscoveryThread; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_TEST_FEATURES_ENABLED; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAIT_SCHEMA_UPDATE; import static org.apache.ignite.IgniteSystemProperties.getBoolean; import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.BINARY_PROC; @@ -120,6 +135,12 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm */ @Nullable private File binaryMetadataFileStoreDir; + /** How long to wait for schema if no updates in progress. */ + private long waitSchemaTimeout = IgniteSystemProperties.getLong(IGNITE_WAIT_SCHEMA_UPDATE, 30_000); + + /** For tests. */ + public static boolean useTestBinaryCtx = false; + /** */ @GridToStringExclude private IgniteBinary binaries; @@ -205,7 +226,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm BinaryMarshaller bMarsh0 = (BinaryMarshaller)marsh; - binaryCtx = new BinaryContext(metaHnd, ctx.config(), ctx.log(BinaryContext.class)); + binaryCtx = useTestBinaryCtx ? 
+ new TestBinaryContext(metaHnd, ctx.config(), ctx.log(BinaryContext.class)) : + new BinaryContext(metaHnd, ctx.config(), ctx.log(BinaryContext.class)); IgniteUtils.invoke(BinaryMarshaller.class, bMarsh0, "setBinaryContext", binaryCtx, ctx.config()); @@ -452,11 +475,12 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm BinaryMetadata oldMeta = metaHolder != null ? metaHolder.metadata() : null; - BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0); + Set<Integer> changedSchemas = new LinkedHashSet<>(); + + BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0, changedSchemas); - //metadata requested to be added is exactly the same as already presented in the cache - if (mergedMeta == oldMeta) - return; + if (oldMeta != null && mergedMeta == oldMeta && metaHolder.pendingVersion() == metaHolder.acceptedVersion()) + return; // Safe to use existing schemas. if (failIfUnregistered) throw new UnregisteredBinaryTypeException( @@ -466,7 +490,24 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm "dev-list.", typeId, mergedMeta); - MetadataUpdateResult res = transport.requestMetadataUpdate(mergedMeta).get(); + long t0 = System.nanoTime(); + + GridFutureAdapter<MetadataUpdateResult> fut = transport.requestMetadataUpdate(mergedMeta); + + MetadataUpdateResult res = fut.get(); + + if (log.isDebugEnabled()) { + IgniteInternalTx tx = ctx.cache().context().tm().tx(); + + log.debug("Completed metadata update [typeId=" + typeId + + ", typeName=" + newMeta.typeName() + + ", changedSchemas=" + changedSchemas + + ", waitTime=" + MILLISECONDS.convert(System.nanoTime() - t0, NANOSECONDS) + "ms" + + ", holder=" + metaHolder + + ", fut=" + fut + + ", tx=" + CU.txString(tx) + + ']'); + } assert res != null; @@ -541,9 +582,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm if (log.isDebugEnabled() && !fut.isDone()) log.debug("Waiting for update for" + - " 
[typeId=" + typeId + - ", pendingVer=" + holder.pendingVersion() + - ", acceptedVer=" + holder.acceptedVersion() + "]"); + " [typeId=" + typeId + + ", pendingVer=" + holder.pendingVersion() + + ", acceptedVer=" + holder.acceptedVersion() + "]"); try { fut.get(); @@ -565,40 +606,99 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm if (ctx.clientNode()) { if (holder == null || !holder.metadata().hasSchema(schemaId)) { + if (log.isDebugEnabled()) + log.debug("Waiting for client metadata update" + + " [typeId=" + typeId + + ", schemaId=" + schemaId + + ", pendingVer=" + (holder == null ? "NA" : holder.pendingVersion()) + + ", acceptedVer=" + (holder == null ? "NA" :holder.acceptedVersion()) + ']'); + try { transport.requestUpToDateMetadata(typeId).get(); - - holder = metadataLocCache.get(typeId); } catch (IgniteCheckedException ignored) { // No-op. } + + holder = metadataLocCache.get(typeId); + + if (log.isDebugEnabled()) + log.debug("Finished waiting for client metadata update" + + " [typeId=" + typeId + + ", schemaId=" + schemaId + + ", pendingVer=" + (holder == null ? "NA" : holder.pendingVersion()) + + ", acceptedVer=" + (holder == null ? 
"NA" :holder.acceptedVersion()) + ']'); } } - else if (holder != null) { - if (IgniteThread.current() instanceof IgniteDiscoveryThread) + else { + if (holder != null && IgniteThread.current() instanceof IgniteDiscoveryThread) return holder.metadata().wrap(binaryCtx); + else if (holder != null && (holder.pendingVersion() - holder.acceptedVersion() > 0)) { + if (log.isDebugEnabled()) + log.debug("Waiting for metadata update" + + " [typeId=" + typeId + + ", schemaId=" + schemaId + + ", pendingVer=" + holder.pendingVersion() + + ", acceptedVer=" + holder.acceptedVersion() + ']'); - if (holder.pendingVersion() - holder.acceptedVersion() > 0) { - GridFutureAdapter<MetadataUpdateResult> fut = transport.awaitMetadataUpdate( - typeId, - holder.pendingVersion()); + long t0 = System.nanoTime(); - if (log.isDebugEnabled() && !fut.isDone()) - log.debug("Waiting for update for" + - " [typeId=" + typeId - + ", schemaId=" + schemaId - + ", pendingVer=" + holder.pendingVersion() - + ", acceptedVer=" + holder.acceptedVersion() + "]"); + GridFutureAdapter<MetadataUpdateResult> fut = transport.awaitMetadataUpdate( + typeId, + holder.pendingVersion()); try { fut.get(); } + catch (IgniteCheckedException e) { + log.error("Failed to wait for metadata update [typeId=" + typeId + ", schemaId=" + schemaId + ']', e); + } + + if (log.isDebugEnabled()) + log.debug("Finished waiting for metadata update" + + " [typeId=" + typeId + + ", waitTime=" + NANOSECONDS.convert(System.nanoTime() - t0, MILLISECONDS) + "ms" + + ", schemaId=" + schemaId + + ", pendingVer=" + holder.pendingVersion() + + ", acceptedVer=" + holder.acceptedVersion() + ']'); + + holder = metadataLocCache.get(typeId); + } + else if (holder == null || !holder.metadata().hasSchema(schemaId)) { + // Last resort waiting. 
+ U.warn(log, + "Schema is missing while no metadata updates are in progress " + + "(will wait for schema update within timeout defined by IGNITE_BINARY_META_UPDATE_TIMEOUT system property)" + + " [typeId=" + typeId + + ", missingSchemaId=" + schemaId + + ", pendingVer=" + (holder == null ? "NA" : holder.pendingVersion()) + + ", acceptedVer=" + (holder == null ? "NA" : holder.acceptedVersion()) + + ", binMetaUpdateTimeout=" + waitSchemaTimeout +']'); + + long t0 = System.nanoTime(); + + GridFutureAdapter<?> fut = transport.awaitSchemaUpdate(typeId, schemaId); + + try { + fut.get(waitSchemaTimeout); + } + catch (IgniteFutureTimeoutCheckedException e) { + log.error("Timed out while waiting for schema update [typeId=" + typeId + ", schemaId=" + + schemaId + ']'); + } catch (IgniteCheckedException ignored) { // No-op. } holder = metadataLocCache.get(typeId); + + if (log.isDebugEnabled() && holder != null && holder.metadata().hasSchema(schemaId)) + log.debug("Found the schema after wait" + + " [typeId=" + typeId + + ", waitTime=" + NANOSECONDS.convert(System.nanoTime() - t0, MILLISECONDS) + "ms" + + ", schemaId=" + schemaId + + ", pendingVer=" + holder.pendingVersion() + + ", acceptedVer=" + holder.acceptedVersion() + ']'); } } @@ -903,7 +1003,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm if ((res = validateBinaryConfiguration(rmtNode)) != null) return res; - return validateBinaryMetadata(rmtNode.id(), (Map<Integer, BinaryMetadataHolder>) discoData.joiningNodeData()); + return validateBinaryMetadata(rmtNode.id(), (Map<Integer, BinaryMetadataHolder>)discoData.joiningNodeData()); } /** */ @@ -1070,4 +1170,75 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm public void setBinaryMetadataFileStoreDir(@Nullable File binaryMetadataFileStoreDir) { this.binaryMetadataFileStoreDir = binaryMetadataFileStoreDir; } + + /** */ + public static class TestBinaryContext extends BinaryContext { + /** */ + private 
List<TestBinaryContextListener> listeners; + + /** + * @param metaHnd Meta handler. + * @param igniteCfg Ignite config. + * @param log Logger. + */ + public TestBinaryContext(BinaryMetadataHandler metaHnd, IgniteConfiguration igniteCfg, + IgniteLogger log) { + super(metaHnd, igniteCfg, log); + } + + /** {@inheritDoc} */ + @Nullable @Override public BinaryType metadata(int typeId) throws BinaryObjectException { + BinaryType metadata = super.metadata(typeId); + + if (listeners != null) { + for (TestBinaryContextListener listener : listeners) + listener.onAfterMetadataRequest(typeId, metadata); + } + + return metadata; + } + + /** {@inheritDoc} */ + @Override public void updateMetadata(int typeId, BinaryMetadata meta, + boolean failIfUnregistered) throws BinaryObjectException { + if (listeners != null) { + for (TestBinaryContextListener listener : listeners) + listener.onBeforeMetadataUpdate(typeId, meta); + } + + super.updateMetadata(typeId, meta, failIfUnregistered); + } + + /** */ + public interface TestBinaryContextListener { + /** + * @param typeId Type id. + * @param type Type. + */ + void onAfterMetadataRequest(int typeId, BinaryType type); + + /** + * @param typeId Type id. + * @param metadata Metadata. + */ + void onBeforeMetadataUpdate(int typeId, BinaryMetadata metadata); + } + + /** + * @param lsnr Listener. 
+ */ + public void addListener(TestBinaryContextListener lsnr) { + if (listeners == null) + listeners = new ArrayList<>(); + + if (!listeners.contains(lsnr)) + listeners.add(lsnr); + } + + /** */ + public void clearAllListener() { + if (listeners != null) + listeners.clear(); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 048abf6..55462ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -423,6 +423,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery /** */ private IgniteDiscoverySpiInternalListener internalLsnr; + /** For test purposes. */ + private boolean skipAddrsRandomization = false; + /** * Gets current SPI state. 
* @@ -1881,7 +1884,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery } } - if (!res.isEmpty()) + if (!res.isEmpty() && !skipAddrsRandomization) Collections.shuffle(res); return res; http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePageWriteLockUnlockTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePageWriteLockUnlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePageWriteLockUnlockTest.java index 84fd916..41c5882 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePageWriteLockUnlockTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePageWriteLockUnlockTest.java @@ -100,6 +100,8 @@ public class CachePageWriteLockUnlockTest extends GridCommonAbstractTest { grid0 = startGrid(0); + grid0.cluster().active(true); + preloadPartition(grid0, DEFAULT_CACHE_NAME, PARTITION); Iterator<Cache.Entry<Object, Object>> it = grid0.cache(DEFAULT_CACHE_NAME).iterator(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java index ccf4c8a..ae75caa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
@@ -819,7 +819,12 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
                 tx.commit();
             }
             catch (Throwable t) {
-                assertTrue(X.hasCause(t, TransactionTimeoutException.class));
+                boolean timedOut = X.hasCause(t, TransactionTimeoutException.class);
+
+                if (!timedOut)
+                    log.error("Got unexpected exception", t);
+
+                assertTrue(timedOut);
             }
 
             assertEquals(0, client.cache(CACHE_NAME).size());

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataConcurrentUpdateWithIndexesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataConcurrentUpdateWithIndexesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataConcurrentUpdateWithIndexesTest.java
new file mode 100644
index 0000000..fed1d7f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataConcurrentUpdateWithIndexesTest.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.QueryIndexType;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.binary.BinaryMetadata;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Tests scenario for too early metadata update completion in case of multiple concurrent updates for the same schema.
+ * <p>
+ * Scenario is the following:
+ *
+ * <ul>
+ *     <li>Start 4 nodes, connect client to node 2 in topology order (starting from 1).</li>
+ *     <li>Start two concurrent transactions from client node producing same schema update.</li>
+ *     <li>Delay second update until first update will return to client with stamped propose message and writes new
+ *     schema to local metadata cache</li>
+ *     <li>Unblock second update. It should correctly wait until the metadata is applied on all
+ *     nodes or tx will fail on commit.</li>
+ * </ul>
+ */
+public class BinaryMetadataConcurrentUpdateWithIndexesTest extends GridCommonAbstractTest {
+    /** */
+    private static final int FIELDS = 2;
+
+    /** */
+    private static final int MB = 1024 * 1024;
+
+    /** */
+    private static final TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+        cfg.setIncludeEventTypes(EventType.EVTS_DISCOVERY);
+
+        BlockTcpDiscoverySpi spi = new BlockTcpDiscoverySpi();
+
+        Field rndAddrsField = U.findField(BlockTcpDiscoverySpi.class, "skipAddrsRandomization");
+
+        assertNotNull(rndAddrsField);
+
+        rndAddrsField.set(spi, true);
+
+        cfg.setDiscoverySpi(spi.setIpFinder(ipFinder));
+
+        cfg.setClientMode(igniteInstanceName.startsWith("client"));
+
+        QueryEntity qryEntity = new QueryEntity("java.lang.Integer", "Value");
+
+        LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+
+        Collection<QueryIndex> indexes = new ArrayList<>(FIELDS);
+
+        for (int i = 0; i < FIELDS; i++) {
+            String name = "s" + i;
+
+            fields.put(name, "java.lang.String");
+
+            indexes.add(new QueryIndex(name, QueryIndexType.SORTED));
+        }
+
+        qryEntity.setFields(fields);
+
+        qryEntity.setIndexes(indexes);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration().
+            setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMaxSize(50 * MB)));
+
+        cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME).
+            setBackups(0).
+            setQueryEntities(Collections.singleton(qryEntity)).
+            setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL).
+            setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC).
+            setCacheMode(CacheMode.PARTITIONED));
+
+        return cfg;
+    }
+
+    /** Flag to start syncing metadata requests. Should skip on exchange. */
+    private volatile boolean syncMeta;
+
+    /** Metadata init latch. Both threads must request initial metadata. */
+    private CountDownLatch initMetaReq = new CountDownLatch(2);
+
+    /** Thread local flag for need of waiting local metadata update. */
+    private ThreadLocal<Boolean> delayMetadataUpdateThreadLoc = new ThreadLocal<>();
+
+    /** Latch for waiting local metadata update. */
+    public static final CountDownLatch localMetaUpdatedLatch = new CountDownLatch(1);
+
+    /** Runs the concurrent schema update scenario described in the class javadoc. */
+    public void testMissingSchemaUpdate() throws Exception {
+        // Start order is important.
+        Ignite node0 = startGrid("node0");
+
+        Ignite node1 = startGrid("node1");
+
+        IgniteEx client0 = startGrid("client0");
+
+        CacheObjectBinaryProcessorImpl.TestBinaryContext clientCtx =
+            (CacheObjectBinaryProcessorImpl.TestBinaryContext)((CacheObjectBinaryProcessorImpl)client0.context().
+                cacheObjects()).binaryContext();
+
+        clientCtx.addListener(new CacheObjectBinaryProcessorImpl.TestBinaryContext.TestBinaryContextListener() {
+            @Override public void onAfterMetadataRequest(int typeId, BinaryType type) {
+                if (syncMeta) {
+                    try {
+                        initMetaReq.countDown();
+
+                        initMetaReq.await();
+                    }
+                    catch (Exception e) {
+                        throw new BinaryObjectException(e);
+                    }
+                }
+            }
+
+            @Override public void onBeforeMetadataUpdate(int typeId, BinaryMetadata metadata) {
+                // Delay one of updates until schema is locally updated on propose message.
+                if (delayMetadataUpdateThreadLoc.get() != null)
+                    await(localMetaUpdatedLatch, 5000);
+            }
+        });
+
+        Ignite node2 = startGrid("node2");
+
+        Ignite node3 = startGrid("node3");
+
+        startGrid("node4");
+
+        node0.cluster().active(true);
+
+        awaitPartitionMapExchange();
+
+        syncMeta = true;
+
+        CountDownLatch clientProposeMsgBlockedLatch = new CountDownLatch(1);
+
+        AtomicBoolean clientWait = new AtomicBoolean();
+        final Object clientMux = new Object();
+
+        AtomicBoolean srvWait = new AtomicBoolean();
+        final Object srvMux = new Object();
+
+        ((BlockTcpDiscoverySpi)node1.configuration().getDiscoverySpi()).setClosure((snd, msg) -> {
+            if (msg instanceof MetadataUpdateProposedMessage) {
+                if (Thread.currentThread().getName().contains("client")) {
+                    log.info("Block custom message to client0: [locNode=" + snd + ", msg=" + msg + ']');
+
+                    clientProposeMsgBlockedLatch.countDown();
+
+                    // Message to client
+                    synchronized (clientMux) {
+                        while (!clientWait.get())
+                            try {
+                                clientMux.wait();
+                            }
+                            catch (InterruptedException e) {
+                                fail();
+                            }
+                    }
+                }
+            }
+
+            return null;
+        });
+
+        ((BlockTcpDiscoverySpi)node2.configuration().getDiscoverySpi()).setClosure((snd, msg) -> {
+            if (msg instanceof MetadataUpdateProposedMessage) {
+                MetadataUpdateProposedMessage msg0 = (MetadataUpdateProposedMessage)msg;
+
+                int pendingVer = U.field(msg0, "pendingVer");
+
+                // Should not block propose messages until they reach coordinator.
+                if (pendingVer == 0)
+                    return null;
+
+                log.info("Block custom message to next server: [locNode=" + snd + ", msg=" + msg + ']');
+
+                // Message to next server
+                synchronized (srvMux) {
+                    while (!srvWait.get())
+                        try {
+                            srvMux.wait();
+                        }
+                        catch (InterruptedException e) {
+                            fail();
+                        }
+                }
+            }
+
+            return null;
+        });
+
+        Integer key = primaryKey(node3.cache(DEFAULT_CACHE_NAME));
+
+        IgniteInternalFuture fut0 = runAsync(() -> {
+            try (Transaction tx = client0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                client0.cache(DEFAULT_CACHE_NAME).put(key, build(client0, "val", 0));
+
+                tx.commit();
+            }
+            catch (Throwable t) {
+                log.error("err", t);
+            }
+
+        });
+
+        // Implements test logic.
+        IgniteInternalFuture fut1 = runAsync(() -> {
+            // Wait for initial metadata received. It should be initial version: pending=0, accepted=0
+            await(initMetaReq, 5000);
+
+            // Wait for blocking proposal message to client node.
+            await(clientProposeMsgBlockedLatch, 5000);
+
+            // Unblock proposal message to client.
+            clientWait.set(true);
+
+            synchronized (clientMux) {
+                clientMux.notify();
+            }
+
+            // Give some time to apply update.
+            doSleep(3000);
+
+            // Unblock second metadata update.
+            localMetaUpdatedLatch.countDown();
+
+            // Give some time for tx to complete (success or fail). fut2 will throw an error if tx has failed on commit.
+            doSleep(3000);
+
+            // Unblock metadata message and allow for correct version acceptance.
+            srvWait.set(true);
+
+            synchronized (srvMux) {
+                srvMux.notify();
+            }
+        });
+
+        IgniteInternalFuture fut2 = runAsync(() -> {
+            delayMetadataUpdateThreadLoc.set(true);
+
+            try (Transaction tx = client0.transactions().
+                txStart(PESSIMISTIC, REPEATABLE_READ, 0, 1)) {
+                client0.cache(DEFAULT_CACHE_NAME).put(key, build(client0, "val", 0));
+
+                tx.commit();
+            }
+        });
+
+        fut0.get();
+        fut1.get();
+        fut2.get();
+    }
+
+    /**
+     * @param latch Latch that must reach zero within the timeout, otherwise a {@link RuntimeException} is thrown.
+     * @param timeout Maximum time to wait, in milliseconds.
+     */
+    private void await(CountDownLatch latch, long timeout) {
+        try {
+            latch.await(timeout, MILLISECONDS);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+
+        long cnt = latch.getCount();
+
+        if (cnt != 0)
+            throw new RuntimeException("Invalid latch count after wait: " + cnt);
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @param prefix Value prefix.
+     * @param fields Fields.
+     */
+    protected BinaryObject build(Ignite ignite, String prefix, int... fields) {
+        BinaryObjectBuilder builder = ignite.binary().builder("Value");
+
+        for (int field : fields) {
+            assertTrue(field < FIELDS);
+
+            builder.setField("i" + field, field);
+            builder.setField("s" + field, prefix + field);
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Discovery SPI which passes each outgoing custom discovery message to a test closure before it is written.
+     */
+    protected class BlockTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** Closure. */
+        private volatile IgniteBiClosure<ClusterNode, DiscoveryCustomMessage, Void> clo;
+
+        /**
+         * @param clo Closure.
+         */
+        public void setClosure(IgniteBiClosure<ClusterNode, DiscoveryCustomMessage, Void> clo) {
+            this.clo = clo;
+        }
+
+        /**
+         * @param addr Local node handed to the closure as its first argument.
+         * @param msg Message; anything other than a custom event message is ignored.
+         */
+        private synchronized void apply(ClusterNode addr, TcpDiscoveryAbstractMessage msg) {
+            if (!(msg instanceof TcpDiscoveryCustomEventMessage))
+                return;
+
+            TcpDiscoveryCustomEventMessage cm = (TcpDiscoveryCustomEventMessage)msg;
+
+            DiscoveryCustomMessage delegate;
+
+            try {
+                DiscoverySpiCustomMessage custMsg = cm.message(marshaller(), U.resolveClassLoader(ignite().configuration()));
+
+                assertNotNull(custMsg);
+
+                delegate = ((CustomMessageWrapper)custMsg).delegate();
+
+            }
+            catch (Throwable throwable) {
+                throw new RuntimeException(throwable);
+            }
+
+            if (clo != null)
+                clo.apply(addr, delegate);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(
+            Socket sock,
+            TcpDiscoveryAbstractMessage msg,
+            byte[] data,
+            long timeout
+        ) throws IOException {
+            if (spiCtx != null)
+                apply(spiCtx.localNode(), msg);
+
+            super.writeToSocket(sock, msg, data, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock,
+            OutputStream out,
+            TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (spiCtx != null)
+                apply(spiCtx.localNode(), msg);
+
+            super.writeToSocket(sock, out, msg, timeout);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        CacheObjectBinaryProcessorImpl.useTestBinaryCtx = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        CacheObjectBinaryProcessorImpl.useTestBinaryCtx = false;
+
+        stopAllGrids();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1cb021c/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java index b44ff2d..8a60c7d 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.cache.BinaryMetadataConcurrentUpdateWithIndexesTest; import org.apache.ignite.internal.processors.cache.BinarySerializationQuerySelfTest; import org.apache.ignite.internal.processors.cache.BinarySerializationQueryWithReflectiveSerializerSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheBinaryObjectsScanSelfTest; @@ -42,6 +43,8 @@ public class IgniteBinaryCacheQueryTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheBinaryObjectsScanWithEventsSelfTest.class); suite.addTestSuite(BigEntryQueryTest.class); + suite.addTestSuite(BinaryMetadataConcurrentUpdateWithIndexesTest.class); + //Should be adjusted. Not ready to be used with BinaryMarshaller. //suite.addTestSuite(GridCacheBinarySwapScanQuerySelfTest.class);
