IGNITE-8048 Store dynamic indexes to cache data on node join - Fixes #3719.
Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bbc439b8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bbc439b8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bbc439b8 Branch: refs/heads/ignite-7708 Commit: bbc439b892a145a0b50b7b5dfd8c989d9868a1e1 Parents: e5c3f89 Author: Anton Kalashnikov <kaa....@yandex.ru> Authored: Tue Apr 17 10:30:52 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Tue Apr 17 10:30:52 2018 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/cache/QueryEntity.java | 177 ++++++- .../apache/ignite/cache/QueryEntityPatch.java | 118 +++++ .../cache/CacheJoinNodeDiscoveryData.java | 15 +- .../processors/cache/ClusterCachesInfo.java | 428 +++++++++++---- .../cache/DynamicCacheDescriptor.java | 28 + .../processors/cache/GridCacheProcessor.java | 94 +++- .../cluster/GridClusterStateProcessor.java | 8 +- .../internal/processors/query/QueryField.java | 10 + .../internal/processors/query/QuerySchema.java | 84 ++- .../processors/query/QuerySchemaPatch.java | 96 ++++ ...erActivateDeactivateTestWithPersistence.java | 18 +- .../cache/IgniteDynamicSqlRestoreTest.java | 529 +++++++++++++++++++ ...ynamicColumnsAbstractConcurrentSelfTest.java | 3 +- .../IgniteCacheQuerySelfTestSuite.java | 2 + 14 files changed, 1481 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java b/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java index 976bd67..37a7f15 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java @@ -17,6 +17,7 @@ package org.apache.ignite.cache; +import javax.cache.CacheException; import java.io.Serializable; import java.lang.reflect.Field; import java.math.BigDecimal; @@ -27,23 +28,29 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import javax.cache.CacheException; +import java.util.UUID; import org.apache.ignite.cache.query.annotations.QueryGroupIndex; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QueryTextField; import org.apache.ignite.internal.processors.cache.query.QueryEntityClassProperty; import org.apache.ignite.internal.processors.cache.query.QueryEntityTypeDescriptor; import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor; +import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation; +import org.apache.ignite.internal.processors.query.schema.operation.SchemaAlterTableAddColumnOperation; +import org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexCreateOperation; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static java.util.Collections.unmodifiableMap; @@ -153,6 +160,172 @@ public class QueryEntity 
implements Serializable { } /** + * Make query entity patch. This patch can only add properties to entity and can't remove them. + * Other words, the patch will contain only add operations(e.g. add column, create index) and not remove ones. + * + * @param target Query entity to which this entity should be expanded. + * @return Patch which contains operations for expanding this entity. + */ + @NotNull public QueryEntityPatch makePatch(QueryEntity target) { + if (target == null) + return QueryEntityPatch.empty(); + + StringBuilder conflicts = new StringBuilder(); + + checkEquals(conflicts, "keyType", keyType, target.keyType); + checkEquals(conflicts, "valType", valType, target.valType); + checkEquals(conflicts, "keyFieldName", keyFieldName, target.keyFieldName); + checkEquals(conflicts, "valueFieldName", valueFieldName, target.valueFieldName); + checkEquals(conflicts, "tableName", tableName, target.tableName); + + List<QueryField> queryFieldsToAdd = checkFields(target, conflicts); + + Collection<QueryIndex> indexesToAdd = checkIndexes(target, conflicts); + + if (conflicts.length() != 0) + return QueryEntityPatch.conflict(tableName + " conflict: \n" + conflicts.toString()); + + Collection<SchemaAbstractOperation> patchOperations = new ArrayList<>(); + + if (!queryFieldsToAdd.isEmpty()) + patchOperations.add(new SchemaAlterTableAddColumnOperation( + UUID.randomUUID(), + null, + null, + tableName, + queryFieldsToAdd, + true, + true + )); + + if (!indexesToAdd.isEmpty()) { + for (QueryIndex index : indexesToAdd) { + patchOperations.add(new SchemaIndexCreateOperation( + UUID.randomUUID(), + null, + null, + tableName, + index, + true, + 0 + )); + } + } + + return QueryEntityPatch.patch(patchOperations); + } + + /** + * Comparing local fields and target fields. + * + * @param target Query entity for check. + * @param conflicts Storage of conflicts. + * @return Indexes which exist in target and not exist in local. 
+ */ + @NotNull private Collection<QueryIndex> checkIndexes(QueryEntity target, StringBuilder conflicts) { + HashSet<QueryIndex> indexesToAdd = new HashSet<>(); + + Map<String, QueryIndex> currentIndexes = new HashMap<>(); + + for (QueryIndex index : getIndexes()) { + if (currentIndexes.put(index.getName(), index) != null) + throw new IllegalStateException("Duplicate key"); + } + + for (QueryIndex queryIndex : target.getIndexes()) { + if(currentIndexes.containsKey(queryIndex.getName())) { + checkEquals( + conflicts, + "index " + queryIndex.getName(), + currentIndexes.get(queryIndex.getName()), + queryIndex + ); + } + else + indexesToAdd.add(queryIndex); + } + return indexesToAdd; + } + + /** + * Comparing local entity fields and target entity fields. + * + * @param target Query entity for check. + * @param conflicts Storage of conflicts. + * @return Fields which exist in target and not exist in local. + */ + private List<QueryField> checkFields(QueryEntity target, StringBuilder conflicts) { + List<QueryField> queryFieldsToAdd = new ArrayList<>(); + + for (Map.Entry<String, String> targetField : target.getFields().entrySet()) { + String targetFieldName = targetField.getKey(); + String targetFieldType = targetField.getValue(); + + if (getFields().containsKey(targetFieldName)) { + checkEquals( + conflicts, + "fieldType of " + targetFieldName, + getFields().get(targetFieldName), + targetFieldType + ); + + checkEquals( + conflicts, + "nullable of " + targetFieldName, + contains(getNotNullFields(), targetFieldName), + contains(target.getNotNullFields(), targetFieldName) + ); + + checkEquals( + conflicts, + "default value of " + targetFieldName, + getFromMap(getDefaultFieldValues(), targetFieldName), + getFromMap(target.getDefaultFieldValues(), targetFieldName) + ); + } + else { + queryFieldsToAdd.add(new QueryField( + targetFieldName, + targetFieldType, + !contains(target.getNotNullFields(),targetFieldName), + getFromMap(target.getDefaultFieldValues(), targetFieldName) + 
)); + } + } + + return queryFieldsToAdd; + } + + /** + * @param collection Collection for checking. + * @param elementToCheck Element for checking to containing in collection. + * @return {@code true} if collection contain elementToCheck. + */ + private static boolean contains(Collection<String> collection, String elementToCheck) { + return collection != null && collection.contains(elementToCheck); + } + + /** + * @return Value from sourceMap or null if map is null. + */ + private static Object getFromMap(Map<String, Object> sourceMap, String key) { + return sourceMap == null ? null : sourceMap.get(key); + } + + /** + * Comparing two objects and add formatted text to conflicts if needed. + * + * @param conflicts Storage of conflicts resulting error message. + * @param name Name of comparing object. + * @param local Local object. + * @param received Received object. + */ + private void checkEquals(StringBuilder conflicts, String name, Object local, Object received) { + if (!Objects.equals(local, received)) + conflicts.append(String.format("%s is different: local=%s, received=%s\n", name, local, received)); + } + + /** * Gets key type for this query pair. * * @return Key type. @@ -319,7 +492,7 @@ public class QueryEntity implements Serializable { * * @return Collection of index entities. */ - public Collection<QueryIndex> getIndexes() { + @NotNull public Collection<QueryIndex> getIndexes() { return idxs == null ? 
Collections.<QueryIndex>emptyList() : idxs; } http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/cache/QueryEntityPatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/QueryEntityPatch.java b/modules/core/src/main/java/org/apache/ignite/cache/QueryEntityPatch.java new file mode 100644 index 0000000..38e1b2a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/QueryEntityPatch.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache; + +import java.util.Collection; +import java.util.Objects; +import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Query entity patch which contain {@link SchemaAbstractOperation} operations for changing query entity. + * This patch can only add properties to entity and can't remove them. 
+ * Other words, the patch will contain only add operations + * (e.g.: + * {@link org.apache.ignite.internal.processors.query.schema.operation.SchemaAlterTableAddColumnOperation}, + * {@link org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexCreateOperation} + * ) and not remove ones. + * + * It contain only add operation because at the moment we don't have history of schema operations + * and by current state we can't understand some property was already deleted or it has not been added yet. + */ +public class QueryEntityPatch { + /** Empty query entity patch. */ + private static final QueryEntityPatch EMPTY_QUERY_ENTITY_PATCH = new QueryEntityPatch(null, null); + + /** Message which described conflicts during creating this patch. */ + private String conflictsMessage; + + /** Operations for modification query entity. */ + private Collection<SchemaAbstractOperation> patchOperations; + + /** + * Create patch. + */ + private QueryEntityPatch(String conflictsMessage, Collection<SchemaAbstractOperation> patchOperations) { + this.conflictsMessage = conflictsMessage; + this.patchOperations = patchOperations; + } + + /** + * Builder method for patch with conflicts. + * + * @param conflicts Conflicts. + * @return Query entity patch with conflicts. + */ + public static QueryEntityPatch conflict(String conflicts) { + return new QueryEntityPatch(conflicts, null); + } + + /** + * Builder method for empty patch. + * + * @return Query entity patch. + */ + public static QueryEntityPatch empty() { + return EMPTY_QUERY_ENTITY_PATCH; + } + + /** + * Builder method for patch with operations. + * + * @param patchOperations Operations for modification. + * @return Query entity patch which contain {@link SchemaAbstractOperation} operations for changing query entity. + */ + public static QueryEntityPatch patch(Collection<SchemaAbstractOperation> patchOperations) { + return new QueryEntityPatch(null, patchOperations); + } + + /** + * Check for conflict in this patch. 
+ * + * @return {@code true} if patch has conflict. + */ + public boolean hasConflict() { + return conflictsMessage != null; + } + + /** + * @return {@code true} if patch is empty and can't be applying. + */ + public boolean isEmpty() { + return patchOperations == null || patchOperations.isEmpty(); + } + + /** + * @return Conflicts. + */ + public String getConflictsMessage() { + return conflictsMessage; + } + + /** + * @return Patch operations for applying. + */ + public Collection<SchemaAbstractOperation> getPatchOperations() { + return patchOperations; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryEntityPatch.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java index 6d2688c..a3902de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java @@ -112,17 +112,23 @@ public class CacheJoinNodeDiscoveryData implements Serializable { /** Flags added for future usage. */ private final long flags; + /** Statically configured flag */ + private final boolean staticallyConfigured; + /** * @param cacheData Cache data. * @param cacheType Cache type. * @param sql SQL flag - {@code true} if cache was created with {@code CREATE TABLE}. * @param flags Flags (for future usage). + * @param staticallyConfigured {@code true} if it was configured by static config and {@code false} otherwise. 
*/ - public CacheInfo(StoredCacheData cacheData, CacheType cacheType, boolean sql, long flags) { + public CacheInfo(StoredCacheData cacheData, CacheType cacheType, boolean sql, long flags, + boolean staticallyConfigured) { this.cacheData = cacheData; this.cacheType = cacheType; this.sql = sql; this.flags = flags; + this.staticallyConfigured = staticallyConfigured; } /** @@ -146,6 +152,13 @@ public class CacheJoinNodeDiscoveryData implements Serializable { return sql; } + /** + * @return {@code true} if it was configured by static config and {@code false} otherwise. + */ + public boolean isStaticallyConfigured() { + return staticallyConfigured; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheInfo.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 2b2fb55..975617e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -35,31 +35,35 @@ import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheExistsException; +import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.GridCachePluginContext; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; 
+import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.processors.query.QuerySchema; +import org.apache.ignite.internal.processors.query.QuerySchemaPatch; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.lang.GridFunc; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.CachePluginContext; import org.apache.ignite.plugin.CachePluginProvider; import org.apache.ignite.plugin.PluginProvider; import org.apache.ignite.spi.discovery.DiscoveryDataBag; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheMode.LOCAL; @@ -71,6 +75,9 @@ import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType * Logic related to cache discovery data processing. */ class ClusterCachesInfo { + /** Version since which merge of config is supports. 
*/ + private static final IgniteProductVersion V_MERGE_CONFIG_SINCE = IgniteProductVersion.fromString("2.5.0"); + /** */ private final GridKernalContext ctx; @@ -987,54 +994,77 @@ class ClusterCachesInfo { // CacheGroup configurations that were created from local node configuration. Map<Integer, CacheGroupDescriptor> locCacheGrps = new HashMap<>(registeredCacheGroups()); - // Replace locally registered data with actual data received from cluster. - registeredCaches.clear(); - registeredCacheGrps.clear(); - ctx.discovery().cleanCachesAndGroups(); + //Replace locally registered data with actual data received from cluster. + cleanCachesAndGroups(); - for (CacheGroupData grpData : cachesData.cacheGroups().values()) { - CacheGroupDescriptor grpDesc = new CacheGroupDescriptor( - grpData.config(), - grpData.groupName(), - grpData.groupId(), - grpData.receivedFrom(), - grpData.startTopologyVersion(), - grpData.deploymentId(), - grpData.caches(), - grpData.persistenceEnabled(), - grpData.walEnabled(), - grpData.walChangeRequests()); + registerReceivedCacheGroups(cachesData, locCacheGrps); - if (locCacheGrps.containsKey(grpDesc.groupId())) { - CacheGroupDescriptor locGrpCfg = locCacheGrps.get(grpDesc.groupId()); + registerReceivedCacheTemplates(cachesData); - grpDesc.mergeWith(locGrpCfg); - } + registerReceivedCaches(cachesData); - CacheGroupDescriptor old = registeredCacheGrps.put(grpDesc.groupId(), grpDesc); + addReceivedClientNodesToDiscovery(cachesData); - assert old == null : old; + String conflictErr = validateRegisteredCaches(); - ctx.discovery().addCacheGroup(grpDesc, - grpData.config().getNodeFilter(), - grpData.config().getCacheMode()); + gridData = new GridData(joinDiscoData, cachesData, conflictErr); + + if (cachesOnDisconnect == null || cachesOnDisconnect.clusterActive()) + initStartCachesForLocalJoin(false, disconnectedState()); + } + + /** + * Validation {@link #registeredCaches} on conflicts. + * + * @return Error message if conflicts was found. 
+ */ + @Nullable private String validateRegisteredCaches() { + String conflictErr = null; + + if (joinDiscoData != null) { + for (Map.Entry<String, CacheJoinNodeDiscoveryData.CacheInfo> e : joinDiscoData.caches().entrySet()) { + if (!registeredCaches.containsKey(e.getKey())) { + conflictErr = checkCacheConflict(e.getValue().cacheData().config()); + + if (conflictErr != null) { + conflictErr = "Failed to start configured cache due to conflict with started caches. " + + conflictErr; + + break; + } + } + } } - for (CacheData cacheData : cachesData.templates().values()) { - DynamicCacheDescriptor desc = new DynamicCacheDescriptor( - ctx, - cacheData.cacheConfiguration(), - cacheData.cacheType(), - null, - true, - cacheData.receivedFrom(), - cacheData.staticallyConfigured(), - false, - cacheData.deploymentId(), - cacheData.schema()); + return conflictErr; + } - registeredTemplates.put(cacheData.cacheConfiguration().getName(), desc); + /** + * Adding received client nodes to discovery if needed. + * + * @param cachesData Data received from cluster. + */ + private void addReceivedClientNodesToDiscovery(CacheNodeCommonDiscoveryData cachesData) { + if (!F.isEmpty(cachesData.clientNodesMap())) { + for (Map.Entry<String, Map<UUID, Boolean>> entry : cachesData.clientNodesMap().entrySet()) { + String cacheName = entry.getKey(); + + for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet()) + ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue()); + } } + } + + /** + * Register caches received from cluster. + * + * @param cachesData Data received from cluster. 
+ */ + private void registerReceivedCaches(CacheNodeCommonDiscoveryData cachesData) { + Map<DynamicCacheDescriptor, QuerySchemaPatch> patchesToApply = new HashMap<>(); + Collection<DynamicCacheDescriptor> cachesToSave = new HashSet<>(); + + boolean hasSchemaPatchConflict = false; for (CacheData cacheData : cachesData.caches().values()) { CacheGroupDescriptor grpDesc = registeredCacheGrps.get(cacheData.groupId()); @@ -1053,7 +1083,22 @@ class ClusterCachesInfo { cacheData.staticallyConfigured(), cacheData.sql(), cacheData.deploymentId(), - cacheData.schema()); + new QuerySchema(cacheData.schema().entities()) + ); + + Collection<QueryEntity> localQueryEntities = getLocalQueryEntities(cfg.getName()); + + QuerySchemaPatch schemaPatch = desc.makeSchemaPatch(localQueryEntities); + + if (schemaPatch.hasConflicts()) { + hasSchemaPatchConflict = true; + + log.warning("Skipping apply patch because conflicts : " + schemaPatch.getConflictsMessage()); + } + else if (!schemaPatch.isEmpty()) + patchesToApply.put(desc, schemaPatch); + else if (!GridFunc.eqNotOrdered(desc.schema().entities(), localQueryEntities)) + cachesToSave.add(desc); //received config is different of local config - need to resave desc.receivedOnDiscovery(true); @@ -1066,36 +1111,140 @@ class ClusterCachesInfo { cfg.getNearConfiguration() != null); } - if (!F.isEmpty(cachesData.clientNodesMap())) { - for (Map.Entry<String, Map<UUID, Boolean>> entry : cachesData.clientNodesMap().entrySet()) { - String cacheName = entry.getKey(); + updateRegisteredCachesIfNeeded(patchesToApply, cachesToSave, hasSchemaPatchConflict); + } - for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet()) - ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue()); + /** + * Merging config or resaving it if it needed. + * + * @param patchesToApply Patches which need to apply. + * @param cachesToSave Caches which need to resave. + * @param hasSchemaPatchConflict {@code true} if we have conflict during making patch. 
+ */ + private void updateRegisteredCachesIfNeeded(Map<DynamicCacheDescriptor, QuerySchemaPatch> patchesToApply, + Collection<DynamicCacheDescriptor> cachesToSave, boolean hasSchemaPatchConflict) { + //Skip merge of config if least one conflict was found. + if (!hasSchemaPatchConflict && isMergeConfigSupports(ctx.discovery().localNode())) { + boolean isClusterActive = ctx.state().clusterState().active(); + + //Merge of config for cluster only for inactive grid. + if (!isClusterActive && !patchesToApply.isEmpty()) { + for (Map.Entry<DynamicCacheDescriptor, QuerySchemaPatch> entry : patchesToApply.entrySet()) { + if (entry.getKey().applySchemaPatch(entry.getValue())) + saveCacheConfiguration(entry.getKey()); + } + + for (DynamicCacheDescriptor descriptor : cachesToSave) { + saveCacheConfiguration(descriptor); + } + } + else if (patchesToApply.isEmpty()) { + for (DynamicCacheDescriptor descriptor : cachesToSave) { + saveCacheConfiguration(descriptor); + } } } + } - String conflictErr = null; + /** + * Register cache templates received from cluster. + * + * @param cachesData Data received from cluster. + */ + private void registerReceivedCacheTemplates(CacheNodeCommonDiscoveryData cachesData) { + for (CacheData cacheData : cachesData.templates().values()) { + DynamicCacheDescriptor desc = new DynamicCacheDescriptor( + ctx, + cacheData.cacheConfiguration(), + cacheData.cacheType(), + null, + true, + cacheData.receivedFrom(), + cacheData.staticallyConfigured(), + false, + cacheData.deploymentId(), + cacheData.schema()); - if (joinDiscoData != null) { - for (Map.Entry<String, CacheJoinNodeDiscoveryData.CacheInfo> e : joinDiscoData.caches().entrySet()) { - if (!registeredCaches.containsKey(e.getKey())) { - conflictErr = checkCacheConflict(e.getValue().cacheData().config()); + registeredTemplates.put(cacheData.cacheConfiguration().getName(), desc); + } + } - if (conflictErr != null) { - conflictErr = "Failed to start configured cache due to conflict with started caches. 
" + - conflictErr; + /** + * Register cache groups received from cluster. + * + * @param cachesData Data received from cluster. + * @param locCacheGrps Current local cache groups. + */ + private void registerReceivedCacheGroups(CacheNodeCommonDiscoveryData cachesData, + Map<Integer, CacheGroupDescriptor> locCacheGrps) { + for (CacheGroupData grpData : cachesData.cacheGroups().values()) { + CacheGroupDescriptor grpDesc = new CacheGroupDescriptor( + grpData.config(), + grpData.groupName(), + grpData.groupId(), + grpData.receivedFrom(), + grpData.startTopologyVersion(), + grpData.deploymentId(), + grpData.caches(), + grpData.persistenceEnabled(), + grpData.walEnabled(), + grpData.walChangeRequests()); - break; - } - } + if (locCacheGrps.containsKey(grpDesc.groupId())) { + CacheGroupDescriptor locGrpCfg = locCacheGrps.get(grpDesc.groupId()); + + grpDesc.mergeWith(locGrpCfg); } + + CacheGroupDescriptor old = registeredCacheGrps.put(grpDesc.groupId(), grpDesc); + + assert old == null : old; + + ctx.discovery().addCacheGroup(grpDesc, + grpData.config().getNodeFilter(), + grpData.config().getCacheMode()); } + } - gridData = new GridData(joinDiscoData, cachesData, conflictErr); + /** + * Clean local registered caches and groups + */ + private void cleanCachesAndGroups() { + registeredCaches.clear(); + registeredCacheGrps.clear(); + ctx.discovery().cleanCachesAndGroups(); + } - if (cachesOnDisconnect == null || cachesOnDisconnect.clusterActive()) - initStartCachesForLocalJoin(false, disconnectedState()); + /** + * Save dynamic cache descriptor on disk. + * + * @param desc Cache to save. + */ + private void saveCacheConfiguration(DynamicCacheDescriptor desc) { + try { + ctx.cache().saveCacheConfiguration(desc); + } + catch (IgniteCheckedException e) { + log.error("Error while saving cache configuration to disk, cfg = " + desc.cacheConfiguration(), e); + } + } + + /** + * Get started node query entities by cacheName. 
+ * + * @param cacheName Cache for which query entities will be returned. + * @return Local query entities. + */ + private Collection<QueryEntity> getLocalQueryEntities(String cacheName) { + if (joinDiscoData == null) + return Collections.emptyList(); + + CacheJoinNodeDiscoveryData.CacheInfo cacheInfo = joinDiscoData.caches().get(cacheName); + + if (cacheInfo == null) + return Collections.emptyList(); + + return cacheInfo.cacheData().queryEntities(); } /** @@ -1144,7 +1293,7 @@ class ClusterCachesInfo { desc.staticallyConfigured(), desc.sql(), desc.deploymentId(), - new QuerySchema(locCfg.cacheData().queryEntities())); + desc.schema().copy()); desc0.startTopologyVersion(desc.startTopologyVersion()); desc0.receivedFromStartVersion(desc.receivedFromStartVersion()); @@ -1385,26 +1534,14 @@ class ClusterCachesInfo { * @return Configuration conflict error. */ private String processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId, boolean locJoin) { - for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) { - CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config(); + registerNewCacheTemplates(joinData, nodeId); - if (!registeredTemplates.containsKey(cfg.getName())) { - DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, - cfg, - cacheInfo.cacheType(), - null, - true, - nodeId, - true, - false, - joinData.cacheDeploymentId(), - new QuerySchema(cacheInfo.cacheData().queryEntities())); + Map<DynamicCacheDescriptor, QuerySchemaPatch> patchesToApply = new HashMap<>(); - DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc); + boolean hasSchemaPatchConflict = false; + boolean active = ctx.state().clusterState().active(); - assert old == null : old; - } - } + boolean isMergeConfigSupport = isMergeConfigSupports(null); for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.caches().values()) { CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config(); @@ -1421,49 +1558,138 @@ class 
ClusterCachesInfo { continue; } - int cacheId = CU.cacheId(cfg.getName()); + registerNewCache(joinData, nodeId, cacheInfo); + } + else if (!active && isMergeConfigSupport) { + DynamicCacheDescriptor desc = registeredCaches.get(cfg.getName()); + + QuerySchemaPatch schemaPatch = desc.makeSchemaPatch(cacheInfo.cacheData().queryEntities()); - CacheGroupDescriptor grpDesc = registerCacheGroup(null, - null, - cfg, - cacheId, + if (schemaPatch.hasConflicts()) { + hasSchemaPatchConflict = true; + + log.error("Error during making schema patch : " + schemaPatch.getConflictsMessage()); + } + else if (!schemaPatch.isEmpty() && !hasSchemaPatchConflict) + patchesToApply.put(desc, schemaPatch); + } + + ctx.discovery().addClientNode(cfg.getName(), nodeId, cfg.getNearConfiguration() != null); + } + + //If conflict was detected we don't merge config and we leave existed config. + if (!hasSchemaPatchConflict && !patchesToApply.isEmpty()) + for(Map.Entry<DynamicCacheDescriptor, QuerySchemaPatch> entry: patchesToApply.entrySet()){ + if (entry.getKey().applySchemaPatch(entry.getValue())) + saveCacheConfiguration(entry.getKey()); + } + + if (joinData.startCaches()) { + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + ctx.discovery().addClientNode(desc.cacheName(), nodeId, - joinData.cacheDeploymentId()); + desc.cacheConfiguration().getNearConfiguration() != null); + } + } + + return null; + } + + /** + * Register new cache received from joining node. + * + * @param joinData Data from joining node. + * @param nodeId Joining node id. + * @param cacheInfo Cache info of new node. 
+ */ + private void registerNewCache( + CacheJoinNodeDiscoveryData joinData, + UUID nodeId, + CacheJoinNodeDiscoveryData.CacheInfo cacheInfo) { + CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config(); + + int cacheId = CU.cacheId(cfg.getName()); + + CacheGroupDescriptor grpDesc = registerCacheGroup(null, + null, + cfg, + cacheId, + nodeId, + joinData.cacheDeploymentId()); + + ctx.discovery().setCacheFilter( + cacheId, + grpDesc.groupId(), + cfg.getName(), + cfg.getNearConfiguration() != null); + + DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, + cfg, + cacheInfo.cacheType(), + grpDesc, + false, + nodeId, + cacheInfo.isStaticallyConfigured(), + cacheInfo.sql(), + joinData.cacheDeploymentId(), + new QuerySchema(cacheInfo.cacheData().queryEntities())); + + DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc); - ctx.discovery().setCacheFilter( - cacheId, - grpDesc.groupId(), - cfg.getName(), - cfg.getNearConfiguration() != null); + assert old == null : old; + } + + /** + * Register new cache templates received from joining node. + * + * @param joinData Data from joining node. + * @param nodeId Joining node id. 
+ */ + private void registerNewCacheTemplates(CacheJoinNodeDiscoveryData joinData, UUID nodeId) { + for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) { + CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config(); + if (!registeredTemplates.containsKey(cfg.getName())) { DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, cfg, cacheInfo.cacheType(), - grpDesc, - false, + null, + true, nodeId, true, - cacheInfo.sql(), + false, joinData.cacheDeploymentId(), new QuerySchema(cacheInfo.cacheData().queryEntities())); - DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc); + DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc); assert old == null : old; } - - ctx.discovery().addClientNode(cfg.getName(), nodeId, cfg.getNearConfiguration() != null); } + } - if (joinData.startCaches()) { - for (DynamicCacheDescriptor desc : registeredCaches.values()) { - ctx.discovery().addClientNode(desc.cacheName(), - nodeId, - desc.cacheConfiguration().getNearConfiguration() != null); - } + /** + * @return {@code true} if grid supports merge of config and {@code false} otherwise. 
+ */ + public boolean isMergeConfigSupports(ClusterNode joiningNode) { + DiscoCache discoCache = ctx.discovery().discoCache(); + + if (discoCache == null) + return true; + + if (joiningNode != null && joiningNode.version().compareToIgnoreTimestamp(V_MERGE_CONFIG_SINCE) < 0) + return false; + + Collection<ClusterNode> nodes = discoCache.allNodes(); + + for (ClusterNode node : nodes) { + IgniteProductVersion version = node.version(); + + if (version.compareToIgnoreTimestamp(V_MERGE_CONFIG_SINCE) < 0) + return false; } - return null; + return true; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index cad8414..93882a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -17,14 +17,17 @@ package org.apache.ignite.internal.processors.cache; +import java.util.Collection; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.query.QuerySchema; +import org.apache.ignite.internal.processors.query.QuerySchemaPatch; import 
org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -346,6 +349,31 @@ public class DynamicCacheDescriptor { } /** + * Make schema patch for this cache. + * + * @param target Query entity list to which the current schema should be expanded. + * @return Patch which contains operations for expanding schema of this cache. + * @see QuerySchemaPatch + */ + public QuerySchemaPatch makeSchemaPatch(Collection<QueryEntity> target) { + synchronized (schemaMux) { + return schema.makePatch(target); + } + } + + /** + * Apply query schema patch for changing current schema. + * + * @param patch Patch to apply. + * @return {@code true} if the patch was applied successfully and {@code false} otherwise. + */ + public boolean applySchemaPatch(QuerySchemaPatch patch) { + synchronized (schemaMux) { + return schema.applyPatch(patch); + } + } + + /** * Form a {@link StoredCacheData} with all data to correctly restore cache params when its configuration is read * from page store. Essentially, this method takes from {@link DynamicCacheDescriptor} all that's needed to start * cache correctly, leaving out everything else. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 3aa6603..36edd72 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -117,6 +117,7 @@ import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; import org.apache.ignite.internal.processors.plugin.CachePluginManager; import org.apache.ignite.internal.processors.query.QuerySchema; +import org.apache.ignite.internal.processors.query.QuerySchemaPatch; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.schema.SchemaExchangeWorkerTask; import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask; @@ -183,6 +184,14 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearE */ @SuppressWarnings({"unchecked", "TypeMayBeWeakened", "deprecation"}) public class GridCacheProcessor extends GridProcessorAdapter { + /** Template of message of conflicts during configuration merge*/ + private static final String MERGE_OF_CONFIG_CONFLICTS_MESSAGE = + "Conflicts during configuration merge for cache '%s' : \n%s"; + + /** Template of message of node join was fail because it requires to merge of config */ + private static final String MERGE_OF_CONFIG_REQUIRED_MESSAGE = "Failed to join node to the active cluster " + + "(the config of the cache '%s' has to be merged which is 
impossible on active grid). " + + "Deactivate grid and retry node join or clean the joining node."; /** */ private final boolean startClientCaches = IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN, false); @@ -742,15 +751,29 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (cacheType != CacheType.USER && cfg.getDataRegionName() == null) cfg.setDataRegionName(sharedCtx.database().systemDateRegionName()); - if (!cacheType.userCache()) - stopSeq.addLast(cacheName); - else - stopSeq.addFirst(cacheName); - - caches.put(cacheName, new CacheJoinNodeDiscoveryData.CacheInfo(cacheData, cacheType, cacheData.sql(), 0)); + addStoredCache(caches, cacheData, cacheName, cacheType, true); } else - templates.put(cacheName, new CacheJoinNodeDiscoveryData.CacheInfo(cacheData, CacheType.USER, false, 0)); + templates.put(cacheName, new CacheInfo(cacheData, CacheType.USER, false, 0, true)); + } + + /** + * Add stored cache data to caches storage. + * + * @param caches Cache storage. + * @param cacheData Cache data to add. + * @param cacheName Cache name. + * @param cacheType Cache type. + * @param isStaticalyConfigured Statically configured flag. 
+ */ + private void addStoredCache(Map<String, CacheInfo> caches, StoredCacheData cacheData, String cacheName, + CacheType cacheType, boolean isStaticalyConfigured) { + if (!cacheType.userCache()) + stopSeq.addLast(cacheName); + else + stopSeq.addFirst(cacheName); + + caches.put(cacheName, new CacheInfo(cacheData, cacheType, cacheData.sql(), 0, isStaticalyConfigured)); } /** @@ -774,6 +797,19 @@ public class GridCacheProcessor extends GridProcessorAdapter { addCacheOnJoin(cfg, false, caches, templates); } + + if (CU.isPersistenceEnabled(ctx.config()) && ctx.cache().context().pageStore() != null) { + Map<String, StoredCacheData> storedCaches = ctx.cache().context().pageStore().readCacheConfigurations(); + + if (!F.isEmpty(storedCaches)) + for (StoredCacheData storedCacheData : storedCaches.values()) { + String cacheName = storedCacheData.config().getName(); + + //Ignore stored caches if it already added by static config(static config has higher priority). + if (!caches.containsKey(cacheName)) + addStoredCache(caches, storedCacheData, cacheName, cacheType(cacheName), false); + } + } } /** @@ -2439,6 +2475,50 @@ public class GridCacheProcessor extends GridProcessorAdapter { sharedCtx.walState().onCachesInfoCollected(); } + /** {@inheritDoc} */ + @Nullable @Override public IgniteNodeValidationResult validateNode( + ClusterNode node, JoiningNodeDiscoveryData discoData + ) { + if(!cachesInfo.isMergeConfigSupports(node)) + return null; + + if (discoData.hasJoiningNodeData() && discoData.joiningNodeData() instanceof CacheJoinNodeDiscoveryData) { + CacheJoinNodeDiscoveryData nodeData = (CacheJoinNodeDiscoveryData)discoData.joiningNodeData(); + + boolean isGridActive = ctx.state().clusterState().active(); + + StringBuilder errorMessage = new StringBuilder(); + + for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : nodeData.caches().values()) { + DynamicCacheDescriptor localDesc = cacheDescriptor(cacheInfo.cacheData().config().getName()); + + if (localDesc == null) + 
continue; + + QuerySchemaPatch schemaPatch = localDesc.makeSchemaPatch(cacheInfo.cacheData().queryEntities()); + + if (schemaPatch.hasConflicts() || (isGridActive && !schemaPatch.isEmpty())) { + if (errorMessage.length() > 0) + errorMessage.append("\n"); + + if (schemaPatch.hasConflicts()) + errorMessage.append(String.format(MERGE_OF_CONFIG_CONFLICTS_MESSAGE, + localDesc.cacheName(), schemaPatch.getConflictsMessage())); + else + errorMessage.append(String.format(MERGE_OF_CONFIG_REQUIRED_MESSAGE, localDesc.cacheName())); + } + } + + if (errorMessage.length() > 0) { + String msg = errorMessage.toString(); + + return new IgniteNodeValidationResult(node.id(), msg, msg); + } + } + + return null; + } + /** * @param msg Message. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 81a5b4e..2700a20 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -1035,7 +1035,13 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I ", client=" + ctx.clientNode() + ", daemon" + ctx.isDaemon() + "]"); } - IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()).compute(); + + ClusterGroupAdapter clusterGroupAdapter = (ClusterGroupAdapter)ctx.cluster().get().forServers(); + + if (F.isEmpty(clusterGroupAdapter.nodes())) + return false; + + IgniteCompute comp = clusterGroupAdapter.compute(); return comp.call(new IgniteCallable<Boolean>() { 
@IgniteInstanceResource http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryField.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryField.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryField.java index 882d816..d68a6cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryField.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryField.java @@ -61,6 +61,16 @@ public class QueryField implements Serializable { * @param nullable Nullable flag. * @param dfltValue Default value. */ + public QueryField(String name, String typeName, boolean nullable, Object dfltValue) { + this(name, typeName, nullable, dfltValue, -1, -1); + } + + /** + * @param name Field name. + * @param typeName Class name for this field's values. + * @param nullable Nullable flag. + * @param dfltValue Default value. 
+ */ public QueryField(String name, String typeName, boolean nullable, Object dfltValue, int precision, int scale) { this.name = name; this.typeName = typeName; http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java index 5cbae29..569a02e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java @@ -20,12 +20,14 @@ package org.apache.ignite.internal.processors.query; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; - import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryEntityPatch; import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage; import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation; @@ -85,14 +87,90 @@ public class QuerySchema implements Serializable { } /** + * Make query schema patch. + * + * @param target Query entity list to which current schema should be expanded. + * @return Patch to achieve entity which is a result of merging current one and target. 
+ * @see QuerySchemaPatch + */ + public QuerySchemaPatch makePatch(Collection<QueryEntity> target) { + synchronized (mux) { + Map<String, QueryEntity> localEntities = new HashMap<>(); + + for (QueryEntity entity : entities) { + if (localEntities.put(entity.getTableName(), entity) != null) + throw new IllegalStateException("Duplicate key"); + } + + Collection<SchemaAbstractOperation> patchOperations = new ArrayList<>(); + Collection<QueryEntity> entityToAdd = new ArrayList<>(); + + StringBuilder conflicts = new StringBuilder(); + + for (QueryEntity queryEntity : target) { + if (localEntities.containsKey(queryEntity.getTableName())) { + QueryEntity localEntity = localEntities.get(queryEntity.getTableName()); + + QueryEntityPatch entityPatch = localEntity.makePatch(queryEntity); + + if (entityPatch.hasConflict()) { + if (conflicts.length() > 0) + conflicts.append("\n"); + + conflicts.append(entityPatch.getConflictsMessage()); + } + + if (!entityPatch.isEmpty()) + patchOperations.addAll(entityPatch.getPatchOperations()); + } + else + entityToAdd.add(QueryUtils.copy(queryEntity)); + } + + return new QuerySchemaPatch(patchOperations, entityToAdd, conflicts.toString()); + } + } + + /** + * Apply query schema patch for changing this schema. + * + * @param patch Patch to apply. + * @return {@code true} if the patch was applied successfully and {@code false} otherwise. + */ + public boolean applyPatch(QuerySchemaPatch patch) { + synchronized (mux) { + if (patch.hasConflicts()) + return false; + + if (patch.isEmpty()) + return true; + + for (SchemaAbstractOperation operation : patch.getPatchOperations()) { + finish(operation); + } + + entities.addAll(patch.getEntityToAdd()); + + return true; + } + } + + /** * Process finish message. * * @param msg Message. */ public void finish(SchemaFinishDiscoveryMessage msg) { - synchronized (mux) { - SchemaAbstractOperation op = msg.operation(); + finish(msg.operation()); + } + /** + * Process operation. + * + * @param op Operation to handle. 
+ */ + public void finish(SchemaAbstractOperation op) { + synchronized (mux) { if (op instanceof SchemaIndexCreateOperation) { SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op; http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchemaPatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchemaPatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchemaPatch.java new file mode 100644 index 0000000..68beb04 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchemaPatch.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query; + +import java.util.Collection; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryEntityPatch; +import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.NotNull; + +/** + * Query schema patch which contains {@link SchemaAbstractOperation} operations for changing query entities. + * This patch is a higher-level patch over {@link org.apache.ignite.cache.QueryEntityPatch}: + * it has operations for all {@link QueryEntity} in schema + * and also contains {@link QueryEntity} instances to be added to the schema as a whole. + * + * @see org.apache.ignite.cache.QueryEntityPatch + */ +public class QuerySchemaPatch { + /** Message which describes conflicts found while creating this patch. */ + private String conflictsMessage; + + /** Operations for modifying query entities. */ + private Collection<SchemaAbstractOperation> patchOperations; + + /** Entities which should be added as a whole. */ + private Collection<QueryEntity> entityToAdd; + + /** + * Create patch. + */ + public QuerySchemaPatch( + @NotNull Collection<SchemaAbstractOperation> patchOperations, + @NotNull Collection<QueryEntity> entityToAdd, + String conflictsMessage) { + this.patchOperations = patchOperations; + this.entityToAdd = entityToAdd; + this.conflictsMessage = conflictsMessage; + } + + /** + * @return {@code true} if patch has conflict. + */ + public boolean hasConflicts() { + return conflictsMessage != null && !conflictsMessage.isEmpty(); + } + + /** + * @return Conflicts message. + */ + public String getConflictsMessage() { + return conflictsMessage; + } + + /** + * @return {@code true} if patch is empty and there is nothing to apply. + */ + public boolean isEmpty() { + return patchOperations.isEmpty() && entityToAdd.isEmpty(); + } + + /** + * @return Patch operations for applying. 
+ */ + @NotNull public Collection<SchemaAbstractOperation> getPatchOperations() { + return patchOperations; + } + + /** + * @return Entities which should be added by whole. + */ + @NotNull public Collection<QueryEntity> getEntityToAdd() { + return entityToAdd; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QuerySchemaPatch.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java index 58511ee..8bae136 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTestWithPersistence.java @@ -23,12 +23,13 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.testframework.GridTestUtils; @@ -253,22 +254,15 @@ public class 
IgniteClusterActivateDeactivateTestWithPersistence extends IgniteCl ccfg.setGroupName(DEFAULT_CACHE_NAME); - ccfgs = new CacheConfiguration[]{ccfg}; - - startGrids(SRVS); + ccfgs = new CacheConfiguration[] {ccfg}; try { - ignite(0).active(true); + startGrids(SRVS); fail(); } - catch (IgniteException e) { - // Expected error. + catch (IgniteCheckedException e) { + assertTrue(X.getCause(e).getMessage().contains("Failed to start configured cache.")); } - - for (int i = 0; i < SRVS; i++) - assertFalse(ignite(i).active()); - - checkNoCaches(SRVS); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicSqlRestoreTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicSqlRestoreTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicSqlRestoreTest.java new file mode 100644 index 0000000..f7dc7b4 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicSqlRestoreTest.java @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * + */ +public class IgniteDynamicSqlRestoreTest extends GridCommonAbstractTest implements Serializable { + + public static final String TEST_CACHE_NAME = "test"; + public static final String TEST_INDEX_OBJECT = "TestIndexObject"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setAutoActivationEnabled(false); + + DataStorageConfiguration memCfg = new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setMaxSize(200 * 1024 * 1024).setPersistenceEnabled(true)) + .setWalMode(WALMode.LOG_ONLY); + + cfg.setDataStorageConfiguration(memCfg); + + 
cfg.setConsistentId(gridName); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** + * @throws Exception if failed. + */ + public void testMergeChangedConfigOnCoordinator() throws Exception { + { + //given: two started nodes with test table + Ignite ig = startGrid(0); + startGrid(1); + + ig.cluster().active(true); + + IgniteCache cache = ig.getOrCreateCache(getTestTableConfiguration()); + + fillTestData(ig); + + //when: stop one node and create indexes on other node + stopGrid(1); + + cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(a)")).getAll(); + cache.query(new SqlFieldsQuery("create index myindexb on TestIndexObject(b)")).getAll(); + cache.query(new SqlFieldsQuery("alter table TestIndexObject add column (c int)")).getAll(); + + //and: stop all grid + stopAllGrids(); + } + + { + //and: start cluster from node without index + IgniteEx ig = startGrid(1); + startGrid(0); + + ig.cluster().active(true); + + //and: change data + try (IgniteDataStreamer<Object, Object> s = ig.dataStreamer(TEST_CACHE_NAME)) { + s.allowOverwrite(true); + for (int i = 0; i < 5_000; i++) + s.addData(i, null); + } + + stopAllGrids(); + } + + { + //when: start node from first node + IgniteEx ig0 = startGrid(0); + IgniteEx ig1 = startGrid(1); + + ig0.cluster().active(true); + + //then: everything is ok + try (IgniteDataStreamer<Object, Object> s = ig1.dataStreamer(TEST_CACHE_NAME)) { + s.allowOverwrite(true); + for (int i = 0; i < 50_000; i++) { + BinaryObject bo = ig1.binary().builder(TEST_INDEX_OBJECT) + .setField("a", i, Object.class) + .setField("b", String.valueOf(i), Object.class) + .setField("c", i, Object.class) + .build(); + + s.addData(i, bo); + } + } + + IgniteCache<Object, Object> cache = ig1.cache(TEST_CACHE_NAME); + 
+ assertThat(doExplainPlan(cache, "explain select * from TestIndexObject where a > 5"), containsString("myindexa")); + assertFalse(cache.query(new SqlFieldsQuery("SELECT a,b,c FROM TestIndexObject limit 1")).getAll().isEmpty()); + } + } + + /** + * @throws Exception if failed. + */ + public void testTakeConfigFromJoiningNodeOnInactiveGrid() throws Exception { + { + //given: two started nodes with test table + Ignite ig = startGrid(0); + startGrid(1); + + ig.cluster().active(true); + + IgniteCache cache = ig.getOrCreateCache(getTestTableConfiguration()); + + fillTestData(ig); + + stopGrid(1); + + cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(a)")).getAll(); + cache.query(new SqlFieldsQuery("create index myindexb on TestIndexObject(b)")).getAll(); + cache.query(new SqlFieldsQuery("alter table TestIndexObject add column (c int)")).getAll(); + + stopAllGrids(); + } + + { + //and: start cluster from node without cache + IgniteEx ig = startGrid(1); + startGrid(0); + + ig.cluster().active(true); + + //then: config for cache was applying successful + IgniteCache<Object, Object> cache = ig.cache(TEST_CACHE_NAME); + + assertThat(doExplainPlan(cache, "explain select * from TestIndexObject where a > 5"), containsString("myindexa")); + assertFalse(cache.query(new SqlFieldsQuery("SELECT a,b,c FROM TestIndexObject limit 1")).getAll().isEmpty()); + } + } + + /** + * @throws Exception if failed. 
+ */ + public void testResaveConfigAfterMerge() throws Exception { + { + //given: two started nodes with test table + Ignite ig = startGrid(0); + startGrid(1); + + ig.cluster().active(true); + + IgniteCache cache = ig.getOrCreateCache(getTestTableConfiguration()); + + fillTestData(ig); + + stopGrid(1); + + cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(a)")).getAll(); + cache.query(new SqlFieldsQuery("create index myindexb on TestIndexObject(b)")).getAll(); + cache.query(new SqlFieldsQuery("alter table TestIndexObject add column (c int)")).getAll(); + + stopAllGrids(); + } + + { + //when: start cluster from node without cache + IgniteEx ig = startGrid(1); + startGrid(0); + + ig.cluster().active(true); + + stopAllGrids(); + } + + { + //then: start only one node which originally was without index + IgniteEx ig = startGrid(1); + + ig.cluster().active(true); + + IgniteCache<Object, Object> cache = ig.cache(TEST_CACHE_NAME); + + assertThat(doExplainPlan(cache, "explain select * from TestIndexObject where a > 5"), containsString("myindexa")); + assertFalse(cache.query(new SqlFieldsQuery("SELECT a,b,c FROM TestIndexObject limit 1")).getAll().isEmpty()); + } + } + + /** + * @throws Exception if failed. 
+     */
+    public void testMergeChangedConfigOnInactiveGrid() throws Exception {
+        {
+            //given: two started nodes with test table
+            Ignite ig = startGrid(0);
+            startGrid(1);
+
+            ig.cluster().active(true);
+
+            LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+            fields.put("A", "java.lang.Integer");
+            fields.put("B", "java.lang.String");
+
+            CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(TEST_CACHE_NAME);
+
+            ccfg.setQueryEntities(Arrays.asList(
+                new QueryEntity()
+                    .setKeyType("java.lang.Integer")
+                    .setValueType("TestIndexObject")
+                    .setFields(fields)
+            ));
+
+            // Parameterized instead of the raw type: ccfg is CacheConfiguration<Object, Object>.
+            IgniteCache<Object, Object> cache = ig.getOrCreateCache(ccfg);
+
+            fillTestData(ig);
+
+            // Created while both nodes are alive, so node 1 knows about "myindexb".
+            cache.query(new SqlFieldsQuery("create index myindexb on TestIndexObject(b)")).getAll();
+
+            //and: stop one node and change the schema on the other node
+            stopGrid(1);
+
+            cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(a)")).getAll();
+            cache.query(new SqlFieldsQuery("drop index myindexb")).getAll();
+            cache.query(new SqlFieldsQuery("alter table TestIndexObject drop column b")).getAll();
+
+            //and: stop all grid
+            stopAllGrids();
+        }
+
+        {
+            //and: start cluster
+            IgniteEx ig0 = startGrid(0);
+            IgniteEx ig1 = startGrid(1);
+
+            ig0.cluster().active(true);
+
+            //then: config should be merged
+            try (IgniteDataStreamer<Object, Object> s = ig1.dataStreamer(TEST_CACHE_NAME)) {
+                s.allowOverwrite(true);
+
+                for (int i = 0; i < 5_000; i++) {
+                    BinaryObject bo = ig1.binary().builder("TestIndexObject")
+                        .setField("a", i, Object.class)
+                        .setField("b", String.valueOf(i), Object.class)
+                        .build();
+
+                    s.addData(i, bo);
+                }
+            }
+
+            IgniteCache<Object, Object> cache = ig1.cache(TEST_CACHE_NAME);
+
+            //then: index "myindexa" is taken from node 0, index "myindexb" and column "b" are restored from node 1
+            assertThat(doExplainPlan(cache, "explain select * from TestIndexObject where a > 5"), containsString("myindexa"));
+            assertThat(doExplainPlan(cache, "explain select * from TestIndexObject where b > 5"), containsString("myindexb"));
+            assertFalse(cache.query(new SqlFieldsQuery("SELECT a,b FROM TestIndexObject limit 1")).getAll().isEmpty());
+        }
+    }
+
+    /**
+     * Checks that a node which missed dynamic index and column changes can join a cluster
+     * that is already active and then answers queries that use the new index and column.
+     *
+     * @throws Exception if failed.
+     */
+    public void testTakeChangedConfigOnActiveGrid() throws Exception {
+        {
+            //given: two started nodes with test table
+            Ignite ig = startGrid(0);
+            startGrid(1);
+
+            ig.cluster().active(true);
+
+            // Parameterized instead of the raw type: the configuration is CacheConfiguration<Object, Object>.
+            IgniteCache<Object, Object> cache = ig.getOrCreateCache(getTestTableConfiguration());
+
+            fillTestData(ig);
+
+            //stop one node and create index on other node
+            stopGrid(1);
+
+            cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(a)")).getAll();
+            cache.query(new SqlFieldsQuery("create index myindexb on TestIndexObject(b)")).getAll();
+            cache.query(new SqlFieldsQuery("alter table TestIndexObject add column (c int)")).getAll();
+
+            stopAllGrids();
+        }
+
+        {
+            //and: start cluster, activating it before the outdated node joins
+            IgniteEx ig = startGrid(0);
+            ig.cluster().active(true);
+
+            ig = startGrid(1);
+
+            //then: the joined node should work with the changed config
+            try (IgniteDataStreamer<Object, Object> s = ig.dataStreamer(TEST_CACHE_NAME)) {
+                s.allowOverwrite(true);
+
+                for (int i = 0; i < 5_000; i++) {
+                    BinaryObject bo = ig.binary().builder("TestIndexObject")
+                        .setField("a", i, Object.class)
+                        .setField("b", String.valueOf(i), Object.class)
+                        .setField("c", i, Object.class)
+                        .build();
+
+                    s.addData(i, bo);
+                }
+            }
+
+            IgniteCache<Object, Object> cache = ig.getOrCreateCache(TEST_CACHE_NAME);
+
+            // Wait until the indexes are ready before inspecting the query plan.
+            cache.indexReadyFuture().get();
+
+            assertThat(doExplainPlan(cache, "explain select * from TestIndexObject where a > 5"), containsString("myindexa"));
+            assertFalse(cache.query(new SqlFieldsQuery("SELECT a,b,c FROM TestIndexObject limit 1")).getAll().isEmpty());
+        }
+    }
+
+    /**
+     * Checks that a node fails to join when its schema conflicts with the schema of the running node:
+     * column B has another type and index MYINDEXA is defined differently.
+     *
+     * @throws Exception if failed.
+     */
+    public void testFailJoiningNodeBecauseDifferentSql() throws Exception {
+        {
+            //given: two started nodes with test table
+            Ignite ig = startGrid(0);
+            startGrid(1);
+
+            ig.cluster().active(true);
+
+            // Parameterized instead of the raw type: the configuration is CacheConfiguration<Object, Object>.
+            IgniteCache<Object, Object> cache = ig.getOrCreateCache(getTestTableConfiguration());
+
+            cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(a)")).getAll();
+
+            //stop one node and redefine column "b" and index "myindexa" on the other node
+            stopGrid(1);
+
+            cache.query(new SqlFieldsQuery("drop index myindexa")).getAll();
+            cache.query(new SqlFieldsQuery("alter table TestIndexObject drop column b")).getAll();
+            cache.query(new SqlFieldsQuery("alter table TestIndexObject add column (b int)")).getAll();
+            cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(b)")).getAll();
+
+            //and: stopped all grid
+            stopAllGrids();
+        }
+
+        {
+            //and: start cluster
+            startGrid(0);
+
+            try {
+                startGrid(1);
+
+                fail("Node should start with fail");
+            }
+            catch (Exception e) {
+                IgniteSpiException spiErr = X.cause(e, IgniteSpiException.class);
+
+                // An unrelated failure has no IgniteSpiException in its cause chain: rethrow it as is
+                // instead of hiding it behind a NullPointerException.
+                if (spiErr == null)
+                    throw e;
+
+                String cause = spiErr.getMessage();
+
+                assertThat(cause, containsString("fieldType of B is different"));
+                assertThat(cause, containsString("index MYINDEXA is different"));
+            }
+        }
+    }
+
+    /**
+     * Checks that a node fails to join when index MYINDEXA was re-created with another inline size
+     * while that node was stopped.
+     *
+     * @throws Exception if failed.
+     */
+    public void testFailJoiningNodeBecauseFieldInlineSizeIsDifferent() throws Exception {
+        {
+            //given: two started nodes with test table
+            Ignite ig = startGrid(0);
+            startGrid(1);
+
+            ig.cluster().active(true);
+
+            // Parameterized instead of the raw type: the configuration is CacheConfiguration<Object, Object>.
+            IgniteCache<Object, Object> cache = ig.getOrCreateCache(getTestTableConfiguration());
+
+            cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(a) INLINE_SIZE 1000")).getAll();
+
+            //stop one node and re-create the index with another inline size on the other node
+            stopGrid(1);
+
+            cache.query(new SqlFieldsQuery("drop index myindexa")).getAll();
+            cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(a) INLINE_SIZE 2000")).getAll();
+
+            //and: stopped all grid
+            stopAllGrids();
+        }
+
+        {
+            //and: start cluster
+            startGrid(0);
+
+            try {
+                startGrid(1);
+
+                fail("Node should start with fail");
+            }
+            catch (Exception e) {
+                IgniteSpiException spiErr = X.cause(e, IgniteSpiException.class);
+
+                // An unrelated failure has no IgniteSpiException in its cause chain: rethrow it as is
+                // instead of hiding it behind a NullPointerException.
+                if (spiErr == null)
+                    throw e;
+
+                assertThat(spiErr.getMessage(), containsString("index MYINDEXA is different"));
+            }
+        }
+    }
+
+    /**
+     * Checks that a node holding indexes created while the other node was stopped fails to join
+     * a cluster that was started and activated by the node lacking those indexes.
+     *
+     * @throws Exception if failed.
+     */
+    public void testFailJoiningNodeBecauseNeedConfigUpdateOnActiveGrid() throws Exception {
+        {
+            startGrid(0);
+            startGrid(1);
+
+            CacheConfiguration<Object, Object> ccfg = getTestTableConfiguration();
+
+            Ignite ig = ignite(0);
+
+            ig.cluster().active(true);
+
+            // Parameterized instead of the raw type: ccfg is CacheConfiguration<Object, Object>.
+            IgniteCache<Object, Object> cache = ig.getOrCreateCache(ccfg);
+
+            fillTestData(ig);
+
+            // Node 1 is stopped first, so only node 0 sees the indexes created below.
+            stopGrid(1);
+
+            cache.query(new SqlFieldsQuery("create index myindexa on TestIndexObject(a)")).getAll();
+            cache.query(new SqlFieldsQuery("create index myindexb on TestIndexObject(b)")).getAll();
+
+            stopGrid(0);
+        }
+
+        {
+            // The cluster is activated by the node with the outdated schema.
+            IgniteEx ig = startGrid(1);
+            ig.cluster().active(true);
+
+            try {
+                startGrid(0);
+
+                fail("Node should start with fail");
+            }
+            catch (Exception e) {
+                IgniteSpiException spiErr = X.cause(e, IgniteSpiException.class);
+
+                // An unrelated failure has no IgniteSpiException in its cause chain: rethrow it as is
+                // instead of hiding it behind a NullPointerException.
+                if (spiErr == null)
+                    throw e;
+
+                assertThat(spiErr.getMessage(), containsString("Failed to join node to the active cluster"));
+            }
+        }
+    }
+
+    /**
+     * Runs the given statement and returns the first column of the first result row in lower case.
+     *
+     * @param cache cache to run the statement on
+     * @param sql statement to run, the callers pass {@code explain ...} queries
+     * @return result of explain plan
+     */
+    @NotNull private String doExplainPlan(IgniteCache<Object, Object> cache, String sql) {
+        // Locale.ROOT keeps the lower-casing independent of the default locale (e.g. Turkish dotless "i"),
+        // otherwise checks such as containsString("myindexa") may fail. Fully qualified to avoid a new import.
+        return cache.query(new SqlFieldsQuery(sql)).getAll().get(0).get(0).toString().toLowerCase(java.util.Locale.ROOT);
+    }
+
+    /**
+     * Streams 50 000 "TestIndexObject" binary objects with fields "a" and "b" into the test cache,
+     * keyed by the loop counter.
+     *
+     * @param ig node used to build and stream the objects
+     */
+    private void fillTestData(Ignite ig) {
+        try (IgniteDataStreamer<Object, Object> s = ig.dataStreamer(TEST_CACHE_NAME)) {
+            for (int i = 0; i < 50_000; i++) {
+                BinaryObject bo = ig.binary().builder("TestIndexObject")
+                    .setField("a", i, Object.class)
+                    .setField("b", String.valueOf(i), Object.class)
+                    .build();
+
+                s.addData(i, bo);
+            }
+        }
+    }
+
+    /**
+     * Builds the configuration of the test cache with a single query entity: Integer key,
+     * value type "TestIndexObject", an Integer field and a String field.
+     *
+     * @return cache configuration with test table
+     */
+    @NotNull private CacheConfiguration<Object, Object> getTestTableConfiguration() {
+        LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+        // NOTE(review): the field names differ in case ("a" vs "B") - confirm this is intentional.
+        fields.put("a", "java.lang.Integer");
+        fields.put("B", "java.lang.String");
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(TEST_CACHE_NAME);
+
+        ccfg.setQueryEntities(Collections.singletonList(
+            new QueryEntity()
+                .setKeyType("java.lang.Integer")
+                .setValueType("TestIndexObject")
+                .setFields(fields)
+        ));
+
+        return ccfg;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
index dcb3722..3f09062 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
@@ -315,7 +315,7 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo
      * @throws Exception If
failed. */ private void checkNodeJoinOnPendingOperation(boolean addOrRemove) throws Exception { - CountDownLatch finishLatch = new CountDownLatch(4); + CountDownLatch finishLatch = new CountDownLatch(3); IgniteEx srv1 = ignitionStart(serverConfiguration(1), finishLatch); @@ -334,7 +334,6 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo ignitionStart(serverConfiguration(2), finishLatch); ignitionStart(serverConfiguration(3, true), finishLatch); - ignitionStart(clientConfiguration(4), finishLatch); assertFalse(idxFut.isDone()); http://git-wip-us.apache.org/repos/asf/ignite/blob/bbc439b8/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 619e7cf..68ff465 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheSqlQueryErrorSelfT import org.apache.ignite.internal.processors.cache.IgniteCacheUpdateSqlQuerySelfTest; import org.apache.ignite.internal.processors.cache.IgniteCheckClusterStateBeforeExecuteQueryTest; import org.apache.ignite.internal.processors.cache.IgniteCrossCachesJoinsQueryTest; +import org.apache.ignite.internal.processors.cache.IgniteDynamicSqlRestoreTest; import org.apache.ignite.internal.processors.cache.IncorrectQueryEntityTest; import org.apache.ignite.internal.processors.cache.QueryEntityCaseMismatchTest; import org.apache.ignite.internal.processors.cache.SqlFieldsQuerySelfTest; @@ -230,6 +231,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { // 
Config. suite.addTestSuite(IgniteCacheDuplicateEntityConfigurationSelfTest.class); suite.addTestSuite(IncorrectQueryEntityTest.class); + suite.addTestSuite(IgniteDynamicSqlRestoreTest.class); // Queries tests. suite.addTestSuite(LazyQuerySelfTest.class);