Merge branches 'ignite-843' and 'master' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-843
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b9cd1844 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b9cd1844 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b9cd1844 Branch: refs/heads/ignite-843 Commit: b9cd18447c7636bbaa4fe0f78fb05e73f56b01e1 Parents: 6b7593d Author: Alexey Kuznetsov <[email protected]> Authored: Mon Aug 24 09:41:02 2015 +0700 Committer: Alexey Kuznetsov <[email protected]> Committed: Mon Aug 24 09:41:04 2015 +0700 ---------------------------------------------------------------------- examples/config/example-cache.xml | 2 - .../apache/ignite/IgniteSystemProperties.java | 3 + .../store/jdbc/CacheAbstractJdbcStore.java | 45 +- .../cache/store/jdbc/CacheJdbcPojoStore.java | 32 +- .../store/jdbc/dialect/BasicJdbcDialect.java | 3 + .../cache/store/jdbc/dialect/DB2Dialect.java | 3 + .../cache/store/jdbc/dialect/H2Dialect.java | 3 + .../cache/store/jdbc/dialect/JdbcDialect.java | 3 +- .../cache/store/jdbc/dialect/MySQLDialect.java | 3 + .../cache/store/jdbc/dialect/OracleDialect.java | 3 + .../store/jdbc/dialect/SQLServerDialect.java | 3 + .../cluster/ClusterTopologyException.java | 18 + .../ignite/internal/MarshallerContextImpl.java | 24 +- .../ClusterTopologyCheckedException.java | 18 + .../CachePartialUpdateCheckedException.java | 11 +- .../processors/cache/GridCacheAdapter.java | 85 ++- .../cache/GridCacheEvictionManager.java | 2 +- .../processors/cache/GridCacheIoManager.java | 1 - .../cache/GridCacheSharedContext.java | 17 + .../processors/cache/GridCacheUtils.java | 23 + .../distributed/GridDistributedCacheEntry.java | 11 +- .../dht/GridClientPartitionTopology.java | 20 + .../distributed/dht/GridDhtCacheAdapter.java | 12 +- .../cache/distributed/dht/GridDhtGetFuture.java | 12 +- .../dht/GridDhtPartitionTopology.java | 7 + .../dht/GridDhtPartitionTopologyImpl.java | 20 + .../cache/distributed/dht/GridDhtTxLocal.java | 4 +- 
.../distributed/dht/GridDhtTxPrepareFuture.java | 182 ++++++- .../dht/GridDhtTxPrepareResponse.java | 42 +- .../dht/GridPartitionedGetFuture.java | 104 ++-- .../dht/atomic/GridDhtAtomicCache.java | 16 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 3 + .../dht/colocated/GridDhtColocatedCache.java | 19 +- .../colocated/GridDhtColocatedLockFuture.java | 8 +- .../GridDhtPartitionsExchangeFuture.java | 40 +- .../distributed/near/GridNearAtomicCache.java | 6 +- .../distributed/near/GridNearCacheAdapter.java | 15 +- .../distributed/near/GridNearCacheEntry.java | 10 +- .../distributed/near/GridNearGetFuture.java | 120 +++-- .../distributed/near/GridNearLockFuture.java | 12 +- .../near/GridNearOptimisticTxPrepareFuture.java | 13 +- .../GridNearPessimisticTxPrepareFuture.java | 9 +- .../near/GridNearTransactionalCache.java | 9 +- .../cache/distributed/near/GridNearTxLocal.java | 7 +- .../near/GridNearTxPrepareResponse.java | 3 - .../cache/local/GridLocalCacheEntry.java | 4 +- .../local/atomic/GridLocalAtomicCache.java | 17 +- .../cache/transactions/IgniteInternalTx.java | 2 +- .../cache/transactions/IgniteTxAdapter.java | 19 +- .../cache/transactions/IgniteTxEntry.java | 18 + .../cache/transactions/IgniteTxHandler.java | 5 +- .../transactions/IgniteTxLocalAdapter.java | 4 +- .../service/GridServiceProcessor.java | 5 + .../ignite/internal/util/IgniteUtils.java | 10 +- .../ignite/internal/util/lang/GridFunc.java | 14 + .../TcpDiscoveryMulticastIpFinder.java | 38 ++ .../config/store/jdbc/ignite-type-metadata.xml | 8 + .../store/jdbc/CacheJdbcPojoStoreTest.java | 33 +- ...eJdbcStoreAbstractMultithreadedSelfTest.java | 16 +- .../ignite/cache/store/jdbc/model/Person.java | 26 +- .../cache/CrossCacheTxRandomOperationsTest.java | 534 +++++++++++++++++++ ...teAtomicCacheEntryProcessorNodeJoinTest.java | 32 ++ .../IgniteCacheEntryProcessorNodeJoinTest.java | 225 ++++++++ .../IgniteCacheTopologySafeGetSelfTest.java | 218 ++++++++ .../GridCacheMultiNodeLockAbstractTest.java | 41 +- 
.../GridCacheTransformEventSelfTest.java | 2 + .../IgniteCacheCrossCacheTxFailoverTest.java | 433 +++++++++++++++ .../IgniteCachePutRetryAbstractSelfTest.java | 1 + ...gniteCachePutRetryTransactionalSelfTest.java | 187 +++++++ .../near/GridCacheNearOnlyTopologySelfTest.java | 4 +- .../near/GridCacheNearTxForceKeyTest.java | 76 +++ ...idCachePartitionedHitsAndMissesSelfTest.java | 20 +- ...idCachePartitionedMultiNodeLockSelfTest.java | 8 +- ...ridCacheReplicatedMultiNodeLockSelfTest.java | 8 +- .../lru/LruNearEvictionPolicySelfTest.java | 29 +- .../LruNearOnlyNearEvictionPolicySelfTest.java | 55 +- .../OptimizedMarshallerNodeFailoverTest.java | 97 +++- .../IgniteCacheFailoverTestSuite.java | 2 + .../IgniteCacheFailoverTestSuite2.java | 2 + .../testsuites/IgniteCacheTestSuite2.java | 5 + .../ignite/schema/parser/DbMetadataReader.java | 41 +- .../parser/dialect/DB2MetadataDialect.java | 3 +- .../parser/dialect/DatabaseMetadataDialect.java | 13 +- .../parser/dialect/JdbcMetadataDialect.java | 129 +++-- .../parser/dialect/MySQLMetadataDialect.java | 57 ++ .../parser/dialect/OracleMetadataDialect.java | 111 ++-- .../ignite/schema/model/PojoDescriptor.java | 6 +- .../ignite/schema/model/SchemaDescriptor.java | 61 +++ .../schema/parser/DatabaseMetadataParser.java | 26 +- .../org/apache/ignite/schema/ui/Controls.java | 25 +- .../ignite/schema/ui/SchemaImportApp.java | 157 +++++- .../schema/test/AbstractSchemaImportTest.java | 10 +- .../jdbc/CacheJdbcPojoStoreFactorySelfTest.java | 14 +- 93 files changed, 3437 insertions(+), 420 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/examples/config/example-cache.xml ---------------------------------------------------------------------- diff --git a/examples/config/example-cache.xml b/examples/config/example-cache.xml index 6d1d0da..98e1a71 100644 --- a/examples/config/example-cache.xml +++ b/examples/config/example-cache.xml @@ -37,8 
+37,6 @@ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> - <property name="localHost" value="127.0.0.1" /> - <property name="cacheConfiguration"> <list> <!-- Partitioned cache example configuration (Atomic mode). --> http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 7e96b29..7c808df 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -354,6 +354,9 @@ public final class IgniteSystemProperties { /** Number of cache operation retries in case of topology exceptions. */ public static final String IGNITE_CACHE_RETRIES_COUNT = "IGNITE_CACHE_RETRIES_COUNT"; + /** Number of times pending cache objects will be dumped to the log in case of partition exchange timeout. */ + public static final String IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD = "IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD"; + /** * Enforces singleton. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java index b1e223b..b2be8c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java @@ -355,6 +355,11 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, * @throws SQLException If a database access error occurs or this method is called. */ protected Object getColumnValue(ResultSet rs, int colIdx, Class<?> type) throws SQLException { + Object val = rs.getObject(colIdx); + + if (val == null) + return null; + if (type == int.class) return rs.getInt(colIdx); @@ -364,7 +369,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, if (type == double.class) return rs.getDouble(colIdx); - if (type == boolean.class) + if (type == boolean.class || type == Boolean.class) return rs.getBoolean(colIdx); if (type == byte.class) @@ -378,31 +383,23 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, if (type == Integer.class || type == Long.class || type == Double.class || type == Byte.class || type == Short.class || type == Float.class) { - Object val = rs.getObject(colIdx); - - if (val != null) { - Number num = (Number)val; - - if (type == Integer.class) - return num.intValue(); - else if (type == Long.class) - return num.longValue(); - else if (type == Double.class) - return num.doubleValue(); - else if (type == Byte.class) - return num.byteValue(); - else if (type == Short.class) - return num.shortValue(); - else if (type == Float.class) - 
return num.floatValue(); - } - else - return EMPTY_COLUMN_VALUE; + Number num = (Number)val; + + if (type == Integer.class) + return num.intValue(); + else if (type == Long.class) + return num.longValue(); + else if (type == Double.class) + return num.doubleValue(); + else if (type == Byte.class) + return num.byteValue(); + else if (type == Short.class) + return num.shortValue(); + else if (type == Float.class) + return num.floatValue(); } - Object val = rs.getObject(colIdx); - - if (type == UUID.class && val != null) { + if (type == UUID.class) { if (val instanceof UUID) return val; http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java index 7b78bda..1ff170e 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java @@ -99,8 +99,8 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> { getters.put(field.getJavaName(), cls.getMethod("is" + prop)); } catch (NoSuchMethodException e) { - throw new CacheException("Failed to find getter in POJO class [class name=" + clsName + - ", property=" + field.getJavaName() + "]", e); + throw new CacheException("Failed to find getter in POJO class [clsName=" + clsName + + ", prop=" + field.getJavaName() + "]", e); } } @@ -108,8 +108,8 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> { setters.put(field.getJavaName(), cls.getMethod("set" + prop, field.getJavaType())); } catch (NoSuchMethodException e) { - throw new CacheException("Failed to find setter in POJO class [class name=" + clsName + - ", property=" 
+ field.getJavaName() + "]", e); + throw new CacheException("Failed to find setter in POJO class [clsName=" + clsName + + ", prop=" + field.getJavaName() + "]", e); } } } @@ -167,15 +167,25 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> { Object obj = mc.ctor.newInstance(); for (CacheTypeFieldMetadata field : fields) { - Method setter = mc.setters.get(field.getJavaName()); + String fldJavaName = field.getJavaName(); + + Method setter = mc.setters.get(fldJavaName); if (setter == null) - throw new CacheLoaderException("Failed to find setter in POJO class [class name=" + typeName + - ", property=" + field.getJavaName() + "]"); + throw new IllegalStateException("Failed to find setter in POJO class [clsName=" + typeName + + ", prop=" + fldJavaName + "]"); + + String fldDbName = field.getDatabaseName(); - Integer colIdx = loadColIdxs.get(field.getDatabaseName()); + Integer colIdx = loadColIdxs.get(fldDbName); - setter.invoke(obj, getColumnValue(rs, colIdx, field.getJavaType())); + try { + setter.invoke(obj, getColumnValue(rs, colIdx, field.getJavaType())); + } + catch (Exception e) { + throw new IllegalStateException("Failed to set property in POJO class [clsName=" + typeName + + ", prop=" + fldJavaName + ", col=" + colIdx + ", dbName=" + fldDbName + "]", e); + } } return (R)obj; @@ -204,8 +214,8 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> { Method getter = mc.getters.get(fieldName); if (getter == null) - throw new CacheLoaderException("Failed to find getter in POJO class [class name=" + typeName + - ", property=" + fieldName + "]"); + throw new CacheLoaderException("Failed to find getter in POJO class [clsName=" + typeName + + ", prop=" + fieldName + "]"); return getter.invoke(obj); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java ---------------------------------------------------------------------- 
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java index d0dd6f4..b43c7d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java @@ -26,6 +26,9 @@ import java.util.*; * Basic implementation of dialect based on JDBC specification. */ public class BasicJdbcDialect implements JdbcDialect { + /** */ + private static final long serialVersionUID = 0L; + /** Default max query parameters count. */ protected static final int DFLT_MAX_PARAMS_CNT = 2000; http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java index fe1d876..2a08557 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java @@ -25,6 +25,9 @@ import java.util.*; * A dialect compatible with the IBM DB2 database. 
*/ public class DB2Dialect extends BasicJdbcDialect { + /** */ + private static final long serialVersionUID = 0L; + /** {@inheritDoc} */ @Override public boolean hasMerge() { return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java index a97e144..8091e1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java @@ -25,6 +25,9 @@ import java.util.*; * A dialect compatible with the H2 database. */ public class H2Dialect extends BasicJdbcDialect { + /** */ + private static final long serialVersionUID = 0L; + /** {@inheritDoc} */ @Override public boolean hasMerge() { return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java index be1cc67..32adcc4 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java @@ -17,12 +17,13 @@ package org.apache.ignite.cache.store.jdbc.dialect; +import java.io.*; import java.util.*; /** * Represents a dialect of SQL implemented by a particular RDBMS. */ -public interface JdbcDialect { +public interface JdbcDialect extends Serializable { /** * Construct select count query. 
* http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java index df16841..def2fe7 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java @@ -25,6 +25,9 @@ import java.util.*; * A dialect compatible with the MySQL database. */ public class MySQLDialect extends BasicJdbcDialect { + /** */ + private static final long serialVersionUID = 0L; + /** {@inheritDoc} */ @Override public boolean hasMerge() { return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java index 351f10a..e155fb4 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java @@ -25,6 +25,9 @@ import java.util.*; * A dialect compatible with the Oracle database. 
*/ public class OracleDialect extends BasicJdbcDialect { + /** */ + private static final long serialVersionUID = 0L; + /** {@inheritDoc} */ @Override public boolean hasMerge() { return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java index e781e98..7fdda6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java @@ -25,6 +25,9 @@ import java.util.*; * A dialect compatible with the Microsoft SQL Server database. */ public class SQLServerDialect extends BasicJdbcDialect { + /** */ + private static final long serialVersionUID = 0L; + /** {@inheritDoc} */ @Override public boolean hasMerge() { return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java index d28c409..61bc367 100644 --- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java +++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java @@ -18,6 +18,7 @@ package org.apache.ignite.cluster; import org.apache.ignite.*; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; /** @@ -27,6 +28,9 @@ public class ClusterTopologyException extends IgniteException { /** */ private static 
final long serialVersionUID = 0L; + /** Retry ready future. */ + private transient IgniteFuture<?> readyFut; + /** * Creates new topology exception with given error message. * @@ -46,4 +50,18 @@ public class ClusterTopologyException extends IgniteException { public ClusterTopologyException(String msg, @Nullable Throwable cause) { super(msg, cause); } + + /** + * @return Retry ready future. + */ + public IgniteFuture<?> retryReadyFuture() { + return readyFut; + } + + /** + * @param readyFut Retry ready future. + */ + public void retryReadyFuture(IgniteFuture<?> readyFut) { + this.readyFut = readyFut; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index 9f7c983..dc0fd57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.*; @@ -135,7 +136,7 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { throw new IllegalStateException("Failed to initialize marshaller context (grid is stopping)."); } - String clsName = cache0.get(id); + String clsName = cache0.getTopologySafe(id); if (clsName == null) { File file = new File(workDir, id + ".classname"); @@ -177,18 +178,21 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? 
extends String>> events) throws CacheEntryListenerException { for (CacheEntryEvent<? extends Integer, ? extends String> evt : events) { - assert evt.getOldValue() == null : "Received non-null old value for system marshaller cache: " + evt; + assert evt.getOldValue() == null || F.eq(evt.getOldValue(), evt.getValue()): + "Received cache entry update for system marshaller cache: " + evt; - File file = new File(workDir, evt.getKey() + ".classname"); + if (evt.getOldValue() == null) { + File file = new File(workDir, evt.getKey() + ".classname"); - try (Writer writer = new FileWriter(file)) { - writer.write(evt.getValue()); + try (Writer writer = new FileWriter(file)) { + writer.write(evt.getValue()); - writer.flush(); - } - catch (IOException e) { - U.error(log, "Failed to write class name to file [id=" + evt.getKey() + - ", clsName=" + evt.getValue() + ", file=" + file.getAbsolutePath() + ']', e); + writer.flush(); + } + catch (IOException e) { + U.error(log, "Failed to write class name to file [id=" + evt.getKey() + + ", clsName=" + evt.getValue() + ", file=" + file.getAbsolutePath() + ']', e); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java index 8f985b4..2d7b0de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.cluster; import org.apache.ignite.*; +import org.apache.ignite.internal.*; import org.jetbrains.annotations.*; /** @@ -27,6 +28,9 @@ public 
class ClusterTopologyCheckedException extends IgniteCheckedException { /** */ private static final long serialVersionUID = 0L; + /** Next topology version to wait. */ + private transient IgniteInternalFuture<?> readyFut; + /** * Creates new topology exception with given error message. * @@ -46,4 +50,18 @@ public class ClusterTopologyCheckedException extends IgniteCheckedException { public ClusterTopologyCheckedException(String msg, @Nullable Throwable cause) { super(msg, cause); } + + /** + * @return Retry ready future. + */ + public IgniteInternalFuture<?> retryReadyFuture() { + return readyFut; + } + + /** + * @param readyFut Retry ready future. + */ + public void retryReadyFuture(IgniteInternalFuture<?> readyFut) { + this.readyFut = readyFut; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java index c2259df..ab38e5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java @@ -47,8 +47,9 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException { * Gets collection of failed keys. * @return Collection of failed keys. 
*/ - public <K> Collection<K> failedKeys() { - return (Collection<K>)failedKeys; + @SuppressWarnings("unchecked") + public synchronized <K> Collection<K> failedKeys() { + return new LinkedHashSet<>((Collection<K>)failedKeys); } /** @@ -56,7 +57,7 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException { * @param err Error. * @param topVer Topology version for failed update. */ - public void add(Collection<?> failedKeys, Throwable err, AffinityTopologyVersion topVer) { + public synchronized void add(Collection<?> failedKeys, Throwable err, AffinityTopologyVersion topVer) { if (topVer != null) { AffinityTopologyVersion topVer0 = this.topVer; @@ -72,7 +73,7 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException { /** * @return Topology version. */ - public AffinityTopologyVersion topologyVersion() { + public synchronized AffinityTopologyVersion topologyVersion() { return topVer; } @@ -80,7 +81,7 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException { * @param failedKeys Failed keys. * @param err Error. 
*/ - public void add(Collection<?> failedKeys, Throwable err) { + public synchronized void add(Collection<?> failedKeys, Throwable err) { add(failedKeys, err, null); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 177dcfb..7adea2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -526,7 +526,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /*subj id*/null, /*task name*/null, /*deserialize portable*/false, - /*skip values*/true + /*skip values*/true, + /*can remap*/true ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() { @Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException { Map<K, V> map = fut.get(); @@ -560,7 +561,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /*subj id*/null, /*task name*/null, /*deserialize portable*/false, - /*skip values*/true + /*skip values*/true, + /*can remap*/true ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() { @Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException { Map<K, V> kvMap = fut.get(); @@ -894,7 +896,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public Set<Cache.Entry<K, V>> entrySet() { - return entrySet((CacheEntryPredicate[]) null); + return entrySet((CacheEntryPredicate[])null); } /** {@inheritDoc} */ @@ -919,12 +921,12 @@ public 
abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public Set<K> primaryKeySet() { - return primaryKeySet((CacheEntryPredicate[]) null); + return primaryKeySet((CacheEntryPredicate[])null); } /** {@inheritDoc} */ @Override public Collection<V> values() { - return values((CacheEntryPredicate[]) null); + return values((CacheEntryPredicate[])null); } /** @@ -1210,22 +1212,63 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V @Override public V getForcePrimary(K key) throws IgniteCheckedException { String taskName = ctx.kernalContext().job().currentTaskName(); - return getAllAsync(F.asList(key), /*force primary*/true, /*skip tx*/false, null, null, taskName, true, false) - .get().get(key); + return getAllAsync( + F.asList(key), + /*force primary*/true, + /*skip tx*/false, + /*cached entry*/null, + /*subject id*/null, + taskName, + /*deserialize cache objects*/true, + /*skip values*/false, + /*can remap*/true + ).get().get(key); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<V> getForcePrimaryAsync(final K key) { String taskName = ctx.kernalContext().job().currentTaskName(); - return getAllAsync(Collections.singletonList(key), /*force primary*/true, /*skip tx*/false, null, null, - taskName, true, false).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() { + return getAllAsync( + Collections.singletonList(key), + /*force primary*/true, + /*skip tx*/false, + null, + null, + taskName, + true, + false, + /*can remap*/true + ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() { @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { return e.get().get(key); } }); } + /** + * Gets value without waiting for topology changes. + * + * @param key Key. + * @return Value. + * @throws IgniteCheckedException If failed. 
+ */ + public V getTopologySafe(K key) throws IgniteCheckedException { + String taskName = ctx.kernalContext().job().currentTaskName(); + + return getAllAsync( + F.asList(key), + /*force primary*/false, + /*skip tx*/false, + /*cached entry*/null, + /*subject id*/null, + taskName, + /*deserialize cache objects*/true, + /*skip values*/false, + /*can remap*/false + ).get().get(key); + } + /** {@inheritDoc} */ @Nullable @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) throws IgniteCheckedException { return getAllOutTxAsync(keys).get(); @@ -1242,7 +1285,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V null, taskName, !ctx.keepPortable(), - false); + /*skip values*/false, + /*can remap*/true); } /** @@ -1577,7 +1621,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V @Nullable UUID subjId, String taskName, boolean deserializePortable, - boolean skipVals + boolean skipVals, + boolean canRemap ) { CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -1592,7 +1637,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V deserializePortable, forcePrimary, skipVals ? null : expiryPolicy(opCtx != null ? 
opCtx.expiry() : null), - skipVals); + skipVals, + canRemap); } /** @@ -1618,7 +1664,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final boolean deserializePortable, final boolean forcePrimary, @Nullable IgniteCacheExpiryPolicy expiry, - final boolean skipVals + final boolean skipVals, + boolean canRemap ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -1633,7 +1680,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V deserializePortable, expiry, skipVals, - false); + false, + canRemap); } /** @@ -1656,7 +1704,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiry, final boolean skipVals, - final boolean keepCacheObjects + final boolean keepCacheObjects, + boolean canRemap ) { if (F.isEmpty(keys)) return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap()); @@ -1679,7 +1728,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V assert keys != null; final AffinityTopologyVersion topVer = tx == null - ? ctx.affinity().affinityTopologyVersion() + ? (canRemap ? 
ctx.affinity().affinityTopologyVersion(): ctx.shared().exchange().readyAffinityVersion()) : tx.topologyVersion(); final Map<K1, V1> map = new GridLeanMap<>(keys.size()); @@ -4465,7 +4514,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V null, taskName, deserializePortable, - false); + false, + /*can remap*/true); } /** @@ -4693,6 +4743,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** + * */ public void execute() { tx = ctx.tm().newTx( http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index 55669a7..fd71ab5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@ -134,7 +134,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { memoryMode = cctx.config().getMemoryMode(); - plcEnabled = plc != null && memoryMode != OFFHEAP_TIERED; + plcEnabled = plc != null && (cctx.isNear() || memoryMode != OFFHEAP_TIERED); filter = cfg.getEvictionFilter(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 84e4dc2..0ef190e 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -493,7 +493,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { req.version(), null, null, - null, null); res.error(req.classError()); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 4075d79..cc661d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -531,6 +531,23 @@ public class GridCacheSharedContext<K, V> { } /** + * Gets ready future for the next affinity topology version (used in cases when a node leaves grid). + * + * @param curVer Current topology version (before a node left grid). + * @return Ready future. + */ + public IgniteInternalFuture<?> nextAffinityReadyFuture(AffinityTopologyVersion curVer) { + if (curVer == null) + return null; + + AffinityTopologyVersion nextVer = new AffinityTopologyVersion(curVer.topologyVersion() + 1); + + IgniteInternalFuture<?> fut = exchMgr.affinityReadyFuture(nextVer); + + return fut == null ? new GridFinishedFuture<>() : fut; + } + + /** * @param tx Transaction to check. * @param activeCacheIds Active cache IDs. * @param cacheCtx Cache context. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index a313e3d..bf55f59 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1674,6 +1674,29 @@ public class GridCacheUtils { } /** + * @param partsMap Cache ID to partition IDs collection map. + * @return Cache ID to partition ID array map. + */ + public static Map<Integer, int[]> convertInvalidPartitions(Map<Integer, Set<Integer>> partsMap) { + Map<Integer, int[]> res = new HashMap<>(partsMap.size()); + + for (Map.Entry<Integer, Set<Integer>> entry : partsMap.entrySet()) { + Set<Integer> parts = entry.getValue(); + + int[] partsArray = new int[parts.size()]; + + int idx = 0; + + for (Integer part : parts) + partsArray[idx++] = part; + + res.put(entry.getKey(), partsArray); + } + + return res; + } + + /** * Stops store session listeners. * * @param ctx Kernal context. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java index bd72764..59d75be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.distributed; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -68,6 +69,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { * * @param threadId Owning thread ID. * @param ver Lock version. + * @param topVer Topology version. * @param timeout Timeout to acquire lock. * @param reenter Reentry flag. * @param tx Transaction flag. 
@@ -78,6 +80,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { @Nullable public GridCacheMvccCandidate addLocal( long threadId, GridCacheVersion ver, + AffinityTopologyVersion topVer, long timeout, boolean reenter, boolean tx, @@ -105,6 +108,9 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { cand = mvcc.addLocal(this, threadId, ver, timeout, reenter, tx, implicitSingle); + if (cand != null) + cand.topologyVersion(topVer); + owner = mvcc.anyOwner(); boolean emptyAfter = mvcc.isEmpty(); @@ -732,6 +738,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { return addLocal( tx.threadId(), tx.xidVersion(), + tx.topologyVersion(), timeout, false, true, @@ -828,8 +835,10 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { // Allow next lock in the thread to proceed. if (!cand.used()) { + GridCacheContext cctx0 = cand.parent().context(); + GridDistributedCacheEntry e = - (GridDistributedCacheEntry)cctx.cache().peekEx(cand.key()); + (GridDistributedCacheEntry)cctx0.cache().peekEx(cand.key()); if (e != null) e.recheck(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index c3f3e7f..531678e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -331,6 +331,26 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** 
{@inheritDoc} */ + @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) { + lock.readLock().lock(); + + try { + GridDhtPartitionMap partMap = node2part.get(nodeId); + + if (partMap != null) { + GridDhtPartitionState state = partMap.get(part); + + return state == null ? EVICTED : state; + } + + return EVICTED; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ @Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) { lock.readLock().lock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 22a5287..49fbc5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -539,7 +539,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Nullable UUID subjId, String taskName, boolean deserializePortable, - boolean skipVals + boolean skipVals, + boolean canRemap ) { CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -552,7 +553,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap deserializePortable, forcePrimary, null, - skipVals); + skipVals, + canRemap); } /** @@ -570,7 +572,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Nullable UUID subjId, String taskName, @Nullable IgniteCacheExpiryPolicy expiry, - boolean skipVals + boolean skipVals, + boolean canRemap ) { 
return getAllAsync0(keys, readThrough, @@ -580,7 +583,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap false, expiry, skipVals, - /*keep cache objects*/true); + /*keep cache objects*/true, + canRemap); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 742fbfe..9005541 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -349,12 +349,14 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col } else { if (tx == null) { - fut = cache().getDhtAllAsync(keys.keySet(), + fut = cache().getDhtAllAsync( + keys.keySet(), readThrough, subjId, taskName, expiryPlc, - skipVals); + skipVals, + /*can remap*/true); } else { fut = tx.getAllAsync(cctx, @@ -387,12 +389,14 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col } else { if (tx == null) { - return cache().getDhtAllAsync(keys.keySet(), + return cache().getDhtAllAsync( + keys.keySet(), readThrough, subjId, taskName, expiryPlc, - skipVals); + skipVals, + /*can remap*/true); } else { return tx.getAllAsync(cctx, http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index c551fb3..7b08510 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -129,6 +129,13 @@ public interface GridDhtPartitionTopology { public GridDhtPartitionMap localPartitionMap(); /** + * @param nodeId Node ID. + * @param part Partition. + * @return Partition state. + */ + public GridDhtPartitionState partitionState(UUID nodeId, int part); + + /** * @return Current update sequence. */ public long updateSequence(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index facf329..8973dcd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -638,6 +638,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) { + lock.readLock().lock(); + + try { + GridDhtPartitionMap partMap = node2part.get(nodeId); + + if (partMap != null) { + GridDhtPartitionState 
state = partMap.get(part); + + return state == null ? EVICTED : state; + } + + return EVICTED; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ @Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) { Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 6a72c89..7da6e07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -363,8 +363,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa * @return Future that will be completed when locks are acquired. 
*/ public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync( - @Nullable Iterable<IgniteTxEntry> reads, - @Nullable Iterable<IgniteTxEntry> writes, + @Nullable Collection<IgniteTxEntry> reads, + @Nullable Collection<IgniteTxEntry> writes, Map<IgniteTxKey, GridCacheVersion> verMap, long msgId, IgniteUuid nearMiniId, http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 9bd5de2..2071f8e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -135,6 +135,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** Keys that should be locked. */ private GridConcurrentHashSet<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>(); + /** Force keys future for correct transforms. */ + private IgniteInternalFuture<?> forceKeysFut; + /** Locks ready flag. 
*/ private volatile boolean locksReady; @@ -291,14 +294,16 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters()); - if (hasFilters || retVal || txEntry.op() == GridCacheOperation.DELETE) { + if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) { + CacheObject val; + cached.unswap(retVal); boolean readThrough = (retVal || hasFilters) && cacheCtx.config().isLoadPreviousValue() && !txEntry.skipStore(); - CacheObject val = cached.innerGet( + val = cached.innerGet( tx, /*swap*/true, readThrough, @@ -312,10 +317,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter null, null); - if (retVal) { + if (retVal || txEntry.op() == TRANSFORM) { if (!F.isEmpty(txEntry.entryProcessors())) { invoke = true; + if (txEntry.hasValue()) + val = txEntry.value(); + KeyCacheObject key = txEntry.key(); Object procRes = null; @@ -339,12 +347,16 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } } - if (err != null || procRes != null) - ret.addEntryProcessResult(txEntry.context(), key, null, procRes, err); - else - ret.invokeResult(true); + txEntry.entryProcessorCalculatedValue(val); + + if (retVal) { + if (err != null || procRes != null) + ret.addEntryProcessResult(txEntry.context(), key, null, procRes, err); + else + ret.invokeResult(true); + } } - else + else if (retVal) ret.value(cacheCtx, val); } @@ -362,7 +374,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter ret.success(false); } else - ret.success(txEntry.op() != GridCacheOperation.DELETE || cached.hasValue()); + ret.success(txEntry.op() != DELETE || cached.hasValue()); } } catch (IgniteCheckedException e) { @@ -466,7 +478,25 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter */ private boolean mapIfLocked() { if (checkLocks()) { - prepare0(); + if 
(!mapped.compareAndSet(false, true)) + return false; + + if (forceKeysFut == null || (forceKeysFut.isDone() && forceKeysFut.error() == null)) + prepare0(); + else { + forceKeysFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + try { + f.get(); + + prepare0(); + } + catch (IgniteCheckedException e) { + onError(e); + } + } + }); + } return true; } @@ -574,13 +604,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter // Send reply back to originating near node. Throwable prepErr = err.get(); + assert F.isEmpty(tx.invalidPartitions()); + GridNearTxPrepareResponse res = new GridNearTxPrepareResponse( tx.nearXidVersion(), tx.colocated() ? tx.xid() : tx.nearFutureId(), nearMiniId == null ? tx.xid() : nearMiniId, tx.xidVersion(), tx.writeVersion(), - tx.invalidPartitions(), ret, prepErr, null); @@ -708,7 +739,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @param writes Write entries. * @param txNodes Transaction nodes mapping. */ - public void prepare(Iterable<IgniteTxEntry> reads, Iterable<IgniteTxEntry> writes, + @SuppressWarnings("TypeMayBeWeakened") + public void prepare(Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes, Map<UUID, Collection<UUID>> txNodes) { if (tx.empty()) { tx.setRollbackOnly(); @@ -720,6 +752,15 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter this.writes = writes; this.txNodes = txNodes; + if (!F.isEmpty(writes)) { + Map<Integer, Collection<KeyCacheObject>> forceKeys = null; + + for (IgniteTxEntry entry : writes) + forceKeys = checkNeedRebalanceKeys(entry, forceKeys); + + forceKeysFut = forceRebalanceKeys(forceKeys); + } + readyLocks(); mapIfLocked(); @@ -734,12 +775,79 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } /** + * Checks if this transaction needs previous value for the given tx entry. 
Will use passed in map to store + * required key or will create new map if passed in map is {@code null}. * + * @param e TX entry. + * @param map Map with needed preload keys. + * @return Map if it was created. */ - private void prepare0() { - if (!mapped.compareAndSet(false, true)) - return; + private Map<Integer, Collection<KeyCacheObject>> checkNeedRebalanceKeys( + IgniteTxEntry e, + Map<Integer, Collection<KeyCacheObject>> map + ) { + if (retVal || !F.isEmpty(e.entryProcessors())) { + if (map == null) + map = new HashMap<>(); + + Collection<KeyCacheObject> keys = map.get(e.cacheId()); + + if (keys == null) { + keys = new ArrayList<>(); + + map.put(e.cacheId(), keys); + } + + keys.add(e.key()); + } + + return map; + } + + /** + * @param keysMap Keys to request. + * @return Keys request future. + */ + private IgniteInternalFuture<Object> forceRebalanceKeys(Map<Integer, Collection<KeyCacheObject>> keysMap) { + if (F.isEmpty(keysMap)) + return null; + + GridCompoundFuture<Object, Object> compFut = null; + IgniteInternalFuture<Object> lastForceFut = null; + + for (Map.Entry<Integer, Collection<KeyCacheObject>> entry : keysMap.entrySet()) { + if (lastForceFut != null && compFut == null) { + compFut = new GridCompoundFuture(); + + compFut.add(lastForceFut); + } + + int cacheId = entry.getKey(); + Collection<KeyCacheObject> keys = entry.getValue(); + + lastForceFut = cctx.cacheContext(cacheId).preloader().request(keys, tx.topologyVersion()); + + if (compFut != null) + compFut.add(lastForceFut); + } + + if (compFut != null) { + compFut.markInitialized(); + + return compFut; + } + else { + assert lastForceFut != null; + + return lastForceFut; + } + } + + /** + * + */ + private void prepare0() { try { // We are holding transaction-level locks for entries here, so we can get next write version. 
onEntriesLocked(); @@ -956,7 +1064,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter private boolean map( IgniteTxEntry entry, Map<UUID, GridDistributedTxMapping> futDhtMap, - Map<UUID, GridDistributedTxMapping> futNearMap) { + Map<UUID, GridDistributedTxMapping> futNearMap + ) { if (entry.cached().isLocal()) return false; @@ -1023,14 +1132,31 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @param locMap Exclude map. * @return {@code True} if mapped. */ - private boolean map(IgniteTxEntry entry, Iterable<ClusterNode> nodes, - Map<UUID, GridDistributedTxMapping> globalMap, Map<UUID, GridDistributedTxMapping> locMap) { + private boolean map( + IgniteTxEntry entry, + Iterable<ClusterNode> nodes, + Map<UUID, GridDistributedTxMapping> globalMap, + Map<UUID, GridDistributedTxMapping> locMap + ) { boolean ret = false; if (nodes != null) { for (ClusterNode n : nodes) { GridDistributedTxMapping global = globalMap.get(n.id()); + if (!F.isEmpty(entry.entryProcessors())) { + GridDhtPartitionState state = entry.context().topology().partitionState(n.id(), + entry.cached().partition()); + + if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) { + CacheObject procVal = entry.entryProcessorCalculatedValue(); + + entry.op(procVal == null ? DELETE : UPDATE); + entry.value(procVal, true, false); + entry.entryProcessors(null); + } + } + if (global == null) globalMap.put(n.id(), global = new GridDistributedTxMapping(n)); @@ -1194,6 +1320,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } // Process invalid partitions (no need to remap). + // Keep this loop for backward compatibility. 
if (!F.isEmpty(res.invalidPartitions())) { for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) { IgniteTxEntry entry = it.next(); @@ -1206,6 +1333,25 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']'); } } + } + + // Process invalid partitions (no need to remap). + if (!F.isEmpty(res.invalidPartitionsByCacheId())) { + Map<Integer, int[]> invalidPartsMap = res.invalidPartitionsByCacheId(); + + for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) { + IgniteTxEntry entry = it.next(); + + int[] invalidParts = invalidPartsMap.get(entry.cacheId()); + + if (invalidParts != null && F.contains(invalidParts, entry.cached().partition())) { + it.remove(); + + if (log.isDebugEnabled()) + log.debug("Removed mapping for entry from dht mapping [key=" + entry.key() + + ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']'); + } + } if (dhtMapping.empty()) { dhtMap.remove(nodeId); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java index 753c117..bcf7f8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java @@ -55,6 +55,10 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { @GridDirectCollection(int.class) private Collection<Integer> invalidParts; + /** Invalid partitions by cache 
ID. */ + @GridDirectMap(keyType = Integer.class, valueType = int[].class) + private Map<Integer, int[]> invalidPartsByCacheId; + /** Preload entries. */ @GridDirectCollection(GridCacheEntryInfo.class) private List<GridCacheEntryInfo> preloadEntries; @@ -140,6 +144,20 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { } /** + * @return Map from cacheId to an array of invalid partitions. + */ + public Map<Integer, int[]> invalidPartitionsByCacheId() { + return invalidPartsByCacheId; + } + + /** + * @param invalidPartsByCacheId Map from cache ID to an array of invalid partitions. + */ + public void invalidPartitionsByCacheId(Map<Integer, Set<Integer>> invalidPartsByCacheId) { + this.invalidPartsByCacheId = CU.convertInvalidPartitions(invalidPartsByCacheId); + } + + /** * Gets preload entries found on backup node. * * @return Collection of entry infos need to be preloaded. @@ -238,18 +256,24 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { writer.incrementState(); case 10: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeMap("invalidPartsByCacheId", invalidPartsByCacheId, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR)) return false; writer.incrementState(); case 11: - if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG)) + if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); case 12: + if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 13: if (!writer.writeCollection("preloadEntries", preloadEntries, MessageCollectionItemType.MSG)) return false; @@ -288,7 +312,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { reader.incrementState(); case 10: - miniId = reader.readIgniteUuid("miniId"); + invalidPartsByCacheId = reader.readMap("invalidPartsByCacheId", 
MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false); if (!reader.isLastRead()) return false; @@ -296,7 +320,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { reader.incrementState(); case 11: - nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG); + miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) return false; @@ -304,6 +328,14 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { reader.incrementState(); case 12: + nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 13: preloadEntries = reader.readCollection("preloadEntries", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -323,6 +355,6 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 13; + return 14; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index bb3673d..6e39672 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -61,7 +61,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M DFLT_MAX_REMAP_CNT); /** Context. 
*/ - private GridCacheContext<K, V> cctx; + private final GridCacheContext<K, V> cctx; /** Keys. */ private Collection<KeyCacheObject> keys; @@ -105,6 +105,9 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M /** Skip values flag. */ private boolean skipVals; + /** Flag indicating whether future can be remapped on a newer topology version. */ + private final boolean canRemap; + /** * @param cctx Context. * @param keys Keys. @@ -130,7 +133,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M String taskName, boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean skipVals + boolean skipVals, + boolean canRemap ) { super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); @@ -147,6 +151,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M this.taskName = taskName; this.expiryPlc = expiryPlc; this.skipVals = skipVals; + this.canRemap = canRemap; futId = IgniteUuid.randomUuid(); @@ -160,7 +165,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M * Initializes future. */ public void init() { - AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer : + canRemap ? 
cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion(); map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer); @@ -334,7 +340,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M remapKeys.add(key); } - AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); + AffinityTopologyVersion updTopVer = cctx.discovery().topologyVersionEx(); assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " + "not change [topVer=" + topVer + ", updTopVer=" + updTopVer + @@ -461,7 +467,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M } } - ClusterNode node = cctx.affinity().primary(key, topVer); + ClusterNode node = affinityNode(key, topVer); if (node == null) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + @@ -522,6 +528,28 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M } /** + * Finds affinity node to send get request to. + * + * @param key Key to get. + * @param topVer Topology version. + * @return Affinity node from which the key will be requested. + */ + private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) { + if (!canRemap) { + List<ClusterNode> nodes = cctx.affinity().nodes(key, topVer); + + for (ClusterNode node : nodes) { + if (cctx.discovery().alive(node)) + return node; + } + + return null; + } + else + return cctx.affinity().primary(key, topVer); + } + + /** * @param infos Entry infos. * @return Result map. */ @@ -557,14 +585,14 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M private final IgniteUuid futId = IgniteUuid.randomUuid(); /** Node ID. */ - private ClusterNode node; + private final ClusterNode node; /** Keys. 
*/ @GridToStringInclude - private LinkedHashMap<KeyCacheObject, Boolean> keys; + private final LinkedHashMap<KeyCacheObject, Boolean> keys; /** Topology version on which this future was mapped. */ - private AffinityTopologyVersion topVer; + private final AffinityTopologyVersion topVer; /** {@code True} if remapped after node left. */ private boolean remapped; @@ -625,37 +653,45 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M if (log.isDebugEnabled()) log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this); - final AffinityTopologyVersion updTopVer = - new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); + // Try getting from existing nodes. + if (!canRemap) { + map(keys.keySet(), F.t(node, keys), topVer); - final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this, - cctx.kernalContext().config().getNetworkTimeout(), - updTopVer, - e); - - cctx.affinity().affinityReadyFuture(updTopVer).listen( - new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { - if (timeout.finish()) { - cctx.kernalContext().timeout().removeTimeoutObject(timeout); - - try { - fut.get(); - - // Remap. 
- map(keys.keySet(), F.t(node, keys), updTopVer); - - onDone(Collections.<K, V>emptyMap()); - } - catch (IgniteCheckedException e) { - GridPartitionedGetFuture.this.onDone(e); + onDone(Collections.<K, V>emptyMap()); + } + else { + final AffinityTopologyVersion updTopVer = + new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); + + final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this, + cctx.kernalContext().config().getNetworkTimeout(), + updTopVer, + e); + + cctx.affinity().affinityReadyFuture(updTopVer).listen( + new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + if (timeout.finish()) { + cctx.kernalContext().timeout().removeTimeoutObject(timeout); + + try { + fut.get(); + + // Remap. + map(keys.keySet(), F.t(node, keys), updTopVer); + + onDone(Collections.<K, V>emptyMap()); + } + catch (IgniteCheckedException e) { + GridPartitionedGetFuture.this.onDone(e); + } } } } - } - ); + ); - cctx.kernalContext().timeout().addTimeoutObject(timeout); + cctx.kernalContext().timeout().addTimeoutObject(timeout); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index bee34d9..c44b028 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -248,7 +248,8 @@ 
public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable UUID subjId, final String taskName, final boolean deserializePortable, - final boolean skipVals + final boolean skipVals, + final boolean canRemap ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -278,7 +279,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { deserializePortable, expiryPlc, skipVals, - skipStore); + skipStore, + canRemap); } }); } @@ -870,8 +872,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean deserializePortable, @Nullable ExpiryPolicy expiryPlc, boolean skipVals, - boolean skipStore) { - AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); + boolean skipStore, + boolean canRemap + ) { + AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() : + ctx.shared().exchange().readyAffinityVersion(); final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc); @@ -971,7 +976,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { taskName, deserializePortable, expiry, - skipVals); + skipVals, + canRemap); fut.init(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index e527f08..07ec808 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -231,6 +231,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> nearEnabled = CU.isNearEnabled(cctx); + if (!waitTopFut) + remapCnt = 1; + this.remapCnt = new AtomicInteger(remapCnt); }
