timoninmaxim commented on a change in pull request #9661:
URL: https://github.com/apache/ignite/pull/9661#discussion_r785913259



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
##########
@@ -105,40 +189,146 @@ public GridNearReadRepairFuture(
                         else if (compareRes < 0)
                             fixedMap.put(key, newestRes);
                         else if (compareRes == 0) {
-                            CacheObjectAdapter candidateVal = 
candidateRes.value();
-                            CacheObjectAdapter newestVal = newestRes.value();
-
-                            try {
-                                byte[] candidateBytes = 
candidateVal.valueBytes(ctx.cacheObjectContext());
-                                byte[] newestBytes = 
newestVal.valueBytes(ctx.cacheObjectContext());
+                            CacheObject candidateVal = candidateRes.value();
+                            CacheObject newestVal = newestRes.value();
 
-                                if (!Arrays.equals(candidateBytes, 
newestBytes))
-                                    fixedMap.put(key, newestRes); // Same 
version, fixing values inconsistency.
-                            }
-                            catch (IgniteCheckedException e) {
-                                onDone(e);
+                            byte[] candidateBytes = 
candidateVal.valueBytes(ctx.cacheObjectContext());
+                            byte[] newestBytes = 
newestVal.valueBytes(ctx.cacheObjectContext());
 
-                                return;
-                            }
+                            if (!Arrays.equals(candidateBytes, newestBytes))
+                                irreparableSet.add(key);
                         }
                     }
                 }
                 else if (newestRes != null)
-                    fixedMap.put(key, newestRes); // Existing data wins.
+                    irreparableSet.add(key); // Impossible to detect latest 
between existing and null.
             }
         }
 
         assert !fixedMap.containsValue(null) : "null should never be 
considered as a fix";
 
-        if (!fixedMap.isEmpty()) {
-            tx.finishFuture().listen(future -> {
-                TransactionState state = tx.state();
+        if (!irreparableSet.isEmpty())
+            throw new IgniteConsistencyViolationException(irreparableSet);
+
+        return fixedMap;
+    }
+
+    /**
+     *
+     */
+    public Map<KeyCacheObject, EntryGetResult> 
fixWithPrimary(Collection<KeyCacheObject> inconsistentKeys) {
+        Map<KeyCacheObject, EntryGetResult> fixedMap = new 
HashMap<>(inconsistentKeys.size());
+
+        for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut : 
futs.values()) {
+            for (KeyCacheObject key : fut.keys()) {
+                if (!inconsistentKeys.contains(key) ||
+                    !primaries.get(key).equals(fut.affNode()))
+                    continue;
+
+                fixedMap.put(key, fut.result().get(key));
+            }
+        }
+
+        return fixedMap;
+    }
+
+    /**
+     *
+     */
+    public Map<KeyCacheObject, EntryGetResult> 
fixWithRemove(Collection<KeyCacheObject> inconsistentKeys) {
+        Map<KeyCacheObject, EntryGetResult> fixedMap = new 
HashMap<>(inconsistentKeys.size());
+
+        for (KeyCacheObject key : inconsistentKeys)
+            fixedMap.put(key, null);
+
+        return fixedMap;
+    }
+
+    /**
+     *
+     */
+    public Map<KeyCacheObject, EntryGetResult> 
fixWithMajority(Collection<KeyCacheObject> inconsistentKeys)
+        throws IgniteCheckedException {
+        /** */
+        class ByteArrayWrapper {
+            final byte[] arr;
+
+            /** */
+            public ByteArrayWrapper(byte[] arr) {
+                this.arr = arr;
+            }
+
+            /** */
+            @Override public boolean equals(Object o) {
+                return Arrays.equals(arr, ((ByteArrayWrapper)o).arr);
+            }
 
-                if (state == TransactionState.COMMITTED) // Explicit tx may 
fix the values but become rolled back later.
-                    recordConsistencyViolation(fixedMap.keySet(), fixedMap);
-            });
+            /** */
+            @Override public int hashCode() {
+                return Arrays.hashCode(arr);
+            }
         }
 
-        onDone(fixedMap);
+        Set<KeyCacheObject> irreparableSet = new 
HashSet<>(inconsistentKeys.size());
+        Map<KeyCacheObject, EntryGetResult> fixedMap = new 
HashMap<>(inconsistentKeys.size());
+
+        for (KeyCacheObject inconsistentKey : inconsistentKeys) {
+            Map<T2<ByteArrayWrapper, GridCacheVersion>, T2<EntryGetResult, 
Integer>> cntMap = new HashMap<>();
+
+            for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut 
: futs.values()) {
+                if (!fut.keys().contains(inconsistentKey))
+                    continue;
+
+                EntryGetResult res = fut.result().get(inconsistentKey);
+
+                ByteArrayWrapper wrapped;
+                GridCacheVersion ver;
+
+                if (res != null) {
+                    CacheObject val = res.value();
+
+                    wrapped = new 
ByteArrayWrapper(val.valueBytes(ctx.cacheObjectContext()));
+                    ver = res.version();
+                }
+                else {
+                    wrapped = new ByteArrayWrapper(null);
+                    ver = null;
+                }
+
+                T2<ByteArrayWrapper, GridCacheVersion> keyVer = new 
T2<>(wrapped, ver);
+
+                cntMap.putIfAbsent(keyVer, new T2<>(res, 0));
+
+                cntMap.compute(keyVer, (kv, ri) -> new T2<>(ri.getKey(), 
ri.getValue() + 1));
+            }
+
+            int[] sorted = cntMap.values().stream()
+                .map(IgniteBiTuple::getValue)
+                .sorted(Comparator.reverseOrder())
+                .mapToInt(v -> v)
+                .toArray();
+
+            int max = sorted[0];

Review comment:
       If none of the nodes sent a response for the inconsistent key, `sorted` 
will be empty and we will fail here with an ArrayIndexOutOfBoundsException. Is 
that possible?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to