This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 12bbc68b8b6 IGNITE-28259 Remove IgniteDhtPartitionHistorySuppliersMap
(#12959)
12bbc68b8b6 is described below
commit 12bbc68b8b6fe6262597f2035230d88894acba56
Author: Dmitry Werner <[email protected]>
AuthorDate: Mon Apr 13 19:41:09 2026 +0500
IGNITE-28259 Remove IgniteDhtPartitionHistorySuppliersMap (#12959)
---
.../ignite/internal/CoreMessagesProvider.java | 2 -
.../processors/cache/GridCacheIoManager.java | 2 +-
.../cache/GridCachePartitionExchangeManager.java | 6 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 42 ++++++--
.../preloader/GridDhtPartitionsFullMessage.java | 9 +-
.../IgniteDhtPartitionHistorySuppliersMap.java | 119 ---------------------
.../main/resources/META-INF/classnames.properties | 1 -
.../communication/CompressedMessageTest.java | 9 +-
8 files changed, 45 insertions(+), 145 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
index 2996be0a3a4..7a80a30d0ca 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
@@ -121,7 +121,6 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
import
org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -517,7 +516,6 @@ public class CoreMessagesProvider implements
MessageFactoryProvider {
withSchema(IgniteDhtDemandedPartitionsMap.class);
withSchema(CachePartitionFullCountersMap.class);
withSchema(GroupPartitionIdPair.class);
- withSchema(IgniteDhtPartitionHistorySuppliersMap.class);
withSchema(GridPartitionStateMap.class);
withSchema(GridDhtPartitionMap.class);
withSchema(GridDhtPartitionFullMap.class);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index ab1001358bd..8d0ffbb168f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -944,7 +944,7 @@ public class GridCacheIoManager extends
GridCacheSharedManagerAdapter {
break;
- case 10616:
+ case 10615:
case 120: {
processMessage(nodeId, msg, c); // Will be handled by
Rebalance Demander.
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index c06eeaad393..a8e4981dbef 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -87,7 +87,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
@@ -1348,7 +1348,7 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
public GridDhtPartitionsFullMessage createPartitionsFullMessage(
@Nullable final GridDhtPartitionExchangeId exchId,
@Nullable GridCacheVersion lastVer,
- @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
+ Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers,
@Nullable Map<UUID, Map<Integer, Set<Integer>>> partsToReload
) {
Collection<CacheGroupContext> grps = cctx.cache().cacheGroups();
@@ -1369,7 +1369,7 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
public GridDhtPartitionsFullMessage createPartitionsFullMessage(
@Nullable final GridDhtPartitionExchangeId exchId,
@Nullable GridCacheVersion lastVer,
- @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
+ @Nullable Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers,
@Nullable Map<UUID, Map<Integer, Set<Integer>>> partsToReload,
Collection<CacheGroupContext> grps
) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index c269af4923e..3c821355173 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -324,7 +324,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
/** */
@GridToStringExclude
- private final IgniteDhtPartitionHistorySuppliersMap partHistSuppliers =
new IgniteDhtPartitionHistorySuppliersMap();
+ private Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers = new
HashMap<>();
/** Set of nodes that cannot be used for wal rebalancing due to some
reason. */
private final Set<UUID> exclusionsFromHistoricalRebalance =
ConcurrentHashMap.newKeySet();
@@ -587,7 +587,21 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
* @return List of IDs of history supplier nodes or empty list if these
doesn't exist.
*/
public List<UUID> partitionHistorySupplier(int grpId, int partId, long
cntrSince) {
- List<UUID> histSuppliers = partHistSuppliers.getSupplier(grpId,
partId, cntrSince);
+ List<UUID> histSuppliers;
+
+ synchronized (partHistSuppliers) {
+ if (partHistSuppliers.isEmpty())
+ return Collections.emptyList();
+
+ histSuppliers = new ArrayList<>();
+
+ for (Map.Entry<UUID, Map<GroupPartitionIdPair, Long>> e :
partHistSuppliers.entrySet()) {
+ Long historyCounter = e.getValue().get(new
GroupPartitionIdPair(grpId, partId));
+
+ if (historyCounter != null && historyCounter <= cntrSince)
+ histSuppliers.add(e.getKey());
+ }
+ }
histSuppliers.removeIf(exclusionsFromHistoricalRebalance::contains);
@@ -2436,7 +2450,11 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
// Create and destroy caches and cache proxies.
cctx.cache().onExchangeDone(this, err);
- Map<GroupPartitionIdPair, Long> locReserved =
partHistSuppliers.getReservations(cctx.localNodeId());
+ Map<GroupPartitionIdPair, Long> locReserved;
+
+ synchronized (partHistSuppliers) {
+ locReserved = partHistSuppliers.get(cctx.localNodeId());
+ }
if (locReserved != null) {
boolean success =
cctx.database().reserveHistoryForPreloading(locReserved);
@@ -3544,7 +3562,11 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
break;
if (preferWalRebalance || maxOwnerCntr - ceilingMinReserved <
ownerSize) {
- partHistSuppliers.put(ownerId, grpId, p, ceilingMinReserved);
+ synchronized (partHistSuppliers) {
+ Map<GroupPartitionIdPair, Long> nodeMap =
partHistSuppliers.computeIfAbsent(ownerId, k -> new HashMap<>());
+
+ nodeMap.put(new GroupPartitionIdPair(grpId, p),
ceilingMinReserved);
+ }
haveHistory.add(p);
@@ -3653,7 +3675,9 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
assert crd.isLocal();
- assert partHistSuppliers.isEmpty() : partHistSuppliers;
+ synchronized (partHistSuppliers) {
+ assert partHistSuppliers.isEmpty() : partHistSuppliers;
+ }
if (!exchCtx.mergeExchanges() &&
!crd.equals(events().discoveryCache().serverNodes().get(0))) {
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
@@ -4633,10 +4657,12 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
private void updatePartitionFullMap(AffinityTopologyVersion resTopVer,
GridDhtPartitionsFullMessage msg) {
cctx.versions().onExchange(msg.lastVersion().order());
- assert partHistSuppliers.isEmpty();
+ synchronized (partHistSuppliers) {
+ assert partHistSuppliers.isEmpty();
- partHistSuppliers.putAll(msg.partitionHistorySuppliers() != null ?
msg.partitionHistorySuppliers() :
- IgniteDhtPartitionHistorySuppliersMap.empty());
+ if (msg.partitionHistorySuppliers() != null)
+ partHistSuppliers = msg.partitionHistorySuppliers();
+ }
// Reserve at least 2 threads for system operations.
int parallelismLvl = U.availableThreadCount(cctx.kernalContext(),
GridIoPolicy.SYSTEM_POOL, 2);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 84716df72f2..535ed0ac240 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -76,7 +76,7 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
@Order(3)
@Compress
@GridToStringInclude
- IgniteDhtPartitionHistorySuppliersMap partHistSuppliers;
+ Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers;
/** Partitions that must be cleared and re-loaded. */
@Order(4)
@@ -143,7 +143,7 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId
id,
@Nullable GridCacheVersion lastVer,
@NotNull AffinityTopologyVersion topVer,
- @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
+ @Nullable Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers,
@Nullable Map<UUID, Map<Integer, Set<Integer>>> partsToReload) {
super(id, lastVer);
@@ -326,7 +326,7 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
/**
* @return Partitions history suppliers.
*/
- public IgniteDhtPartitionHistorySuppliersMap partitionHistorySuppliers() {
+ public Map<UUID, Map<GroupPartitionIdPair, Long>>
partitionHistorySuppliers() {
return partHistSuppliers;
}
@@ -444,9 +444,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
if (parts == null)
parts = new HashMap<>();
- if (partHistSuppliers == null)
- partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
-
errs = errMsgs == null ? null : F.viewReadOnly(errMsgs, e ->
e.error());
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
deleted file mode 100644
index 77e7dcb1f06..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public class IgniteDhtPartitionHistorySuppliersMap implements Message {
- /** */
- private static final IgniteDhtPartitionHistorySuppliersMap EMPTY = new
IgniteDhtPartitionHistorySuppliersMap();
-
- /** */
- @Order(0)
- Map<UUID, Map<GroupPartitionIdPair, Long>> map;
-
- /**
- * @return Empty map.
- */
- public static IgniteDhtPartitionHistorySuppliersMap empty() {
- return EMPTY;
- }
-
- /**
- * @param grpId Group ID.
- * @param partId Partition ID.
- * @param cntrSince Partition update counter since history supplying is
requested.
- * @return List of supplier UUIDs or empty list if haven't these.
- */
- public synchronized List<UUID> getSupplier(int grpId, int partId, long
cntrSince) {
- if (map == null)
- return Collections.emptyList();
-
- List<UUID> suppliers = new ArrayList<>();
-
- for (Map.Entry<UUID, Map<GroupPartitionIdPair, Long>> e :
map.entrySet()) {
- UUID supplierNode = e.getKey();
-
- Long historyCounter = e.getValue().get(new
GroupPartitionIdPair(grpId, partId));
-
- if (historyCounter != null && historyCounter <= cntrSince)
- suppliers.add(supplierNode);
- }
-
- return suppliers;
- }
-
- /**
- * @param nodeId Node ID to check.
- * @return Reservations for the given node.
- */
- @Nullable public synchronized Map<GroupPartitionIdPair, Long>
getReservations(UUID nodeId) {
- if (map == null)
- return null;
-
- return map.get(nodeId);
- }
-
- /**
- * @param nodeId Node ID.
- * @param grpId Cache group ID.
- * @param partId Partition ID.
- * @param cntr Partition counter.
- */
- public synchronized void put(UUID nodeId, int grpId, int partId, long
cntr) {
- if (map == null)
- map = new HashMap<>();
-
- Map<GroupPartitionIdPair, Long> nodeMap = map.computeIfAbsent(nodeId,
k -> new HashMap<>());
-
- nodeMap.put(new GroupPartitionIdPair(grpId, partId), cntr);
- }
-
- /**
- * @return {@code True} if empty.
- */
- public synchronized boolean isEmpty() {
- return map == null || map.isEmpty();
- }
-
- /**
- * @param that Other map to put.
- */
- public synchronized void putAll(IgniteDhtPartitionHistorySuppliersMap
that) {
- map = that.map;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(IgniteDhtPartitionHistorySuppliersMap.class, this);
- }
-
-}
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties
b/modules/core/src/main/resources/META-INF/classnames.properties
index 53f96dcb337..f4dd2f86118 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1191,7 +1191,6 @@
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPre
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader$2
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap
-org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIteratorException
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
index 05b699a31ef..53fa342460b 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
@@ -30,7 +30,6 @@ import
org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GroupPartitionIdPair;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
@@ -108,13 +107,13 @@ public class CompressedMessageTest {
/** */
private GridDhtPartitionsFullMessage fullMessage() {
- IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new
IgniteDhtPartitionHistorySuppliersMap();
+ Map<UUID, Map<GroupPartitionIdPair, Long>> partHistSuppliers = new
HashMap<>();
Map<UUID, Map<Integer, Set<Integer>>> partsToReload = new HashMap<>();
for (int i = 0; i < 500; i++) {
UUID uuid = UUID.randomUUID();
- partHistSuppliers.put(uuid, i, i + 1, i + 2);
+ partHistSuppliers.put(uuid, Map.of(new GroupPartitionIdPair(i, i +
1), i + 2L));
partsToReload.put(uuid, Map.of(i, Set.of(i + 1)));
}
@@ -123,8 +122,8 @@ public class CompressedMessageTest {
/** */
private void assertEqualsFullMsg(GridDhtPartitionsFullMessage expected,
GridDhtPartitionsFullMessage actual) {
- Map<UUID, Map<GroupPartitionIdPair, Long>> expHistSuppliers =
U.field(expected.partitionHistorySuppliers(), "map");
- Map<UUID, Map<GroupPartitionIdPair, Long>> actHistSuppliers =
U.field(actual.partitionHistorySuppliers(), "map");
+ Map<UUID, Map<GroupPartitionIdPair, Long>> expHistSuppliers =
expected.partitionHistorySuppliers();
+ Map<UUID, Map<GroupPartitionIdPair, Long>> actHistSuppliers =
actual.partitionHistorySuppliers();
assertEquals(expHistSuppliers.size(), actHistSuppliers.size());