Repository: ignite Updated Branches: refs/heads/ignite-gg-10837 c286fabb4 -> 6652d871f
ignite-gg-10837 review Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6652d871 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6652d871 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6652d871 Branch: refs/heads/ignite-gg-10837 Commit: 6652d871f41ce9d49cf384f8223035faf19bd264 Parents: c286fab Author: sboikov <[email protected]> Authored: Wed Jan 20 13:03:44 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Jan 20 13:03:44 2016 +0300 ---------------------------------------------------------------------- .../transactions/IgniteTxLocalAdapter.java | 53 +++++++++++--------- 1 file changed, 30 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6652d871/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 32a9e27..4b7b992 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -2056,14 +2056,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig @Nullable ExpiryPolicy expiryPlc, @Nullable EntryProcessor<K, V, Object> entryProcessor, @Nullable Object[] invokeArgs, - @Nullable GridCacheVersion drVer, final boolean retval, boolean lockOnly, final CacheEntryPredicate[] filter, final GridCacheReturn ret, boolean skipStore, final boolean singleRmv, - boolean keepBinary) { + boolean keepBinary, + Byte dataCenterId) { try { addActiveCache(cacheCtx); @@ -2074,6 +2074,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig if (entryProcessor != null) transform = true; + GridCacheVersion drVer = dataCenterId != null ? cctx.versions().next(dataCenterId) : null; long drTtl = singleRmv ? -1L : CU.TTL_ETERNAL; long drExpireTime = singleRmv ? -1L : CU.EXPIRE_TIME_ETERNAL; @@ -2136,6 +2137,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig * @param drRmvMap DR remove map (optional). * @param skipStore Skip store flag. * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}. + * @param keepBinary Keep binary flag. + * @param dataCenterId Optional data center ID. * @return Future for missing values loading. */ private <K, V> IgniteInternalFuture<Void> enlistWrite( @@ -2154,7 +2157,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig @Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap, boolean skipStore, final boolean singleRmv, - final boolean keepBinary + final boolean keepBinary, + Byte dataCenterId ) { assert retval || invokeMap == null; @@ -2208,6 +2212,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig drTtl = -1L; drExpireTime = -1L; } + else if (dataCenterId != null) { + drVer = cctx.versions().next(dataCenterId); + drTtl = -1L; + drExpireTime = -1L; + } else { drVer = null; drTtl = -1L; @@ -2949,10 +2958,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); - GridCacheVersion drVer = null; - - if (opCtx != null && opCtx.hasDataCenterId()) - drVer = cacheCtx.versions().next(opCtx.dataCenterId()); + final Byte dataCenterId = opCtx != null ? opCtx.dataCenterId() : null; KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key); @@ -2965,14 +2971,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig opCtx != null ? opCtx.expiry() : null, entryProcessor, invokeArgs, - drVer, retval, /*lockOnly*/false, filter, ret, opCtx != null && opCtx.skipStore(), /*singleRmv*/false, - keepBinary); + keepBinary, + dataCenterId); if (pessimistic()) { assert loadFut == null || loadFut.isDone() : loadFut; @@ -3085,17 +3091,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig final CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); + final Byte dataCenterId; + if (opCtx != null && opCtx.hasDataCenterId()) { assert drMap == null : drMap; assert map != null || invokeMap != null; - drMap = (Map<KeyCacheObject, GridCacheDrInfo>)F.viewReadOnly((Map)(map != null ? map : invokeMap), - new IgniteClosure<V, GridCacheDrInfo>() { - @Override public GridCacheDrInfo apply(V val) { - return new GridCacheDrInfo(cctx.versions().next(opCtx.dataCenterId())); - } - }); + dataCenterId = opCtx.dataCenterId(); } + else + dataCenterId = null; // Cached entry may be passed only from entry wrapper. final Map<?, ?> map0 = map; @@ -3143,7 +3148,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig null, opCtx != null && opCtx.skipStore(), false, - keepBinary); + keepBinary, + dataCenterId); if (pessimistic()) { assert loadFut == null || loadFut.isDone() : loadFut; @@ -3349,15 +3355,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); - // DrMap used as map. Keys order doesn't matter. - if (keys != null && drMap == null && opCtx != null && opCtx.hasDataCenterId()) { - Map<K, GridCacheVersion> confMap = new HashMap<>(keys.size()); + final Byte dataCenterId; - for (K key : keys) - confMap.put(key, cacheCtx.versions().next(opCtx.dataCenterId())); + if (opCtx != null && opCtx.hasDataCenterId()) { + assert drMap == null : drMap; - drMap = (Map)confMap; + dataCenterId = opCtx.dataCenterId(); } + else + dataCenterId = null; assert keys0 != null; @@ -3417,7 +3423,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig drMap, opCtx != null && opCtx.skipStore(), singleRmv, - keepBinary + keepBinary, + dataCenterId ); if (log.isDebugEnabled())
