This is an automated email from the ASF dual-hosted git repository.

shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 12bbc68b8b6 IGNITE-28259 Remove IgniteDhtPartitionHistorySuppliersMap 
(#12959)
12bbc68b8b6 is described below

commit 12bbc68b8b6fe6262597f2035230d88894acba56
Author: Dmitry Werner <[email protected]>
AuthorDate: Mon Apr 13 19:41:09 2026 +0500

    IGNITE-28259 Remove IgniteDhtPartitionHistorySuppliersMap (#12959)
---
 .../ignite/internal/CoreMessagesProvider.java      |   2 -
 .../processors/cache/GridCacheIoManager.java       |   2 +-
 .../cache/GridCachePartitionExchangeManager.java   |   6 +-
 .../preloader/GridDhtPartitionsExchangeFuture.java |  42 ++++++--
 .../preloader/GridDhtPartitionsFullMessage.java    |   9 +-
 .../IgniteDhtPartitionHistorySuppliersMap.java     | 119 ---------------------
 .../main/resources/META-INF/classnames.properties  |   1 -
 .../communication/CompressedMessageTest.java       |   9 +-
 8 files changed, 45 insertions(+), 145 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
index 2996be0a3a4..7a80a30d0ca 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
@@ -121,7 +121,6 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -517,7 +516,6 @@ public class CoreMessagesProvider implements 
MessageFactoryProvider {
         withSchema(IgniteDhtDemandedPartitionsMap.class);
         withSchema(CachePartitionFullCountersMap.class);
         withSchema(GroupPartitionIdPair.class);
-        withSchema(IgniteDhtPartitionHistorySuppliersMap.class);
         withSchema(GridPartitionStateMap.class);
         withSchema(GridDhtPartitionMap.class);
         withSchema(GridDhtPartitionFullMap.class);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index ab1001358bd..8d0ffbb168f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -944,7 +944,7 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
 
             break;
 
-            case 10616:
+            case 10615:
             case 120: {
                 processMessage(nodeId, msg, c); // Will be handled by 
Rebalance Demander.
             }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index c06eeaad393..a8e4981dbef 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -87,7 +87,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
@@ -1348,7 +1348,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     public GridDhtPartitionsFullMessage createPartitionsFullMessage(
         @Nullable final GridDhtPartitionExchangeId exchId,
         @Nullable GridCacheVersion lastVer,
-        @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
+        Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers,
         @Nullable Map<UUID, Map<Integer, Set<Integer>>> partsToReload
     ) {
         Collection<CacheGroupContext> grps = cctx.cache().cacheGroups();
@@ -1369,7 +1369,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     public GridDhtPartitionsFullMessage createPartitionsFullMessage(
         @Nullable final GridDhtPartitionExchangeId exchId,
         @Nullable GridCacheVersion lastVer,
-        @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
+        @Nullable Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers,
         @Nullable Map<UUID, Map<Integer, Set<Integer>>> partsToReload,
         Collection<CacheGroupContext> grps
     ) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index c269af4923e..3c821355173 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -324,7 +324,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
     /** */
     @GridToStringExclude
-    private final IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = 
new IgniteDhtPartitionHistorySuppliersMap();
+    private Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers = new 
HashMap<>();
 
     /** Set of nodes that cannot be used for wal rebalancing due to some 
reason. */
     private final Set<UUID> exclusionsFromHistoricalRebalance = 
ConcurrentHashMap.newKeySet();
@@ -587,7 +587,21 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * @return List of IDs of history supplier nodes or empty list if these 
doesn't exist.
      */
     public List<UUID> partitionHistorySupplier(int grpId, int partId, long 
cntrSince) {
-        List<UUID> histSuppliers = partHistSuppliers.getSupplier(grpId, 
partId, cntrSince);
+        List<UUID> histSuppliers;
+
+        synchronized (partHistSuppliers) {
+            if (partHistSuppliers.isEmpty())
+                return Collections.emptyList();
+
+            histSuppliers = new ArrayList<>();
+
+            for (Map.Entry<UUID, Map<GroupPartitionIdPair, Long>> e : 
partHistSuppliers.entrySet()) {
+                Long historyCounter = e.getValue().get(new 
GroupPartitionIdPair(grpId, partId));
+
+                if (historyCounter != null && historyCounter <= cntrSince)
+                    histSuppliers.add(e.getKey());
+            }
+        }
 
         histSuppliers.removeIf(exclusionsFromHistoricalRebalance::contains);
 
@@ -2436,7 +2450,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             // Create and destroy caches and cache proxies.
             cctx.cache().onExchangeDone(this, err);
 
-            Map<GroupPartitionIdPair, Long> locReserved = 
partHistSuppliers.getReservations(cctx.localNodeId());
+            Map<GroupPartitionIdPair, Long> locReserved;
+
+            synchronized (partHistSuppliers) {
+                locReserved = partHistSuppliers.get(cctx.localNodeId());
+            }
 
             if (locReserved != null) {
                 boolean success = 
cctx.database().reserveHistoryForPreloading(locReserved);
@@ -3544,7 +3562,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 break;
 
             if (preferWalRebalance || maxOwnerCntr - ceilingMinReserved < 
ownerSize) {
-                partHistSuppliers.put(ownerId, grpId, p, ceilingMinReserved);
+                synchronized (partHistSuppliers) {
+                    Map<GroupPartitionIdPair, Long> nodeMap = 
partHistSuppliers.computeIfAbsent(ownerId, k -> new HashMap<>());
+
+                    nodeMap.put(new GroupPartitionIdPair(grpId, p), 
ceilingMinReserved);
+                }
 
                 haveHistory.add(p);
 
@@ -3653,7 +3675,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             assert crd.isLocal();
 
-            assert partHistSuppliers.isEmpty() : partHistSuppliers;
+            synchronized (partHistSuppliers) {
+                assert partHistSuppliers.isEmpty() : partHistSuppliers;
+            }
 
             if (!exchCtx.mergeExchanges() && 
!crd.equals(events().discoveryCache().serverNodes().get(0))) {
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
@@ -4633,10 +4657,12 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, 
GridDhtPartitionsFullMessage msg) {
         cctx.versions().onExchange(msg.lastVersion().order());
 
-        assert partHistSuppliers.isEmpty();
+        synchronized (partHistSuppliers) {
+            assert partHistSuppliers.isEmpty();
 
-        partHistSuppliers.putAll(msg.partitionHistorySuppliers() != null ? 
msg.partitionHistorySuppliers() :
-            IgniteDhtPartitionHistorySuppliersMap.empty());
+            if (msg.partitionHistorySuppliers() != null)
+                partHistSuppliers = msg.partitionHistorySuppliers();
+        }
 
         // Reserve at least 2 threads for system operations.
         int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), 
GridIoPolicy.SYSTEM_POOL, 2);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 84716df72f2..535ed0ac240 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -76,7 +76,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     @Order(3)
     @Compress
     @GridToStringInclude
-    IgniteDhtPartitionHistorySuppliersMap partHistSuppliers;
+    Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers;
 
     /** Partitions that must be cleared and re-loaded. */
     @Order(4)
@@ -143,7 +143,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId 
id,
         @Nullable GridCacheVersion lastVer,
         @NotNull AffinityTopologyVersion topVer,
-        @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
+        @Nullable Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers,
         @Nullable Map<UUID, Map<Integer, Set<Integer>>> partsToReload) {
         super(id, lastVer);
 
@@ -326,7 +326,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     /**
      * @return Partitions history suppliers.
      */
-    public IgniteDhtPartitionHistorySuppliersMap partitionHistorySuppliers() {
+    public Map<UUID, Map<GroupPartitionIdPair, Long>> 
partitionHistorySuppliers() {
         return partHistSuppliers;
     }
 
@@ -444,9 +444,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
         if (parts == null)
             parts = new HashMap<>();
 
-        if (partHistSuppliers == null)
-            partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
-
         errs = errMsgs == null ? null : F.viewReadOnly(errMsgs, e -> 
e.error());
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
deleted file mode 100644
index 77e7dcb1f06..00000000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public class IgniteDhtPartitionHistorySuppliersMap implements Message {
-    /** */
-    private static final IgniteDhtPartitionHistorySuppliersMap EMPTY = new 
IgniteDhtPartitionHistorySuppliersMap();
-
-    /** */
-    @Order(0)
-    Map<UUID, Map<GroupPartitionIdPair, Long>> map;
-
-    /**
-     * @return Empty map.
-     */
-    public static IgniteDhtPartitionHistorySuppliersMap empty() {
-        return EMPTY;
-    }
-
-    /**
-     * @param grpId Group ID.
-     * @param partId Partition ID.
-     * @param cntrSince Partition update counter since history supplying is 
requested.
-     * @return List of supplier UUIDs or empty list if haven't these.
-     */
-    public synchronized List<UUID> getSupplier(int grpId, int partId, long 
cntrSince) {
-        if (map == null)
-            return Collections.emptyList();
-
-        List<UUID> suppliers = new ArrayList<>();
-
-        for (Map.Entry<UUID, Map<GroupPartitionIdPair, Long>> e : 
map.entrySet()) {
-            UUID supplierNode = e.getKey();
-
-            Long historyCounter = e.getValue().get(new 
GroupPartitionIdPair(grpId, partId));
-
-            if (historyCounter != null && historyCounter <= cntrSince)
-                suppliers.add(supplierNode);
-        }
-
-        return suppliers;
-    }
-
-    /**
-     * @param nodeId Node ID to check.
-     * @return Reservations for the given node.
-     */
-    @Nullable public synchronized Map<GroupPartitionIdPair, Long> 
getReservations(UUID nodeId) {
-        if (map == null)
-            return null;
-
-        return map.get(nodeId);
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param grpId Cache group ID.
-     * @param partId Partition ID.
-     * @param cntr Partition counter.
-     */
-    public synchronized void put(UUID nodeId, int grpId, int partId, long 
cntr) {
-        if (map == null)
-            map = new HashMap<>();
-
-        Map<GroupPartitionIdPair, Long> nodeMap = map.computeIfAbsent(nodeId, 
k -> new HashMap<>());
-
-        nodeMap.put(new GroupPartitionIdPair(grpId, partId), cntr);
-    }
-
-    /**
-     * @return {@code True} if empty.
-     */
-    public synchronized boolean isEmpty() {
-        return map == null || map.isEmpty();
-    }
-
-    /**
-     * @param that Other map to put.
-     */
-    public synchronized void putAll(IgniteDhtPartitionHistorySuppliersMap 
that) {
-        map = that.map;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgniteDhtPartitionHistorySuppliersMap.class, this);
-    }
-
-}
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties 
b/modules/core/src/main/resources/META-INF/classnames.properties
index 53f96dcb337..f4dd2f86118 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1191,7 +1191,6 @@ 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPre
 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader$2
 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments
 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap
-org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap
 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator
 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIteratorException
 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
index 05b699a31ef..53fa342460b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
@@ -30,7 +30,6 @@ import 
org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
@@ -108,13 +107,13 @@ public class CompressedMessageTest {
 
     /** */
     private GridDhtPartitionsFullMessage fullMessage() {
-        IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new 
IgniteDhtPartitionHistorySuppliersMap();
+        Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers = new 
HashMap<>();
         Map<UUID, Map<Integer, Set<Integer>>> partsToReload = new HashMap<>();
 
         for (int i = 0; i < 500; i++) {
             UUID uuid = UUID.randomUUID();
 
-            partHistSuppliers.put(uuid, i, i + 1, i + 2);
+            partHistSuppliers.put(uuid, Map.of(new GroupPartitionIdPair(i, i + 
1), i + 2L));
             partsToReload.put(uuid, Map.of(i, Set.of(i + 1)));
         }
 
@@ -123,8 +122,8 @@ public class CompressedMessageTest {
 
     /** */
     private void assertEqualsFullMsg(GridDhtPartitionsFullMessage expected, 
GridDhtPartitionsFullMessage actual) {
-        Map<UUID, Map<GroupPartitionIdPair, Long>> expHistSuppliers = 
U.field(expected.partitionHistorySuppliers(), "map");
-        Map<UUID, Map<GroupPartitionIdPair, Long>> actHistSuppliers = 
U.field(actual.partitionHistorySuppliers(), "map");
+        Map<UUID, Map<GroupPartitionIdPair, Long>> expHistSuppliers = 
expected.partitionHistorySuppliers();
+        Map<UUID, Map<GroupPartitionIdPair, Long>> actHistSuppliers = 
actual.partitionHistorySuppliers();
 
         assertEquals(expHistSuppliers.size(), actHistSuppliers.size());
 

Reply via email to