IGNITE-4997: DDL: Fixed schema state replay on client reconnect. This closes #1845.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3eb52a8a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3eb52a8a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3eb52a8a Branch: refs/heads/ignite-5024 Commit: 3eb52a8a4d42add524051a9611b1b7d1a1d17398 Parents: 8ad5a94 Author: Alexander Paschenko <[email protected]> Authored: Fri Apr 21 11:15:55 2017 +0300 Committer: devozerov <[email protected]> Committed: Fri Apr 21 11:15:55 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 12 ++ .../processors/query/GridQueryProcessor.java | 173 ++++++++++--------- .../cache/index/AbstractSchemaSelfTest.java | 44 +++-- .../DynamicIndexAbstractConcurrentSelfTest.java | 122 +++++++++++++ .../cache/index/SchemaExchangeSelfTest.java | 57 +++++- 5 files changed, 307 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb52a8a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index da6ebc1..28ef22f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1185,6 +1185,18 @@ public class GridCacheProcessor extends GridProcessorAdapter { cache.onReconnected(); reconnected.add(cache); + + if (!sysCache) { + // Re-create cache structures inside indexing in order to apply recent schema changes. + GridCacheContext cctx = cache.context(); + + DynamicCacheDescriptor desc = cacheDescriptor(name); + + assert desc != null; + + ctx.query().onCacheStop0(cctx.name()); + ctx.query().onCacheStart0(cctx, desc.schema()); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb52a8a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 8381882..e0dc387 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -652,113 +652,122 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * Create type descriptors from schema and initialize indexing for given cache.<p> + * Use with {@link #busyLock} where appropriate. * @param cctx Cache context. * @param schema Initial schema. * @throws IgniteCheckedException If failed. */ @SuppressWarnings({"deprecation", "ThrowableResultOfMethodCallIgnored"}) - private void initializeCache(GridCacheContext<?, ?> cctx, QuerySchema schema) throws IgniteCheckedException { - String space = cctx.name(); + public void onCacheStart0(GridCacheContext<?, ?> cctx, QuerySchema schema) + throws IgniteCheckedException { - // Prepare candidates. - List<Class<?>> mustDeserializeClss = new ArrayList<>(); + cctx.shared().database().checkpointReadLock(); - Collection<QueryTypeCandidate> cands = new ArrayList<>(); + try { + synchronized (stateMux) { + String space = cctx.name(); - Collection<QueryEntity> qryEntities = schema.entities(); + // Prepare candidates. + List<Class<?>> mustDeserializeClss = new ArrayList<>(); - if (!F.isEmpty(qryEntities)) { - for (QueryEntity qryEntity : qryEntities) { - QueryTypeCandidate cand = QueryUtils.typeForQueryEntity(space, cctx, qryEntity, mustDeserializeClss); + Collection<QueryTypeCandidate> cands = new ArrayList<>(); - cands.add(cand); - } - } + Collection<QueryEntity> qryEntities = schema.entities(); - // Ensure that candidates has unique index names. Otherwise we will not be able to apply pending operations. - Map<String, QueryTypeDescriptorImpl> tblTypMap = new HashMap<>(); - Map<String, QueryTypeDescriptorImpl> idxTypMap = new HashMap<>(); + if (!F.isEmpty(qryEntities)) { + for (QueryEntity qryEntity : qryEntities) { + QueryTypeCandidate cand = QueryUtils.typeForQueryEntity(space, cctx, qryEntity, + mustDeserializeClss); - for (QueryTypeCandidate cand : cands) { - QueryTypeDescriptorImpl desc = cand.descriptor(); + cands.add(cand); + } + } - QueryTypeDescriptorImpl oldDesc = tblTypMap.put(desc.tableName(), desc); + // Ensure that candidates has unique index names. Otherwise we will not be able to apply pending operations. + Map<String, QueryTypeDescriptorImpl> tblTypMap = new HashMap<>(); + Map<String, QueryTypeDescriptorImpl> idxTypMap = new HashMap<>(); - if (oldDesc != null) - throw new IgniteException("Duplicate table name [cache=" + space + - ", tblName=" + desc.tableName() + ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']'); + for (QueryTypeCandidate cand : cands) { + QueryTypeDescriptorImpl desc = cand.descriptor(); - for (String idxName : desc.indexes().keySet()) { - oldDesc = idxTypMap.put(idxName, desc); + QueryTypeDescriptorImpl oldDesc = tblTypMap.put(desc.tableName(), desc); - if (oldDesc != null) - throw new IgniteException("Duplicate index name [cache=" + space + - ", idxName=" + idxName + ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']'); - } - } + if (oldDesc != null) + throw new IgniteException("Duplicate table name [cache=" + space + + ", tblName=" + desc.tableName() + ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']'); - // Apply pending operation which could have been completed as no-op at this point. There could be only one - // in-flight operation for a cache. - synchronized (stateMux) { - if (disconnected) - return; + for (String idxName : desc.indexes().keySet()) { + oldDesc = idxTypMap.put(idxName, desc); - for (SchemaOperation op : schemaOps.values()) { - if (F.eq(op.proposeMessage().deploymentId(), cctx.dynamicDeploymentId())) { - if (op.started()) { - SchemaOperationWorker worker = op.manager().worker(); + if (oldDesc != null) + throw new IgniteException("Duplicate index name [cache=" + space + + ", idxName=" + idxName + ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']'); + } + } - assert !worker.cacheRegistered(); + // Apply pending operation which could have been completed as no-op at this point. + // There could be only one in-flight operation for a cache. + for (SchemaOperation op : schemaOps.values()) { + if (F.eq(op.proposeMessage().deploymentId(), cctx.dynamicDeploymentId())) { + if (op.started()) { + SchemaOperationWorker worker = op.manager().worker(); - if (!worker.nop()) { - IgniteInternalFuture fut = worker.future(); + assert !worker.cacheRegistered(); - assert fut.isDone(); + if (!worker.nop()) { + IgniteInternalFuture fut = worker.future(); - if (fut.error() == null) { - SchemaAbstractOperation op0 = op.proposeMessage().operation(); + assert fut.isDone(); - if (op0 instanceof SchemaIndexCreateOperation) { - SchemaIndexCreateOperation opCreate = (SchemaIndexCreateOperation)op0; + if (fut.error() == null) { + SchemaAbstractOperation op0 = op.proposeMessage().operation(); - QueryTypeDescriptorImpl typeDesc = tblTypMap.get(opCreate.tableName()); + if (op0 instanceof SchemaIndexCreateOperation) { + SchemaIndexCreateOperation opCreate = (SchemaIndexCreateOperation) op0; - assert typeDesc != null; + QueryTypeDescriptorImpl typeDesc = tblTypMap.get(opCreate.tableName()); - QueryUtils.processDynamicIndexChange(opCreate.indexName(), opCreate.index(), - typeDesc); - } - else if (op0 instanceof SchemaIndexDropOperation) { - SchemaIndexDropOperation opDrop = (SchemaIndexDropOperation)op0; + assert typeDesc != null; + + QueryUtils.processDynamicIndexChange(opCreate.indexName(), opCreate.index(), + typeDesc); + } + else if (op0 instanceof SchemaIndexDropOperation) { + SchemaIndexDropOperation opDrop = (SchemaIndexDropOperation) op0; - QueryTypeDescriptorImpl typeDesc = idxTypMap.get(opDrop.indexName()); + QueryTypeDescriptorImpl typeDesc = idxTypMap.get(opDrop.indexName()); - assert typeDesc != null; + assert typeDesc != null; - QueryUtils.processDynamicIndexChange(opDrop.indexName(), null, typeDesc); + QueryUtils.processDynamicIndexChange(opDrop.indexName(), null, typeDesc); + } + else + assert false; } - else - assert false; } } + + break; } + } - break; + // Ready to register at this point. + registerCache0(space, cctx, cands); + + // Warn about possible implicit deserialization. + if (!mustDeserializeClss.isEmpty()) { + U.warn(log, "Some classes in query configuration cannot be written in binary format " + + "because they either implement Externalizable interface or have writeObject/readObject " + + "methods. Instances of these classes will be deserialized in order to build indexes. Please " + + "ensure that all nodes have these classes in classpath. To enable binary serialization " + + "either implement " + Binarylizable.class.getSimpleName() + " interface or set explicit " + + "serializer using BinaryTypeConfiguration.setSerializer() method: " + mustDeserializeClss); } } } - - // Ready to register at this point. - registerCache0(space, cctx, cands); - - // Warn about possible implicit deserialization. - if (!mustDeserializeClss.isEmpty()) { - U.warn(log, "Some classes in query configuration cannot be written in binary format " + - "because they either implement Externalizable interface or have writeObject/readObject methods. " + - "Instances of these classes will be deserialized in order to build indexes. Please ensure that " + - "all nodes have these classes in classpath. To enable binary serialization either implement " + - Binarylizable.class.getSimpleName() + " interface or set explicit serializer using " + - "BinaryTypeConfiguration.setSerializer() method: " + mustDeserializeClss); + finally { + cctx.shared().database().checkpointReadUnlock(); } } @@ -780,7 +789,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { schemaOps.clear(); } - // Complete client futures outside of synchonized block because they may have listeners/chains. + // Complete client futures outside of synchronized block because they may have listeners/chains. for (SchemaOperationClientFuture fut : futs) fut.onDone(new SchemaOperationException("Client node is disconnected (operation result is unknown).")); @@ -804,14 +813,10 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (!busyLock.enterBusy()) return; - cctx.shared().database().checkpointReadLock(); - try { - initializeCache(cctx, schema); + onCacheStart0(cctx, schema); } finally { - cctx.shared().database().checkpointReadUnlock(); - busyLock.leaveBusy(); } } @@ -827,7 +832,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { return; try { - unregisterCache0(cctx.name()); + onCacheStop0(cctx.name()); } finally { busyLock.leaveBusy(); @@ -1302,7 +1307,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { spaces.add(CU.mask(space)); } catch (IgniteCheckedException | RuntimeException e) { - unregisterCache0(space); + onCacheStop0(space); throw e; } @@ -1310,12 +1315,14 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * Unregister cache. + * Unregister cache.<p> + * Use with {@link #busyLock} where appropriate. * * @param space Space. */ - private void unregisterCache0(String space) { - assert idx != null; + public void onCacheStop0(String space) { + if (idx == null) + return; synchronized (stateMux) { // Clear types. @@ -1464,8 +1471,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { fut.onDone(e); - if (e instanceof Error) - throw e; + throw e; } } }; @@ -1547,6 +1553,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @return Type descriptor if found. * @throws IgniteCheckedException If type check failed. */ + @SuppressWarnings("ConstantConditions") @Nullable private QueryTypeDescriptorImpl typeByValue(CacheObjectContext coctx, KeyCacheObject key, CacheObject val, http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb52a8a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java index e228026..19d6f54 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; +import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl; @@ -131,7 +132,7 @@ public class AbstractSchemaSelfTest extends GridCommonAbstractTest { } /** - * Assert index state on all nodes. + * Assert index state on all <b>affinity</b> nodes. * * @param cacheName Cache name. * @param tblName Table name. @@ -140,25 +141,44 @@ public class AbstractSchemaSelfTest extends GridCommonAbstractTest { */ protected static void assertIndex(String cacheName, String tblName, String idxName, IgniteBiTuple<String, Boolean>... fields) { + assertIndex(cacheName, false, tblName, idxName, fields); + } + + /** + * Assert index state on all nodes. + * + * @param cacheName Cache name. + * @param checkNonAffinityNodes Whether existence of {@link GridQueryIndexDescriptor} must be checked on non + * affinity nodes as well. + * @param tblName Table name. + * @param idxName Index name. + * @param fields Fields. + */ + protected static void assertIndex(String cacheName, boolean checkNonAffinityNodes, String tblName, String idxName, + IgniteBiTuple<String, Boolean>... fields) { for (Ignite node : Ignition.allGrids()) - assertIndex((IgniteEx)node, cacheName, tblName, idxName, fields); + assertIndex(node, checkNonAffinityNodes, cacheName, tblName, idxName, fields); } /** * Assert index state on particular node. * * @param node Node. + * @param checkNonAffinityNode Whether existence of {@link GridQueryIndexDescriptor} must be checked regardless of + * whether this node is affinity node or not. * @param cacheName Cache name. * @param tblName Table name. * @param idxName Index name. * @param fields Fields. */ - protected static void assertIndex(IgniteEx node, String cacheName, String tblName, String idxName, - IgniteBiTuple<String, Boolean>... fields) { - assertIndexDescriptor(node, cacheName, tblName, idxName, fields); + protected static void assertIndex(Ignite node, boolean checkNonAffinityNode, String cacheName, String tblName, + String idxName, IgniteBiTuple<String, Boolean>... fields) { + IgniteEx node0 = (IgniteEx)node; + + assertIndexDescriptor(node0, cacheName, tblName, idxName, fields); - if (affinityNode(node, cacheName)) { - QueryTypeDescriptorImpl typeDesc = typeExisting(node, cacheName, tblName); + if (checkNonAffinityNode || affinityNode(node0, cacheName)) { + QueryTypeDescriptorImpl typeDesc = typeExisting(node0, cacheName, tblName); assertIndex(typeDesc, idxName, fields); } @@ -263,11 +283,13 @@ public class AbstractSchemaSelfTest extends GridCommonAbstractTest { * @param tblName Table name. * @param idxName Index name. */ - protected static void assertNoIndex(IgniteEx node, String cacheName, String tblName, String idxName) { - assertNoIndexDescriptor(node, cacheName, idxName); + protected static void assertNoIndex(Ignite node, String cacheName, String tblName, String idxName) { + IgniteEx node0 = (IgniteEx)node; + + assertNoIndexDescriptor(node0, cacheName, idxName); - if (affinityNode(node, cacheName)) { - QueryTypeDescriptorImpl typeDesc = typeExisting(node, cacheName, tblName); + if (affinityNode(node0, cacheName)) { + QueryTypeDescriptorImpl typeDesc = typeExisting(node0, cacheName, tblName); assertNoIndex(typeDesc, idxName); } http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb52a8a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java index d2a2f49..5976615 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.index; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.Ignition; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.cache.CacheAtomicityMode; @@ -28,6 +29,7 @@ import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteClientReconnectAbstractTest; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.query.GridQueryProcessor; @@ -47,6 +49,8 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.ignite.internal.IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi; + /** * Concurrency tests for dynamic index create/drop. */ @@ -111,6 +115,11 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde return ccfg.setCacheMode(cacheMode).setAtomicityMode(atomicityMode); } + /** {@inheritDoc} */ + @Override protected IgniteConfiguration commonConfiguration(int idx) throws Exception { + return super.commonConfiguration(idx).setDiscoverySpi(new TestTcpDiscoverySpi()); + } + /** * Make sure that coordinator migrates correctly between nodes. * @@ -600,6 +609,119 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde } /** + * Make sure that client receives schema changes made while it was disconnected. + * + * @throws Exception If failed. + */ + public void testClientReconnect() throws Exception { + checkClientReconnect(false); + } + + /** + * Make sure that client receives schema changes made while it was disconnected, even with cache recreation. + * + * @throws Exception If failed. + */ + public void testClientReconnectWithCacheRestart() throws Exception { + checkClientReconnect(true); + } + + /** + * Make sure that client receives schema changes made while it was disconnected, optionally with cache restart + * in the interim. + * + * @param restartCache Whether cache needs to be recreated during client's absence. + * @throws Exception If failed. + */ + private void checkClientReconnect(final boolean restartCache) throws Exception { + // Start complex topology. + final Ignite srv = Ignition.start(serverConfiguration(1)); + Ignition.start(serverConfiguration(2)); + Ignition.start(serverConfiguration(3, true)); + + final Ignite cli = Ignition.start(clientConfiguration(4)); + + cli.createCache(cacheConfiguration()); + + // Check index create. + reconnectClientNode(srv, cli, restartCache, new RunnableX() { + @Override public void run() throws Exception { + final QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1)); + + queryProcessor(srv).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false).get(); + } + }); + + assertIndex(cli, true, CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1)); + assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1); + + // Check index drop. + reconnectClientNode(srv, cli, restartCache, new RunnableX() { + @Override public void run() throws Exception { + if (!restartCache) + queryProcessor(srv).dynamicIndexDrop(CACHE_NAME, IDX_NAME_1, false).get(); + } + }); + + assertNoIndex(cli, CACHE_NAME, TBL_NAME, IDX_NAME_1); + assertIndexNotUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1); + + // Update existing index. + QueryIndex idx = index(IDX_NAME_2, field(alias(FIELD_NAME_2))); + + queryProcessor(srv).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false).get(); + + assertIndex(cli, true, CACHE_NAME, TBL_NAME, IDX_NAME_2, field(alias(FIELD_NAME_2))); + assertIndexUsed(IDX_NAME_2, SQL_SIMPLE_FIELD_2, SQL_ARG_2); + + reconnectClientNode(srv, cli, restartCache, new RunnableX() { + @Override public void run() throws Exception { + if (!restartCache) + queryProcessor(srv).dynamicIndexDrop(CACHE_NAME, IDX_NAME_2, false).get(); + + final QueryIndex idx = index(IDX_NAME_2, field(FIELD_NAME_1), field(alias(FIELD_NAME_2))); + + queryProcessor(srv).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false); + } + }); + + assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_2, field(FIELD_NAME_1), field(alias(FIELD_NAME_2))); + assertIndexUsed(IDX_NAME_2, SQL_COMPOSITE, SQL_ARG_1, SQL_ARG_2); + } + + /** + * Reconnect the client and run specified actions while it's out. + * + * @param srvNode Server node. + * @param cliNode Client node. + * @param restart Whether cache has to be recreated prior to executing required actions. + * @param clo Closure to run + * @throws Exception If failed. + */ + private void reconnectClientNode(final Ignite srvNode, final Ignite cliNode, final boolean restart, + final RunnableX clo) throws Exception { + IgniteClientReconnectAbstractTest.reconnectClientNode(log, cliNode, srvNode, new Runnable() { + @Override public void run() { + if (restart) { + srvNode.destroyCache(CACHE_NAME); + + srvNode.getOrCreateCache(cacheConfiguration().setName(CACHE_NAME)); + } + + try { + clo.run(); + } + catch (Exception e) { + throw new IgniteException("Test reconnect runnable failed.", e); + } + } + }); + + if (restart) + cliNode.cache(CACHE_NAME); + } + + /** * Test concurrent node start/stop along with index operations. Nothing should hang. * * @throws Exception If failed. http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb52a8a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SchemaExchangeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SchemaExchangeSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SchemaExchangeSelfTest.java index 95ad2f1..5a00345 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SchemaExchangeSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SchemaExchangeSelfTest.java @@ -17,8 +17,11 @@ package org.apache.ignite.internal.processors.cache.index; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.Ignition; +import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -34,6 +37,9 @@ import org.apache.ignite.testframework.GridTestUtils; import java.util.Collections; import java.util.Map; +import static org.apache.ignite.internal.IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi; +import static org.apache.ignite.internal.IgniteClientReconnectAbstractTest.reconnectClientNode; + /** * Tests for schema exchange between nodes. */ @@ -396,11 +402,11 @@ public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest { } /** - * Test client reconnect. + * Test client reconnect after server restart accompanied by schema change. * * @throws Exception If failed. */ - public void testClientReconnect() throws Exception { + public void testServerRestartWithNewTypes() throws Exception { IgniteEx node1 = start(1, KeyClass.class, ValueClass.class); assertTypes(node1, ValueClass.class); @@ -412,11 +418,11 @@ public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest { stopGrid(1); - assert GridTestUtils.waitForCondition(new GridAbsPredicate() { + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { return grid(2).context().clientDisconnected(); } - }, 10_000L); + }, 10_000L)); IgniteFuture reconnFut = null; @@ -441,6 +447,40 @@ public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest { } /** + * Test client reconnect. + * + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void testClientReconnect() throws Exception { + final IgniteEx node1 = start(1, KeyClass.class, ValueClass.class); + assertTypes(node1, ValueClass.class); + + final IgniteEx node2 = startClientNoCache(2); + assertTypes(node2); + + node2.cache(CACHE_NAME); + assertTypes(node2, ValueClass.class); + + reconnectClientNode(log, node2, node1, new Runnable() { + @Override public void run() { + assertTrue(node2.context().clientDisconnected()); + + final QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1)); + + try { + queryProcessor(node1).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false).get(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }); + + assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1)); + } + + /** * Ensure that only provided types exists for the given cache. * * @param node Node. @@ -449,15 +489,15 @@ public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest { private static void assertTypes(IgniteEx node, Class... clss) { Map<String, QueryTypeDescriptorImpl> types = types(node, CACHE_NAME); - if (clss == null || clss.length == 0) - assert types.isEmpty(); + if (F.isEmpty(clss)) + assertTrue(types.isEmpty()); else { assertEquals(clss.length, types.size()); for (Class cls : clss) { String tblName = tableName(cls); - assert types.containsKey(tblName); + assertTrue(types.containsKey(tblName)); } } } @@ -498,6 +538,7 @@ public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest { cfg.setClientMode(client); cfg.setLocalHost("127.0.0.1"); cfg.setCacheConfiguration(cacheConfiguration(clss)); + cfg.setDiscoverySpi(new TestTcpDiscoverySpi()); if (filterNodeName != null && F.eq(name, filterNodeName)) cfg.setUserAttributes(Collections.singletonMap("AFF_NODE", true)); @@ -543,6 +584,8 @@ public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest { cfg.setClientMode(client); cfg.setLocalHost("127.0.0.1"); + cfg.setDiscoverySpi(new TestTcpDiscoverySpi()); + return (IgniteEx)Ignition.start(cfg); }
