IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9dec3b7b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9dec3b7b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9dec3b7b Branch: refs/heads/ignite-104 Commit: 9dec3b7b1738a23d69b9b28ff24ea781693b323f Parents: 67572e5 Author: Valentin Kulichenko <[email protected]> Authored: Tue Jul 21 19:57:26 2015 -0700 Committer: Valentin Kulichenko <[email protected]> Committed: Tue Jul 21 19:57:26 2015 -0700 ---------------------------------------------------------------------- .../dht/atomic/GridAtomicMappingKey.java | 86 +++++++++++++++ .../dht/atomic/GridDhtAtomicUpdateFuture.java | 84 ++++----------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 106 +++++-------------- 3 files changed, 133 insertions(+), 143 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9dec3b7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java new file mode 100644 index 0000000..52e3c7f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * Mapping Key. + */ +class GridAtomicMappingKey { + /** Node ID. */ + private final UUID nodeId; + + /** Partition. */ + private final int part; + + /** + * @param nodeId Node ID. + * @param part Partition. + */ + GridAtomicMappingKey(UUID nodeId, int part) { + assert nodeId != null; + assert part >= -1 : part; + + this.nodeId = nodeId; + this.part = part; + } + + /** + * @return Node ID. + */ + UUID nodeId() { + return nodeId; + } + + /** + * @return Partition. + */ + int partition() { + return part; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + GridAtomicMappingKey key = (GridAtomicMappingKey)o; + + return nodeId.equals(key.nodeId) && part == key.part; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = nodeId.hashCode(); + + res = 31 * res + part; + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridAtomicMappingKey.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9dec3b7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 7100d3d..23b2161 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -73,7 +73,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** Mappings. */ @GridToStringInclude - private ConcurrentMap<MappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>(); + private ConcurrentMap<GridAtomicMappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>(); /** Entries with readers. */ private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries; @@ -135,9 +135,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** {@inheritDoc} */ @Override public Collection<? extends ClusterNode> nodes() { - return F.view(F.viewReadOnly(mappings.keySet(), new C1<MappingKey, ClusterNode>() { - @Override public ClusterNode apply(MappingKey mappingKey) { - return cctx.kernalContext().discovery().node(mappingKey.nodeId); + return F.view(F.viewReadOnly(mappings.keySet(), new C1<GridAtomicMappingKey, ClusterNode>() { + @Override public ClusterNode apply(GridAtomicMappingKey mappingKey) { + return cctx.kernalContext().discovery().node(mappingKey.nodeId()); } }), F.notNull()); } @@ -147,15 +147,15 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> if (log.isDebugEnabled()) log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']'); - Collection<MappingKey> mappingKeys = new ArrayList<>(mappings.size()); + Collection<GridAtomicMappingKey> mappingKeys = new ArrayList<>(mappings.size()); - for (MappingKey mappingKey : mappings.keySet()) { - if (mappingKey.nodeId.equals(nodeId)) + for (GridAtomicMappingKey mappingKey : mappings.keySet()) { + if (mappingKey.nodeId().equals(nodeId)) mappingKeys.add(mappingKey); } if (!mappingKeys.isEmpty()) { - for (MappingKey mappingKey : mappingKeys) + for (GridAtomicMappingKey mappingKey : mappingKeys) mappings.remove(mappingKey); checkComplete(); @@ -227,7 +227,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> for (ClusterNode node : dhtNodes) { UUID nodeId = node.id(); - MappingKey mappingKey = new MappingKey(nodeId, part); + GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part); if (!nodeId.equals(cctx.localNodeId())) { GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey); @@ -282,7 +282,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> int part = cctx.config().isAtomicOrderedUpdates() ? entry.partition() : -1; for (UUID nodeId : readers) { - MappingKey mappingKey = new MappingKey(nodeId, part); + GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part); GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey); @@ -341,22 +341,22 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> */ public void map() { if (!mappings.isEmpty()) { - for (Map.Entry<MappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) { - MappingKey mappingKey = e.getKey(); + for (Map.Entry<GridAtomicMappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) { + GridAtomicMappingKey mappingKey = e.getKey(); GridDhtAtomicUpdateRequest req = e.getValue(); try { if (log.isDebugEnabled()) log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - if (mappingKey.part >= 0) { - Object topic = CU.partitionMessageTopic(cctx, mappingKey.part, false); + if (mappingKey.partition() >= 0) { + Object topic = CU.partitionMessageTopic(cctx, mappingKey.partition(), false); - cctx.io().sendOrderedMessage(mappingKey.nodeId, topic, req, cctx.ioPolicy(), + cctx.io().sendOrderedMessage(mappingKey.nodeId(), topic, req, cctx.ioPolicy(), 2 * cctx.gridConfig().getNetworkTimeout()); } else { - assert mappingKey.part == -1; + assert mappingKey.partition() == -1; cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); } @@ -411,7 +411,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> } } - mappings.remove(new MappingKey(nodeId, updateRes.partition())); + mappings.remove(new GridAtomicMappingKey(nodeId, updateRes.partition())); checkComplete(); } @@ -427,7 +427,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']'); for (Integer part : res.partitions()) - mappings.remove(new MappingKey(nodeId, part)); + mappings.remove(new GridAtomicMappingKey(nodeId, part)); checkComplete(); } @@ -450,52 +450,4 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> return S.toString(GridDhtAtomicUpdateFuture.class, this); } - /** - */ - private static class MappingKey { - /** Node ID. */ - private final UUID nodeId; - - /** Partition. */ - private final int part; - - /** - * @param nodeId Node ID. - * @param part Partition. - */ - private MappingKey(UUID nodeId, int part) { - assert nodeId != null; - assert part >= -1 : part; - - this.nodeId = nodeId; - this.part = part; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - MappingKey key = (MappingKey)o; - - return nodeId.equals(key.nodeId) && part == key.part; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = nodeId.hashCode(); - - res = 31 * res + part; - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(MappingKey.class, this); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9dec3b7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index ef3a18b..9b2a5e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -90,7 +90,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** Mappings. */ @GridToStringInclude - private ConcurrentMap<MappingKey, GridNearAtomicUpdateRequest> mappings; + private ConcurrentMap<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings; /** Error. */ private volatile CachePartialUpdateCheckedException err; @@ -246,9 +246,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** {@inheritDoc} */ @Override public Collection<? extends ClusterNode> nodes() { - return F.view(F.viewReadOnly(mappings.keySet(), new C1<MappingKey, ClusterNode>() { - @Override public ClusterNode apply(MappingKey mappingKey) { - return cctx.kernalContext().discovery().node(mappingKey.nodeId); + return F.view(F.viewReadOnly(mappings.keySet(), new C1<GridAtomicMappingKey, ClusterNode>() { + @Override public ClusterNode apply(GridAtomicMappingKey mappingKey) { + return cctx.kernalContext().discovery().node(mappingKey.nodeId()); } }), F.notNull()); } @@ -287,11 +287,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return false; } - Collection<MappingKey> mappingKeys = new ArrayList<>(mappings.size()); + Collection<GridAtomicMappingKey> mappingKeys = new ArrayList<>(mappings.size()); Collection<KeyCacheObject> failedKeys = new ArrayList<>(); - for (Map.Entry<MappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) { - if (e.getKey().nodeId.equals(nodeId)) { + for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) { + if (e.getKey().nodeId().equals(nodeId)) { mappingKeys.add(e.getKey()); failedKeys.addAll(e.getValue().keys()); @@ -303,7 +303,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> addFailedKeys(failedKeys, new ClusterTopologyCheckedException("Primary node left grid before " + "response is received: " + nodeId)); - for (MappingKey key : mappingKeys) + for (GridAtomicMappingKey key : mappingKeys) mappings.remove(key); checkComplete(); @@ -544,7 +544,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } } else { - MappingKey mappingKey = new MappingKey(nodeId, res.partition()); + GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, res.partition()); GridNearAtomicUpdateRequest req = mappings.get(mappingKey); @@ -827,7 +827,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } // Optimize mapping for single key. - mapSingle(new MappingKey(primary.id(), part), req); + mapSingle(new GridAtomicMappingKey(primary.id(), part), req); return; } @@ -847,15 +847,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (conflictRmvVals != null) conflictRmvValsIt = conflictRmvVals.iterator(); - Map<MappingKey, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f); + Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f); // Must do this in synchronized block because we need to atomically remove and add mapping. // Otherwise checkComplete() may see empty intermediate state. synchronized (this) { if (oldNodeId != null) { // TODO: IGNITE-104 - Try to avoid iteration. - for (Map.Entry<MappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) { - if (e.getKey().nodeId.equals(oldNodeId)) + for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) { + if (e.getKey().nodeId().equals(oldNodeId)) mappings.remove(e.getKey()); } } @@ -952,7 +952,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> UUID nodeId = affNode.id(); - MappingKey mappingKey = new MappingKey(nodeId, part); + GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part); GridNearAtomicUpdateRequest mapped = pendingMappings.get(mappingKey); @@ -997,7 +997,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } if ((single == null || single) && pendingMappings.size() == 1) { - Map.Entry<MappingKey, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet()); + Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet()); single = true; @@ -1043,12 +1043,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @param mappingKey Mapping key. * @param req Request. */ - private void mapSingle(MappingKey mappingKey, GridNearAtomicUpdateRequest req) { - singleNodeId = mappingKey.nodeId; + private void mapSingle(GridAtomicMappingKey mappingKey, GridNearAtomicUpdateRequest req) { + singleNodeId = mappingKey.nodeId(); singleReq = req; - if (cctx.localNodeId().equals(mappingKey.nodeId)) { - cache.updateAllAsyncInternal(mappingKey.nodeId, req, + if (cctx.localNodeId().equals(mappingKey.nodeId())) { + cache.updateAllAsyncInternal(mappingKey.nodeId(), req, new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { @@ -1079,14 +1079,14 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * * @param mappings Mappings to send. */ - private void doUpdate(Map<MappingKey, GridNearAtomicUpdateRequest> mappings) { + private void doUpdate(Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings) { UUID locNodeId = cctx.localNodeId(); Collection<GridNearAtomicUpdateRequest> locUpdates = null; // Send messages to remote nodes first, then run local update. - for (Map.Entry<MappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) { - MappingKey mappingKey = e.getKey(); + for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) { + GridAtomicMappingKey mappingKey = e.getKey(); GridNearAtomicUpdateRequest req = e.getValue(); if (locNodeId.equals(req.nodeId())) { @@ -1141,15 +1141,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @param req Update request. * @throws IgniteCheckedException In case of error. */ - private void sendRequest(MappingKey mappingKey, GridNearAtomicUpdateRequest req) throws IgniteCheckedException { - if (mappingKey.part >= 0) { - Object topic = CU.partitionMessageTopic(cctx, mappingKey.part, true); + private void sendRequest(GridAtomicMappingKey mappingKey, GridNearAtomicUpdateRequest req) + throws IgniteCheckedException { + if (mappingKey.partition() >= 0) { + Object topic = CU.partitionMessageTopic(cctx, mappingKey.partition(), true); - cctx.io().sendOrderedMessage(mappingKey.nodeId, topic, req, cctx.ioPolicy(), + cctx.io().sendOrderedMessage(mappingKey.nodeId(), topic, req, cctx.ioPolicy(), 2 * cctx.gridConfig().getNetworkTimeout()); } else { - assert mappingKey.part == -1; + assert mappingKey.partition() == -1; cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); } @@ -1160,7 +1161,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * * @param mappingKey Mapping key. */ - private void removeMapping(MappingKey mappingKey) { + private void removeMapping(GridAtomicMappingKey mappingKey) { mappings.remove(mappingKey); } @@ -1205,53 +1206,4 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> public String toString() { return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString()); } - - /** - */ - private static class MappingKey { - /** Node ID. */ - private final UUID nodeId; - - /** Partition. */ - private final int part; - - /** - * @param nodeId Node ID. - * @param part Partition. - */ - private MappingKey(UUID nodeId, int part) { - assert nodeId != null; - assert part >= -1 : part; - - this.nodeId = nodeId; - this.part = part; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - MappingKey key = (MappingKey)o; - - return nodeId.equals(key.nodeId) && part == key.part; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = nodeId.hashCode(); - - res = 31 * res + part; - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(MappingKey.class, this); - } - } }
