This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 87019581cf IGNITE-20379 Concurrency issues with write intent tracking (#2562) 87019581cf is described below commit 87019581cff56a0c4611a6fc43af1297b863ec5f Author: Cyrill <cyrill.si...@gmail.com> AuthorDate: Mon Sep 11 10:35:36 2023 +0300 IGNITE-20379 Concurrency issues with write intent tracking (#2562) --- .../rpc/impl/RaftGroupEventsClientListener.java | 20 ++++---------------- .../table/distributed/StorageUpdateHandler.java | 12 +++--------- .../table/distributed/replicator/PendingRows.java | 18 +++++------------- 3 files changed, 12 insertions(+), 38 deletions(-) diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupEventsClientListener.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupEventsClientListener.java index e3685b8085..8455f78d71 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupEventsClientListener.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupEventsClientListener.java @@ -17,10 +17,10 @@ package org.apache.ignite.raft.jraft.rpc.impl; -import java.util.ArrayList; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.raft.LeaderElectionListener; @@ -42,15 +42,7 @@ public class RaftGroupEventsClientListener { * @param listener Listener. */ public void addLeaderElectionListener(ReplicationGroupId groupId, LeaderElectionListener listener) { - leaderElectionListeners.compute(groupId, (k, listeners) -> { - if (listeners == null) { - listeners = new ArrayList<>(); - } - - listeners.add(listener); - - return listeners; - }); + leaderElectionListeners.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(listener); } /** @@ -60,13 +52,9 @@ public class RaftGroupEventsClientListener { * @param listener Listener. */ public void removeLeaderElectionListener(ReplicationGroupId groupId, LeaderElectionListener listener) { - leaderElectionListeners.compute(groupId, (k, listeners) -> { - if (listeners == null) { - return null; - } - + leaderElectionListeners.computeIfPresent(groupId, (k, listeners) -> { listeners.remove(listener); - + return listeners; }); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java index d53455a4ca..cd6e8518fb 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java @@ -265,9 +265,7 @@ public class StorageUpdateHandler { * @param commitTimestamp Commit timestamp. Not {@code null} if {@code commit} is {@code true}. */ public void handleTransactionCleanup(UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp) { - Set<RowId> pendingRowIds = pendingRows.getPendingRowIds(txId); - - handleTransactionCleanup(pendingRowIds, commit, commitTimestamp, () -> pendingRows.removePendingRowIds(txId)); + handleTransactionCleanup(txId, commit, commitTimestamp, () -> {}); } /** @@ -280,13 +278,9 @@ public class StorageUpdateHandler { */ public void handleTransactionCleanup(UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp, Runnable onApplication) { - Set<RowId> pendingRowIds = pendingRows.getPendingRowIds(txId); - - handleTransactionCleanup(pendingRowIds, commit, commitTimestamp, () -> { - pendingRows.removePendingRowIds(txId); + Set<RowId> pendingRowIds = pendingRows.removePendingRowIds(txId); - onApplication.run(); - }); + handleTransactionCleanup(pendingRowIds, commit, commitTimestamp, onApplication); } /** diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PendingRows.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PendingRows.java index 55411b0b8a..110d45144d 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PendingRows.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PendingRows.java @@ -77,23 +77,15 @@ public class PendingRows { } /** - * Removes all row IDs for the given transaction. + * Removes all pending row IDs for the given transaction. * * @param txId Transaction ID. + * @return Pending row IDs mapped to the provided transaction or an empty set if there were none. */ - public void removePendingRowIds(UUID txId) { - txsPendingRowIds.remove(txId); - } + public Set<RowId> removePendingRowIds(UUID txId) { + Set<RowId> pendingRows = txsPendingRowIds.remove(txId); - /** - * Returns pending row IDs for the given transaction or an empty set if there are no pending rows. - * - * @param txId Transaction ID. - * @return Pending row IDs. - */ - public Set<RowId> getPendingRowIds(UUID txId) { - return txsPendingRowIds.getOrDefault(txId, EMPTY_SET); + return pendingRows == null ? EMPTY_SET : pendingRows; } - }