anton-vinogradov commented on a change in pull request #9807:
URL: https://github.com/apache/ignite/pull/9807#discussion_r811709168
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
##########
@@ -145,101 +150,94 @@ protected GridNearReadRepairAbstractFuture(
canRemap = topVer == null;
this.topVer = canRemap ? ctx.affinity().affinityTopologyVersion() :
topVer;
- }
- /**
- *
- */
- protected final void init() {
- map();
+ Map<ClusterNode, Collection<KeyCacheObject>> mappings = new
HashMap<>();
+
+ for (KeyCacheObject key : keys) {
+ List<ClusterNode> nodes = ctx.affinity().nodesByKey(key,
this.topVer);
+
+ primaries.put(key, nodes.get(0));
+
+ for (ClusterNode node : nodes)
+ mappings.computeIfAbsent(node, k -> new HashSet<>()).add(key);
+ }
+
+ if (mappings.isEmpty())
+ onDone(new ClusterTopologyServerNotFoundException("Failed to map
keys for cache " +
+ "(all partition nodes left the grid) [topVer=" + this.topVer +
", cache=" + ctx.name() + ']'));
+
+ for (Map.Entry<ClusterNode, Collection<KeyCacheObject>> mapping :
mappings.entrySet()) {
+ ClusterNode node = mapping.getKey();
+
+ GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut =
+ new GridPartitionedGetFuture<>(
+ ctx,
+ mapping.getValue(), // Keys.
+ readThrough,
+ false, // Local get required.
+ taskName,
+ deserializeBinary,
+ recovery,
+ expiryPlc,
+ false,
+ true,
+ true,
+ tx != null ? tx.label() : null,
+ tx != null ? tx.mvccSnapshot() : null,
+ node);
+
+ futs.put(mapping.getKey(), fut);
+
+ fut.listen(this::onResult);
+ }
}
/**
*
*/
- private synchronized void map() {
- assert futs.isEmpty() : "Remapping started without the clean-up.";
-
- Map<KeyCacheObject, ClusterNode> primaryNodes = new HashMap<>();
+ protected final void init() {
+ assert !futs.isEmpty();
IgniteInternalTx prevTx = ctx.tm().tx(tx); // Within the original tx.
try {
- Map<ClusterNode, Collection<KeyCacheObject>> mappings = new
HashMap<>();
-
- for (KeyCacheObject key : keys) {
- List<ClusterNode> nodes = ctx.affinity().nodesByKey(key,
topVer);
-
- primaryNodes.put(key, nodes.get(0));
-
- for (ClusterNode node : nodes)
- mappings.computeIfAbsent(node, k -> new
HashSet<>()).add(key);
- }
-
- primaries = primaryNodes;
-
- for (Map.Entry<ClusterNode, Collection<KeyCacheObject>> mapping :
mappings.entrySet()) {
- ClusterNode node = mapping.getKey();
-
- GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut =
- new GridPartitionedGetFuture<>(
- ctx,
- mapping.getValue(), // Keys.
- readThrough,
- false, // Local get required.
- taskName,
- deserializeBinary,
- recovery,
- expiryPlc,
- false,
- true,
- true,
- tx != null ? tx.label() : null,
- tx != null ? tx.mvccSnapshot() : null,
- node);
-
- fut.listen(this::onResult);
-
- futs.put(mapping.getKey(), fut);
- }
-
for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut
: futs.values())
fut.init(topVer);
-
- if (futs.isEmpty())
- onDone(new ClusterTopologyServerNotFoundException("Failed to
map keys for cache " +
- "(all partition nodes left the grid) [topVer=" + topVer +
", cache=" + ctx.name() + ']'));
}
finally {
ctx.tm().tx(prevTx);
}
}
/**
- * @param topVer Topology version.
+ * @param fut Future to be notified.
*/
- protected final void remap(AffinityTopologyVersion topVer) {
- futs.clear();
+ protected final void initOnRemap(GridNearReadRepairAbstractFuture fut) {
+ remapCnt = fut.remapCnt + 1;
+
+ listen(f -> {
+ assert !fut.isDone();
- this.topVer = topVer;
+ fut.onDone(f.result(), f.error());
+ });
- map();
+ init();
}
+ /**
+ * @param topVer Topology version.
+ */
+ protected abstract void remap(AffinityTopologyVersion topVer);
+
/**
* Collects results of each 'get' future and prepares an overall result of
the operation.
*
* @param finished Future represents a result of GET operation.
*/
- protected final synchronized void
onResult(IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> finished) {
- if (isDone() // All subfutures (including currently processing) were
successfully finished at previous future processing.
- || (topVer == null) // Remapping, ignoring any updates until
remapped.
- || !futs.containsValue((GridPartitionedGetFuture<KeyCacheObject,
EntryGetResult>)finished)) // Remapped.
- return;
-
+ protected final void onResult(IgniteInternalFuture<Map<KeyCacheObject,
EntryGetResult>> finished) {
Review comment:
Seems, currently possible to remap the future on each failure.
Future should be remapped only once.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]