timoninmaxim commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r785913259
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
##########
@@ -105,40 +189,146 @@ public GridNearReadRepairFuture(
else if (compareRes < 0)
fixedMap.put(key, newestRes);
else if (compareRes == 0) {
- CacheObjectAdapter candidateVal =
candidateRes.value();
- CacheObjectAdapter newestVal = newestRes.value();
-
- try {
- byte[] candidateBytes =
candidateVal.valueBytes(ctx.cacheObjectContext());
- byte[] newestBytes =
newestVal.valueBytes(ctx.cacheObjectContext());
+ CacheObject candidateVal = candidateRes.value();
+ CacheObject newestVal = newestRes.value();
- if (!Arrays.equals(candidateBytes,
newestBytes))
- fixedMap.put(key, newestRes); // Same
version, fixing values inconsistency.
- }
- catch (IgniteCheckedException e) {
- onDone(e);
+ byte[] candidateBytes =
candidateVal.valueBytes(ctx.cacheObjectContext());
+ byte[] newestBytes =
newestVal.valueBytes(ctx.cacheObjectContext());
- return;
- }
+ if (!Arrays.equals(candidateBytes, newestBytes))
+ irreparableSet.add(key);
}
}
}
else if (newestRes != null)
- fixedMap.put(key, newestRes); // Existing data wins.
+ irreparableSet.add(key); // Impossible to detect latest
between existing and null.
}
}
assert !fixedMap.containsValue(null) : "null should never be
considered as a fix";
- if (!fixedMap.isEmpty()) {
- tx.finishFuture().listen(future -> {
- TransactionState state = tx.state();
+ if (!irreparableSet.isEmpty())
+ throw new IgniteConsistencyViolationException(irreparableSet);
+
+ return fixedMap;
+ }
+
+ /**
+ *
+ */
+ public Map<KeyCacheObject, EntryGetResult>
fixWithPrimary(Collection<KeyCacheObject> inconsistentKeys) {
+ Map<KeyCacheObject, EntryGetResult> fixedMap = new
HashMap<>(inconsistentKeys.size());
+
+ for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut :
futs.values()) {
+ for (KeyCacheObject key : fut.keys()) {
+ if (!inconsistentKeys.contains(key) ||
+ !primaries.get(key).equals(fut.affNode()))
+ continue;
+
+ fixedMap.put(key, fut.result().get(key));
+ }
+ }
+
+ return fixedMap;
+ }
+
+ /**
+ *
+ */
+ public Map<KeyCacheObject, EntryGetResult>
fixWithRemove(Collection<KeyCacheObject> inconsistentKeys) {
+ Map<KeyCacheObject, EntryGetResult> fixedMap = new
HashMap<>(inconsistentKeys.size());
+
+ for (KeyCacheObject key : inconsistentKeys)
+ fixedMap.put(key, null);
+
+ return fixedMap;
+ }
+
+ /**
+ *
+ */
+ public Map<KeyCacheObject, EntryGetResult>
fixWithMajority(Collection<KeyCacheObject> inconsistentKeys)
+ throws IgniteCheckedException {
+ /** */
+ class ByteArrayWrapper {
+ final byte[] arr;
+
+ /** */
+ public ByteArrayWrapper(byte[] arr) {
+ this.arr = arr;
+ }
+
+ /** */
+ @Override public boolean equals(Object o) {
+ return Arrays.equals(arr, ((ByteArrayWrapper)o).arr);
+ }
- if (state == TransactionState.COMMITTED) // Explicit tx may
fix the values but become rolled back later.
- recordConsistencyViolation(fixedMap.keySet(), fixedMap);
- });
+ /** */
+ @Override public int hashCode() {
+ return Arrays.hashCode(arr);
+ }
}
- onDone(fixedMap);
+ Set<KeyCacheObject> irreparableSet = new
HashSet<>(inconsistentKeys.size());
+ Map<KeyCacheObject, EntryGetResult> fixedMap = new
HashMap<>(inconsistentKeys.size());
+
+ for (KeyCacheObject inconsistentKey : inconsistentKeys) {
+ Map<T2<ByteArrayWrapper, GridCacheVersion>, T2<EntryGetResult,
Integer>> cntMap = new HashMap<>();
+
+ for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut
: futs.values()) {
+ if (!fut.keys().contains(inconsistentKey))
+ continue;
+
+ EntryGetResult res = fut.result().get(inconsistentKey);
+
+ ByteArrayWrapper wrapped;
+ GridCacheVersion ver;
+
+ if (res != null) {
+ CacheObject val = res.value();
+
+ wrapped = new
ByteArrayWrapper(val.valueBytes(ctx.cacheObjectContext()));
+ ver = res.version();
+ }
+ else {
+ wrapped = new ByteArrayWrapper(null);
+ ver = null;
+ }
+
+ T2<ByteArrayWrapper, GridCacheVersion> keyVer = new
T2<>(wrapped, ver);
+
+ cntMap.putIfAbsent(keyVer, new T2<>(res, 0));
+
+ cntMap.compute(keyVer, (kv, ri) -> new T2<>(ri.getKey(),
ri.getValue() + 1));
+ }
+
+ int[] sorted = cntMap.values().stream()
+ .map(IgniteBiTuple::getValue)
+ .sorted(Comparator.reverseOrder())
+ .mapToInt(v -> v)
+ .toArray();
+
+ int max = sorted[0];
Review comment:
If none of the nodes sent a response for the inconsistent key, `sorted` will be
empty and `sorted[0]` will fail here with an ArrayIndexOutOfBoundsException. Is that possible?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]