tmp
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6187b1f8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6187b1f8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6187b1f8 Branch: refs/heads/ignite-4680-sb Commit: 6187b1f88226a66f7336731799de13ab98c0ab20 Parents: 956549e Author: sboikov <sboi...@gridgain.com> Authored: Tue Mar 21 12:16:22 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Mar 21 12:16:22 2017 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 58 +++++++------- .../GridNearAtomicAbstractUpdateRequest.java | 37 +++++---- .../dht/atomic/NearAtomicResponseHelper.java | 82 -------------------- 3 files changed, 49 insertions(+), 128 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6187b1f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index feed87f..542071a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1791,10 +1791,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { top.readLock(); try { - if (top.stopping()) { - + if (top.stopping()) return; - } // Do not check topology version if topology was locked on near node by // external transaction or explicit lock. @@ -1811,11 +1809,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (TEST_STRIPE_SUBMIT) { for (final Map.Entry<Integer, int[]> e : stripemap.entrySet()) { - if (stripeIdx == e.getKey()) - continue; - else { + if (stripeIdx != e.getKey()) { ctx.kernalContext().getStripedExecutorService().execute(e.getKey(), new Runnable() { @Override public void run() { + // No-op. } }); } @@ -1824,7 +1821,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { update(affAssignment, ver, fut, node, req, null, completionCb); } else { - req.responseHelper(new NearAtomicResponseHelper(stripemap.size())); + req.setResCount(stripemap.size()); for (final Map.Entry<Integer, int[]> e : stripemap.entrySet()) { if (stripeIdx == e.getKey()) @@ -1924,23 +1921,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridNearAtomicAbstractUpdateRequest req, int[] stripeIdxs, UpdateReplyClosure completionCb) throws GridCacheEntryRemovedException { - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), - node.id(), - req.futureId(), - req.partition(), - false, - ctx.deploymentEnabled()); - List<GridDhtCacheEntry> locked = lockEntries(req, req.topologyVersion(), stripeIdxs); boolean hasNear = ctx.discovery().cacheNearNode(node, name()); - // Assign next version for update inside entries lock. - //if (ver == null) - - if (hasNear) - res.nearVersion(ver); - if (msgLog.isDebugEnabled()) { msgLog.debug("Assigned update version [futId=" + req.futureId() + ", writeVer=" + ver + ']'); @@ -1959,7 +1943,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { node, hasNear, req, - res, + null, locked, ver, null, @@ -1974,11 +1958,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (retVal == null) retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true); - res.returnValue(retVal); - unlockEntries(locked, null); if (TEST_STRIPE_SUBMIT){ + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), + node.id(), + req.futureId(), + req.partition(), + false, + ctx.deploymentEnabled()); + + if (hasNear) + res.nearVersion(ver); + + res.returnValue(retVal); + for (int i = 0; i < req.size(); i++) { fut.addWriteEntry(affinityAssignment, req.key(i), @@ -1997,9 +1991,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { completionCb.apply(req, res); } else { - GridNearAtomicUpdateResponse res0 = req.responseHelper().addResponse(res); + if (req.addRes()) { + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), + node.id(), + req.futureId(), + req.partition(), + false, + ctx.deploymentEnabled()); + + if (hasNear) + res.nearVersion(ver); + + res.returnValue(retVal); - if (res0 != null) { for (int i = 0; i < req.size(); i++) { fut.addWriteEntry(affinityAssignment, req.key(i), @@ -2470,7 +2474,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param nearNode Originating node. * @param hasNear {@code True} if originating node has near cache. * @param req Update request. - * @param res Update response. * @param locked Locked entries. * @param ver Assigned update version. * @param dhtFut Optional DHT future. @@ -2672,7 +2675,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } catch (IgniteCheckedException e) { - res.addFailedKey(k, e); + if (res != null) + res.addFailedKey(k, e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6187b1f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java index c8e904d..ffae596 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteLogger; @@ -90,10 +91,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa @GridToStringExclude protected byte flags; - /** Response helper. */ - @GridDirectTransient - private NearAtomicResponseHelper responseHelper; - /** * */ @@ -424,21 +421,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa } /** - * @return Response helper. - */ - public NearAtomicResponseHelper responseHelper() { - return responseHelper; - } - - /** - * @param responseHelper Response helper. - */ - public void responseHelper( - NearAtomicResponseHelper responseHelper) { - this.responseHelper = responseHelper; - } - - /** * @param idx Key index. * @return Key. */ @@ -598,6 +580,23 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa return reader.afterMessageRead(GridNearAtomicAbstractUpdateRequest.class); } + private static final AtomicIntegerFieldUpdater<GridNearAtomicAbstractUpdateRequest> UPD = + AtomicIntegerFieldUpdater.newUpdater(GridNearAtomicAbstractUpdateRequest.class, "cnt"); + + /** */ + @GridDirectTransient + private volatile int cnt; + + void setResCount(int cnt) { + this.cnt = cnt; + } + + boolean addRes() { + int c = UPD.decrementAndGet(this); + + return c == 0; + } + /** {@inheritDoc} */ @Override public String toString() { StringBuilder flags = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/ignite/blob/6187b1f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java deleted file mode 100644 index 55c450c..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; - -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - -/** - * - */ -public class NearAtomicResponseHelper { - - /** */ - private GridNearAtomicUpdateResponse res; - - private static final AtomicIntegerFieldUpdater<NearAtomicResponseHelper> UPD = - AtomicIntegerFieldUpdater.newUpdater(NearAtomicResponseHelper.class, "cnt"); - - /** */ - private volatile int cnt; - - /** - */ - public NearAtomicResponseHelper(int cnt) { - this.cnt = cnt; - } - - /** - * @param res Response. - * @return {@code true} if all responses added. - */ - public GridNearAtomicUpdateResponse addResponse(GridNearAtomicUpdateResponse res) { - this.res = res; - - int c = UPD.decrementAndGet(this); - - //mergeResponse(res); - - if (c == 0) - return this.res; - - return null; - } - - /** - * @param res Response. - */ - private void mergeResponse(GridNearAtomicUpdateResponse res) { - if (this.res == null) - this.res = res; - else { - if (res.nearValuesIndexes() != null) - for (int i = 0; i < res.nearValuesIndexes().size(); i++) - this.res.addNearValue( - res.nearValuesIndexes().get(i), - res.nearValue(i), - res.nearTtl(i), - res.nearExpireTime(i) - ); - - if (res.failedKeys() != null) - this.res.addFailedKeys(res.failedKeys(), null); - - if (res.skippedIndexes() != null) - this.res.skippedIndexes().addAll(res.skippedIndexes()); - } - } -}