This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d3044b5b8cc IGNITE-28455 Remove volatile node names and IDs (#7930)
d3044b5b8cc is described below
commit d3044b5b8cca7383872a45c33ed477d632e42045
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Apr 6 11:26:13 2026 +0300
IGNITE-28455 Remove volatile node names and IDs (#7930)
---
.../rebalance/ItRebalanceDistributedTest.java | 3 +-
.../partition/replicator/fixtures/Node.java | 3 +-
.../ignite/internal/replicator/ReplicaManager.java | 34 ++++++++++------------
.../internal/replicator/ReplicaStateManager.java | 7 +++--
.../runner/app/ItIgniteNodeRestartTest.java | 3 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 7 ++---
.../internal/table/distributed/TableManager.java | 17 +++++------
.../distributed/TableManagerRecoveryTest.java | 3 +-
.../table/distributed/TableManagerTest.java | 7 +----
.../ignite/internal/tx/impl/TxManagerImpl.java | 23 ++++++---------
10 files changed, 43 insertions(+), 64 deletions(-)
diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 71e4a3f14c4..abbcb4e377a 100644
--- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1564,13 +1564,12 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
);
tableManager = new TableManager(
- name,
+ clusterService.staticLocalNode(),
registry,
gcConfig,
replicationConfiguration,
clusterService.messagingService(),
clusterService.topologyService(),
- clusterService.staticLocalNode(),
mock(LockManager.class),
replicaSvc,
txManager,
diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index c1cc100885f..529af9b7eb5 100644
--- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -769,13 +769,12 @@ public class Node {
);
tableManager = new TableManager(
- name,
+ clusterService.staticLocalNode(),
registry,
gcConfiguration,
replicationConfiguration,
clusterService.messagingService(),
clusterService.topologyService(),
- clusterService.staticLocalNode(),
lockManager,
replicaSvc,
txManager,
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index a9cf438b1f9..8ff7610e762 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -50,7 +50,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
@@ -177,6 +176,9 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
/** Cluster network service. */
private final ClusterService clusterNetSvc;
+ /** Local node. */
+ private final InternalClusterNode localNode;
+
/** Cluster group manager. */
private final ClusterManagementGroupManager cmgMgr;
@@ -233,10 +235,6 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
private final ReplicaStateManager replicaStateManager;
- private volatile UUID localNodeId;
-
- private volatile String localNodeConsistentId;
-
private volatile @Nullable HybridTimestamp lastIdleSafeTimeProposal;
private final Function<ReplicationGroupId,
CompletableFuture<VersionedAssignments>> getPendingAssignmentsSupplier;
@@ -285,6 +283,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
Executor throttledLogExecutor
) {
this.clusterNetSvc = clusterNetSvc;
+ this.localNode = clusterNetSvc.staticLocalNode();
this.cmgMgr = cmgMgr;
this.stableAssignmentsSupplier = stableAssignmentsSupplier;
this.clockService = clockService;
@@ -305,6 +304,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
this.replicaLifecycleExecutor = replicaLifecycleExecutor;
this.replicaStateManager = new ReplicaStateManager(
+ localNode.id(),
replicaLifecycleExecutor,
placementDriver,
this,
@@ -314,7 +314,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
// This pool MUST be single-threaded to make sure idle safe time
propagation attempts are not reordered on it.
scheduledIdleSafeTimeSyncExecutor = Executors.newScheduledThreadPool(
1,
- IgniteThreadFactory.create(clusterNetSvc.staticLocalNode().name(), "scheduled-idle-safe-time-sync-thread", LOG)
+ IgniteThreadFactory.create(localNode.name(), "scheduled-idle-safe-time-sync-thread", LOG)
);
throttledLog = Loggers.toThrottledLogger(LOG, throttledLogExecutor);
@@ -746,7 +746,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
boolean isVolatileStorage,
Function<TopologyAwareRaftGroupService, Replica> replicaFactory
) throws NodeStoppingException {
- RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNodeConsistentId));
+ RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNode.name()));
RaftGroupOptions groupOptions =
groupOptionsForPartition(isVolatileStorage, snapshotStorageFactory);
@@ -813,7 +813,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
*/
@VisibleForTesting
public void resetPeers(ReplicationGroupId replicaGrpId, PeersAndLearners
peersAndLearners, long sequenceToken) {
- RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNodeConsistentId));
+ RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNode.name()));
Loza loza = (Loza) raftManager;
Status status = loza.resetPeers(raftNodeId, peersAndLearners,
sequenceToken);
@@ -1005,11 +1005,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
}
});
- localNodeId = clusterNetSvc.staticLocalNode().id();
-
- localNodeConsistentId = clusterNetSvc.staticLocalNode().name();
-
- replicaStateManager.start(localNodeId);
+ replicaStateManager.start();
return nullCompletedFuture();
}
@@ -1070,7 +1066,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
if (replicaIsAbsent && (groupId instanceof ZonePartitionId)) {
replicaInAssignmentsFuture =
stableAssignmentsSupplier.apply((ZonePartitionId) groupId)
- .thenApply(assignments ->
assignments.contains(localNodeConsistentId));
+ .thenApply(assignments ->
assignments.contains(localNode.name()));
} else {
replicaInAssignmentsFuture = trueCompletedFuture();
}
@@ -1082,7 +1078,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
e = new ReplicaUnavailableException(
REPLICA_ABSENT_ERR,
format("Replica is absent on this node and not in
assignments, the request should be retried on another node "
- + "[groupId={}, nodeName={}]",
groupId, localNodeConsistentId
+ + "[groupId={}, nodeName={}]",
groupId, localNode.name()
)
);
} else {
@@ -1208,7 +1204,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
.groupId(toReplicationGroupIdMessage(replicaGroupId))
.build();
- replica.processRequest(req, localNodeId).whenComplete((res, ex) -> {
+ replica.processRequest(req, localNode.id()).whenComplete((res, ex) -> {
if (ex != null) {
if (hasCause(ex, TimeoutException.class,
ReplicationTimeoutException.class)) {
tryToLogTimeoutFailure(replicaGroupId, ex);
@@ -1375,7 +1371,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
*/
public void destroyReplicationProtocolStorages(ReplicationGroupId
replicaGrpId, boolean isVolatileStorage)
throws NodeStoppingException {
- RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNodeConsistentId));
+ RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNode.name()));
RaftGroupOptions groupOptions =
groupOptionsForPartition(isVolatileStorage, null);
((Loza) raftManager).destroyRaftNodeStorages(raftNodeId, groupOptions);
@@ -1393,7 +1389,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
*/
public void destroyReplicationProtocolStoragesDurably(ReplicationGroupId
replicaGrpId, boolean isVolatileStorage)
throws NodeStoppingException {
- RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNodeConsistentId));
+ RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNode.name()));
RaftGroupOptions groupOptions =
groupOptionsForPartition(isVolatileStorage, null);
((Loza) raftManager).destroyRaftNodeStoragesDurably(raftNodeId,
groupOptions);
@@ -1406,7 +1402,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
* @throws NodeStoppingException If the node is being stopped.
*/
public void createMetaStorage(ReplicationGroupId replicaGrpId) throws
NodeStoppingException {
- var raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNodeConsistentId));
+ var raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNode.name()));
((Loza) raftManager).createMetaStorage(raftNodeId);
}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
index 066e33319a5..b5163899dbf 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
@@ -61,24 +61,25 @@ class ReplicaStateManager {
private final FailureProcessor failureProcessor;
- private volatile UUID localNodeId;
+ private final UUID localNodeId;
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
ReplicaStateManager(
+ UUID localNodeId,
Executor replicaStartStopExecutor,
PlacementDriver placementDriver,
ReplicaManager replicaManager,
FailureProcessor failureProcessor
) {
+ this.localNodeId = localNodeId;
this.replicaStartStopExecutor = replicaStartStopExecutor;
this.placementDriver = placementDriver;
this.replicaManager = replicaManager;
this.failureProcessor = failureProcessor;
}
- void start(UUID localNodeId) {
- this.localNodeId = localNodeId;
+ void start() {
placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
this::onPrimaryElected);
placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
this::onPrimaryExpired);
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index eb8d0697136..77684332f2c 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -787,13 +787,12 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
);
TableManager tableManager = new TableManager(
- name,
+ clusterSvc.staticLocalNode(),
registry,
gcConfig,
replicationConfiguration,
messagingServiceReturningToStorageOperationsPool,
clusterSvc.topologyService(),
- clusterSvc.staticLocalNode(),
lockManager,
replicaService,
txManager,
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 47e1cdd5c80..97f573b5986 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -511,8 +511,6 @@ public class IgniteImpl implements Ignite {
private final LogStorageManager cmgLogStorageManager;
- private final RaftGroupOptionsConfigurer partitionRaftConfigurer;
-
private final IndexMetaStorage indexMetaStorage;
private final EventLogImpl eventLog;
@@ -701,7 +699,7 @@ public class IgniteImpl implements Ignite {
RaftGroupOptionsConfigurer msRaftConfigurer =
RaftGroupOptionsConfigHelper.configureProperties(msLogStorageManager,
metastorageWorkDir.metaPath());
- partitionRaftConfigurer =
+ RaftGroupOptionsConfigurer partitionRaftConfigurer =
RaftGroupOptionsConfigHelper.configureProperties(partitionsLogStorageManager,
partitionsWorkDir.metaPath());
GroupStoragesContextResolver groupStoragesContextResolver =
createGroupStoragesContextResolver();
@@ -1166,13 +1164,12 @@ public class IgniteImpl implements Ignite {
partitionModificationCounterFactory = new
PartitionModificationCounterFactory(clockService::current,
clusterSvc.messagingService());
distributedTblMgr = new TableManager(
- name,
+ localNode,
registry,
gcConfig,
replicationConfig,
messagingServiceReturningToStorageOperationsPool,
clusterSvc.topologyService(),
- localNode,
lockMgr,
replicaSvc,
txManager,
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index fa5ebb1b38d..d43c5040fac 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -152,6 +152,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
/** The logger. */
private static final IgniteLogger LOG =
Loggers.forClass(TableManager.class);
+ private final InternalClusterNode localNode;
+
private final TopologyService topologyService;
/** Lock manager. */
@@ -246,8 +248,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final TransactionInflights transactionInflights;
- private final String nodeName;
-
private final PartitionReplicaLifecycleManager
partitionReplicaLifecycleManager;
@Nullable
@@ -276,7 +276,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
/**
* Creates a new table manager.
*
- * @param nodeName Node name.
+ * @param localNode Local node.
* @param registry Registry for versioned values.
* @param gcConfig Garbage collector configuration.
* @param replicationConfiguration Replication configuration.
@@ -301,13 +301,12 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
* @param metricManager Metric manager.
*/
public TableManager(
- String nodeName,
+ InternalClusterNode localNode,
RevisionListenerRegistry registry,
GcConfiguration gcConfig,
ReplicationConfiguration replicationConfiguration,
MessagingService messagingService,
TopologyService topologyService,
- InternalClusterNode localNode,
LockManager lockMgr,
ReplicaService replicaSvc,
TxManager txManager,
@@ -335,6 +334,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
MetricManager metricManager,
PartitionModificationCounterFactory
partitionModificationCounterFactory
) {
+ this.localNode = localNode;
this.topologyService = topologyService;
this.lockMgr = lockMgr;
this.replicaSvc = replicaSvc;
@@ -350,7 +350,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
this.sql = sql;
this.lowWatermark = lowWatermark;
this.transactionInflights = transactionInflights;
- this.nodeName = nodeName;
this.partitionReplicaLifecycleManager =
partitionReplicaLifecycleManager;
this.metricManager = metricManager;
@@ -366,9 +365,9 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
assignmentsUpdatedVv = new
IncrementalVersionedValue<>("TableManager#assignmentsUpdated",
dependingOn(localPartitionsVv));
scanRequestExecutor = Executors.newSingleThreadExecutor(
- IgniteThreadFactory.create(nodeName, "scan-query-executor",
LOG, STORAGE_READ));
+ IgniteThreadFactory.create(localNode.name(),
"scan-query-executor", LOG, STORAGE_READ));
- MvGc mvGc = new MvGc(nodeName, gcConfig, lowWatermark,
failureProcessor);
+ MvGc mvGc = new MvGc(localNode.name(), gcConfig, lowWatermark,
failureProcessor);
partitionReplicatorNodeRecovery = new PartitionReplicatorNodeRecovery(
messagingService,
@@ -1163,7 +1162,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
try {
if (streamerFlushExecutor == null) {
streamerFlushExecutor =
Executors.newSingleThreadScheduledExecutor(
- IgniteThreadFactory.create(nodeName,
"streamer-flush-executor", LOG, STORAGE_WRITE));
+ IgniteThreadFactory.create(localNode.name(),
"streamer-flush-executor", LOG, STORAGE_WRITE));
}
return streamerFlushExecutor;
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index a2a9845e737..8e3dfaf5b74 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -562,13 +562,12 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
));
tableManager = new TableManager(
- NODE_NAME,
+ node,
revisionUpdater,
gcConfig,
replicationConfiguration,
clusterService.messagingService(),
clusterService.topologyService(),
- node,
mock(LockManager.class),
null,
txManager,
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index c20e7eb43da..5c98a208f65 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -122,7 +122,6 @@ import
org.apache.ignite.internal.testframework.InjectExecutorService;
import
org.apache.ignite.internal.testframework.failure.FailureManagerExtension;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
@@ -194,9 +193,6 @@ public class TableManagerTest extends IgniteAbstractTest {
@InjectConfiguration
private GcConfiguration gcConfig;
- @InjectConfiguration
- private TransactionConfiguration txConfig;
-
/** Storage update configuration. */
@InjectConfiguration
private ReplicationConfiguration replicationConfiguration;
@@ -572,13 +568,12 @@ public class TableManagerTest extends IgniteAbstractTest {
sm = new SchemaManager(revisionUpdater, catalogManager);
var tableManager = new TableManager(
- NODE_NAME,
+ node,
revisionUpdater,
gcConfig,
replicationConfiguration,
clusterService.messagingService(),
clusterService.topologyService(),
- clusterService.staticLocalNode(),
mock(LockManager.class),
mock(ReplicaService.class),
tm,
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 73d50163a21..b8052a6f997 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -195,9 +195,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
/** Messaging service. */
private final MessagingService messagingService;
- /** Local node network identity. This id is available only after the
network has started. */
- private volatile UUID localNodeId;
-
/** Server cleanup processor. */
private final TxCleanupRequestHandler txCleanupRequestHandler;
@@ -422,7 +419,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
return txMetrics;
}
- private CompletableFuture<Boolean> primaryReplicaEventListener(
+ private static CompletableFuture<Boolean> primaryReplicaEventListener(
PrimaryReplicaEventParameters eventParameters,
Consumer<ZonePartitionId> action
) {
@@ -436,7 +433,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
private CompletableFuture<Boolean>
primaryReplicaElectedListener(PrimaryReplicaEventParameters eventParameters) {
return primaryReplicaEventListener(eventParameters, groupId -> {
- if (localNodeId.equals(eventParameters.leaseholderId())) {
+ if (localNode.id().equals(eventParameters.leaseholderId())) {
txMessageSender.sendRecoveryCleanup(localNode.name(), groupId);
}
});
@@ -497,7 +494,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
this,
timestampTracker,
txId,
- localNodeId,
+ localNode.id(),
implicit,
timeout,
options.killClosure()
@@ -555,7 +552,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
long timeout = getTimeoutOrDefault(options,
txConfig.readOnlyTimeoutMillis().value());
var transaction = new ReadOnlyTransactionImpl(
- this, timestampTracker, txId, localNodeId, timeout,
readTimestamp, txFuture
+ this, timestampTracker, txId, localNode.id(), timeout,
readTimestamp, txFuture
);
transactionExpirationRegistry.register(transaction);
@@ -666,7 +663,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
if (enlistedGroups.isEmpty()) {
// If there are no enlisted groups, just update local state - we
already marked the tx as finished.
updateTxMeta(txId, old -> builder(old, commitIntent ? COMMITTED :
ABORTED)
- .txCoordinatorId(localNodeId)
+ .txCoordinatorId(localNode.id())
.commitPartitionId(commitPartition)
.commitTimestamp(commitTimestamp(commitIntent))
.finishedDueToTimeout(finishedDueToTimeout(old,
finishReason))
@@ -731,7 +728,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
txContext.isNoWrites() && noRemoteWrites && !recovery
)
).whenComplete((unused, throwable) -> {
- if (localNodeId.equals(finishingStateMeta.txCoordinatorId())) {
+ if (localNode.id().equals(finishingStateMeta.txCoordinatorId())) {
txMetrics.onReadWriteTransactionFinished(txId, commitIntent &&
throwable == null);
decrementRwTxCount(txId);
@@ -784,7 +781,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
txStateVolatileStorage.updateMeta(txId, old -> null);
TxStateMeta meta =
builder(verifiedCommit ? COMMITTED : ABORTED)
- .txCoordinatorId(localNodeId)
+ .txCoordinatorId(localNode.id())
.commitTimestamp(commitTimestamp)
.cleanupCompletionTimestamp(System.currentTimeMillis())
.txLabel(previous == null
? null : previous.txLabel())
@@ -921,7 +918,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
validateTxFinishedAsExpected(commit, txId, txResult);
TxStateMeta updatedMeta = updateTxMeta(txId, old ->
builder(old, txResult.transactionState())
- .txCoordinatorId(localNodeId)
+ .txCoordinatorId(localNode.id())
.commitTimestamp(txResult.commitTimestamp())
.build()
);
@@ -1040,8 +1037,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
// TODO https://issues.apache.org/jira/browse/IGNITE-23539
lockManager.start(deadlockPreventionPolicy);
- localNodeId = localNode.id();
-
messagingService.addMessageHandler(ReplicaMessageGroup.class, this);
persistentTxStateVacuumizer = new PersistentTxStateVacuumizer(
@@ -1051,7 +1046,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
placementDriver
);
- txViewProvider.init(localNodeId, txStateVolatileStorage.statesMap());
+ txViewProvider.init(localNode.id(),
txStateVolatileStorage.statesMap());
orphanDetector.start(txStateVolatileStorage,
() -> longProperty(systemCfg, ABANDONED_CHECK_TS_PROP,
ABANDONED_CHECK_TS_PROP_DEFAULT_VALUE));