This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a4ea716e72 IGNITE-22631 Get rid of @Marshallable in
org.apache.ignite.internal.tx.message (#4026)
a4ea716e72 is described below
commit a4ea716e7255bff749ebce11b2ae70eb4aef4216
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Jul 2 18:17:00 2024 +0300
IGNITE-22631 Get rid of @Marshallable in
org.apache.ignite.internal.tx.message (#4026)
---
.../network/PartitionReplicationMessageGroup.java | 4 --
.../network/PartitionReplicationMessageUtils.java | 57 ----------------
.../network/raft/SnapshotTxDataResponse.java | 1 +
.../ignite/internal/table/ItDurableFinishTest.java | 4 +-
.../raft/snapshot/outgoing/OutgoingSnapshot.java | 8 ++-
.../replicator/PartitionReplicaListener.java | 13 +++-
.../replicator/TransactionStateResolver.java | 23 +++++--
.../incoming/IncomingSnapshotCopierTest.java | 8 ++-
.../replication/PartitionReplicaListenerTest.java | 34 ++++++----
.../apache/ignite/internal/tx/TransactionMeta.java | 6 ++
.../java/org/apache/ignite/internal/tx/TxMeta.java | 25 ++++++-
.../org/apache/ignite/internal/tx/TxStateMeta.java | 33 +++++++--
.../ignite/internal/tx/TxStateMetaAbandoned.java | 27 +++++++-
.../ignite/internal/tx/TxStateMetaFinishing.java | 23 +++++++
.../internal/tx/impl/TxCleanupRequestHandler.java | 79 +++++++++++++++-------
.../internal/tx/impl/TxCleanupRequestSender.java | 10 ++-
.../ignite/internal/tx/impl/TxManagerImpl.java | 7 +-
.../ignite/internal/tx/impl/TxMessageSender.java | 57 ++++++++++++----
.../tx/message/CleanupReplicatedInfoMessage.java | 48 +++++++++++++
.../tx/message/TransactionMetaMessage.java} | 31 ++-------
.../internal/tx/message/TxCleanupMessage.java | 9 +--
.../tx/message/TxCleanupMessageResponse.java | 7 +-
.../tx/message/TxFinishReplicaRequest.java | 12 +---
.../ignite/internal/tx/message/TxMessageGroup.java | 14 ++++
.../ignite/internal/tx/message}/TxMetaMessage.java | 39 +++--------
...ponse.java => TxStateMetaAbandonedMessage.java} | 33 ++++++---
...ponse.java => TxStateMetaFinishingMessage.java} | 30 +++++---
.../internal/tx/message/TxStateMetaMessage.java | 59 ++++++++++++++++
.../internal/tx/message/TxStateResponse.java | 7 +-
29 files changed, 460 insertions(+), 248 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
index 7ee9efef57..05ef537b46 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
@@ -38,7 +38,6 @@ import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDa
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse.ResponseEntry;
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataRequest;
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataResponse;
-import
org.apache.ignite.internal.partition.replicator.network.raft.TxMetaMessage;
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
import
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
@@ -186,9 +185,6 @@ public interface PartitionReplicationMessageGroup {
*/
short TIMED_BINARY_ROW_MESSAGE = 24;
- /** Message type for {@link TxMetaMessage}. */
- short TX_META_MESSAGE = 25;
-
/**
* Message types for partition replicator module RAFT commands.
*
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageUtils.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageUtils.java
deleted file mode 100644
index d4f8c799ba..0000000000
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageUtils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.partition.replicator.network;
-
-import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
-import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
-
-import java.util.ArrayList;
-import
org.apache.ignite.internal.partition.replicator.network.raft.TxMetaMessage;
-import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
-import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
-import org.apache.ignite.internal.tx.TxMeta;
-
-/** Class that can contain useful constants and methods for working with
messages from {@link PartitionReplicationMessageGroup}. */
-public class PartitionReplicationMessageUtils {
- /**
- * Converts to a network message.
- *
- * @param partitionReplicationMessagesFactory Partition replication
messages factory.
- * @param replicaMessagesFactory Replica messages factory.
- * @param txMeta Transaction meta.
- * @return New instance of network message.
- */
- public static TxMetaMessage toTxMetaMessage(
- PartitionReplicationMessagesFactory
partitionReplicationMessagesFactory,
- ReplicaMessagesFactory replicaMessagesFactory,
- TxMeta txMeta
- ) {
- var enlistedPartitionMessages = new
ArrayList<TablePartitionIdMessage>(txMeta.enlistedPartitions().size());
-
- for (TablePartitionId enlistedPartition : txMeta.enlistedPartitions())
{
-
enlistedPartitionMessages.add(toTablePartitionIdMessage(replicaMessagesFactory,
enlistedPartition));
- }
-
- return partitionReplicationMessagesFactory.txMetaMessage()
- .txStateInt(txMeta.txState().ordinal())
-
.commitTimestampLong(hybridTimestampToLong(txMeta.commitTimestamp()))
- .enlistedPartitions(enlistedPartitionMessages)
- .build();
- }
-}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/SnapshotTxDataResponse.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/SnapshotTxDataResponse.java
index 96ee5f5a6a..ece9048541 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/SnapshotTxDataResponse.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/SnapshotTxDataResponse.java
@@ -22,6 +22,7 @@ import java.util.UUID;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.tx.message.TxMetaMessage;
/**
* Snapshot TX state partition data response message.
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index 66c3f45e55..4055bcba2d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -54,7 +54,7 @@ import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeException;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxStateMeta;
-import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
+import org.apache.ignite.internal.tx.message.CleanupReplicatedInfoMessage;
import org.apache.ignite.internal.tx.message.TxCleanupMessage;
import org.apache.ignite.internal.tx.message.TxCleanupMessageErrorResponse;
import org.apache.ignite.internal.tx.message.TxCleanupMessageResponse;
@@ -350,7 +350,7 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
return false;
}
- CleanupReplicatedInfo result = message.result();
+ CleanupReplicatedInfoMessage result = message.result();
if (result != null) {
cleanupReplicatedFuture.complete(null);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
index f7751ddb0b..7244c4eb2c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
-import static
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageUtils.toTxMetaMessage;
import static
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.SnapshotMetaUtils.collectNextRowIdToBuildIndexes;
import static
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.SnapshotMetaUtils.snapshotMetaAt;
@@ -42,7 +41,6 @@ import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDa
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse.ResponseEntry;
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataRequest;
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataResponse;
-import
org.apache.ignite.internal.partition.replicator.network.raft.TxMetaMessage;
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -52,6 +50,8 @@ import
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxMetaMessage;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
@@ -70,6 +70,8 @@ public class OutgoingSnapshot {
private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
+ private static final TxMessagesFactory TX_MESSAGES_FACTORY = new
TxMessagesFactory();
+
private final UUID id;
private final PartitionAccess partition;
@@ -419,7 +421,7 @@ public class OutgoingSnapshot {
for (IgniteBiTuple<UUID, TxMeta> row : rows) {
txIds.add(row.getKey());
-
txMetas.add(toTxMetaMessage(PARTITION_REPLICATION_MESSAGES_FACTORY,
REPLICA_MESSAGES_FACTORY, row.getValue()));
+
txMetas.add(row.getValue().toTransactionMetaMessage(REPLICA_MESSAGES_FACTORY,
TX_MESSAGES_FACTORY));
}
return PARTITION_REPLICATION_MESSAGES_FACTORY.snapshotTxDataResponse()
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 2b34bff597..8954ff184c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -70,6 +70,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
@@ -1624,7 +1625,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
*/
private CompletableFuture<TransactionResult>
processTxFinishAction(TxFinishReplicaRequest request) {
// TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use
ZonePartitionIdMessage and remove cast
- Map<TablePartitionId, String> enlistedGroups = (Map<TablePartitionId,
String>) (Map<?, ?>) request.groups();
+ Map<TablePartitionId, String> enlistedGroups =
asTablePartitionIdStringMap(request.groups());
UUID txId = request.txId();
@@ -4169,4 +4170,14 @@ public class PartitionReplicaListener implements
ReplicaListener {
return result;
}
}
+
+ private static Map<TablePartitionId, String>
asTablePartitionIdStringMap(Map<TablePartitionIdMessage, String> messages) {
+ var result = new HashMap<TablePartitionId, String>(messages.size());
+
+ for (Entry<TablePartitionIdMessage, String> e : messages.entrySet()) {
+ result.put(e.getKey().asTablePartitionId(), e.getValue());
+ }
+
+ return result;
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
index 86b57b7517..97f82b19d3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.replicator.TablePartitionId;
import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
@@ -42,6 +43,7 @@ import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.TxStateMetaFinishing;
import org.apache.ignite.internal.tx.impl.PlacementDriverHelper;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.message.TransactionMetaMessage;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.message.TxStateCoordinatorRequest;
@@ -53,7 +55,10 @@ import org.jetbrains.annotations.Nullable;
*/
public class TransactionStateResolver {
/** Tx messages factory. */
- private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
+ private static final TxMessagesFactory TX_MESSAGES_FACTORY = new
TxMessagesFactory();
+
+ /** Replica messages factory. */
+ private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
// TODO https://issues.apache.org/jira/browse/IGNITE-20408 after this
ticket this resolver will be no longer needed, as
// TODO we will store coordinator as ClusterNode in local tx state map.
@@ -110,8 +115,8 @@ public class TransactionStateResolver {
processTxStateRequest(req)
.thenAccept(txStateMeta -> {
- NetworkMessage response = FACTORY.txStateResponse()
- .txStateMeta(txStateMeta)
+ NetworkMessage response =
TX_MESSAGES_FACTORY.txStateResponse()
+
.txStateMeta(toTransactionMetaMessage(txStateMeta))
.timestampLong(clockService.nowLong())
.build();
@@ -218,7 +223,7 @@ public class TransactionStateResolver {
txMessageSender.resolveTxStateFromCoordinator(coordinator.name(),
txId, timestamp)
.whenComplete((response, e) -> {
if (e == null) {
- txMetaFuture.complete(response.txStateMeta());
+
txMetaFuture.complete(asTransactionMeta(response.txStateMeta()));
} else {
resolveTxStateFromCommitPartition(txId,
commitGrpId, txMetaFuture);
}
@@ -291,7 +296,7 @@ public class TransactionStateResolver {
* @param request Request.
* @return Future that should be completed with transaction state meta.
*/
- private CompletableFuture<TransactionMeta>
processTxStateRequest(TxStateCoordinatorRequest request) {
+ private CompletableFuture<@Nullable TransactionMeta>
processTxStateRequest(TxStateCoordinatorRequest request) {
clockService.updateClock(request.readTimestamp());
UUID txId = request.txId();
@@ -308,4 +313,12 @@ public class TransactionStateResolver {
return completedFuture(txStateMeta);
}
}
+
+ private static @Nullable TransactionMetaMessage
toTransactionMetaMessage(@Nullable TransactionMeta transactionMeta) {
+ return transactionMeta == null ? null :
transactionMeta.toTransactionMetaMessage(REPLICA_MESSAGES_FACTORY,
TX_MESSAGES_FACTORY);
+ }
+
+ private static @Nullable TransactionMeta asTransactionMeta(@Nullable
TransactionMetaMessage transactionMetaMessage) {
+ return transactionMetaMessage == null ? null :
transactionMetaMessage.asTransactionMeta();
+ }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 356c7682a4..0a55b88263 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -21,7 +21,6 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
-import static
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageUtils.toTxMetaMessage;
import static
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.SnapshotMetaUtils.snapshotMetaAt;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
@@ -77,7 +76,6 @@ import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMeta
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataRequest;
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse.ResponseEntry;
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataRequest;
-import
org.apache.ignite.internal.partition.replicator.network.raft.TxMetaMessage;
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
@@ -108,6 +106,8 @@ import
org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.TransactionIds;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxMetaMessage;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
@@ -148,6 +148,8 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
private static final LowWatermarkMessagesFactory LWM_MSG_FACTORY = new
LowWatermarkMessagesFactory();
+ private static final TxMessagesFactory TX_MESSAGES_FACTORY = new
TxMessagesFactory();
+
private final ExecutorService executorService =
Executors.newSingleThreadExecutor();
private final ClusterNode clusterNode = mock(ClusterNode.class);
@@ -285,7 +287,7 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
List<TxMetaMessage> txMetas = txIds.stream()
.map(outgoingTxStatePartitionStorage::get)
- .map(txMeta -> toTxMetaMessage(TABLE_MSG_FACTORY,
REPLICA_MESSAGES_FACTORY, txMeta))
+ .map(txMeta ->
txMeta.toTransactionMetaMessage(REPLICA_MESSAGES_FACTORY, TX_MESSAGES_FACTORY))
.collect(toList());
return
completedFuture(TABLE_MSG_FACTORY.snapshotTxDataResponse().txIds(txIds).txMeta(txMetas).finish(true).build());
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 19202c05fd..8c0947fe1d 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -19,7 +19,9 @@ package
org.apache.ignite.internal.table.distributed.replication;
import static java.util.Collections.singletonList;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_BUILDING;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
@@ -152,7 +154,6 @@ import
org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaResult;
import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
@@ -218,9 +219,11 @@ import
org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.message.TransactionMetaMessage;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.message.TxStateCoordinatorRequest;
+import org.apache.ignite.internal.tx.message.TxStateResponse;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
import org.apache.ignite.internal.tx.test.TestTransactionIds;
@@ -565,12 +568,10 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
if (argument instanceof TxStateCoordinatorRequest) {
TxStateCoordinatorRequest req = (TxStateCoordinatorRequest)
argument;
- var resp = new
TxMessagesFactory().txStateResponse().txStateMeta(txManager.stateMeta(req.txId())).build();
-
- return completedFuture(resp);
+ return
completedFuture(toTxStateResponse(txManager.stateMeta(req.txId())));
}
- return CompletableFuture.failedFuture(new Exception("Test
exception"));
+ return failedFuture(new Exception("Test exception"));
}).when(messagingService).invoke(any(ClusterNode.class), any(),
anyLong());
doAnswer(invocation -> {
@@ -579,12 +580,10 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
if (argument instanceof TxStateCoordinatorRequest) {
TxStateCoordinatorRequest req = (TxStateCoordinatorRequest)
argument;
- var resp = new
TxMessagesFactory().txStateResponse().txStateMeta(txManager.stateMeta(req.txId())).build();
-
- return completedFuture(resp);
+ return
completedFuture(toTxStateResponse(txManager.stateMeta(req.txId())));
}
- return CompletableFuture.failedFuture(new Exception("Test
exception"));
+ return failedFuture(new Exception("Test exception"));
}).when(messagingService).invoke(anyString(), any(), anyLong());
ClusterNodeResolver clusterNodeResolver = new ClusterNodeResolver() {
@@ -1701,7 +1700,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
TxFinishReplicaRequest commitRequest =
TX_MESSAGES_FACTORY.txFinishReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
.txId(txId)
- .groups(Map.of(grpId, localNode.name()))
+ .groups(Map.of(tablePartitionIdMessage(grpId),
localNode.name()))
.commit(false)
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.build();
@@ -1765,7 +1764,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
TxFinishReplicaRequest commitRequest =
TX_MESSAGES_FACTORY.txFinishReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
.txId(txId)
- .groups(Map.of(grpId, localNode.name()))
+ .groups(Map.of(tablePartitionIdMessage(grpId),
localNode.name()))
.commit(true)
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
@@ -2486,7 +2485,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private void testCommitRequestIfTableWasDropped(
TablePartitionId commitPartitionId,
- Map<ReplicationGroupId, String> groups,
+ Map<TablePartitionId, String> groups,
int tableToBeDroppedId
) {
when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(),
any(), any(HybridTimestamp.class)))
@@ -2507,7 +2506,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
CompletableFuture<?> future = partitionReplicaListener.invoke(
TX_MESSAGES_FACTORY.txFinishReplicaRequest()
.groupId(tablePartitionIdMessage(commitPartitionId))
- .groups(groups)
+ .groups(groups.entrySet().stream().collect(toMap(e ->
tablePartitionIdMessage(e.getKey()), Map.Entry::getValue)))
.txId(txId)
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.commit(true)
@@ -3092,4 +3091,13 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
when(indexMetaStorage.indexMeta(eq(indexId))).thenReturn(indexMeta);
}
+
+ private static @Nullable TxStateResponse toTxStateResponse(@Nullable
TransactionMeta transactionMeta) {
+ TransactionMetaMessage transactionMetaMessage =
+ transactionMeta == null ? null :
transactionMeta.toTransactionMetaMessage(REPLICA_MESSAGES_FACTORY,
TX_MESSAGES_FACTORY);
+
+ return TX_MESSAGES_FACTORY.txStateResponse()
+ .txStateMeta(transactionMetaMessage)
+ .build();
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionMeta.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionMeta.java
index 975a818479..6e04440841 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionMeta.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionMeta.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.tx;
import java.io.Serializable;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.tx.message.TransactionMetaMessage;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.jetbrains.annotations.Nullable;
/**
@@ -30,4 +33,7 @@ public interface TransactionMeta extends Serializable {
/** Commit timestamp. */
@Nullable HybridTimestamp commitTimestamp();
+
+ /** Converts to network message. */
+ TransactionMetaMessage toTransactionMetaMessage(ReplicaMessagesFactory
replicaMessagesFactory, TxMessagesFactory txMessagesFactory);
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
index 6b91e80d29..a8e11643f3 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
@@ -18,12 +18,19 @@
package org.apache.ignite.internal.tx;
import static java.util.Collections.unmodifiableCollection;
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
+import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxMetaMessage;
import org.jetbrains.annotations.Nullable;
/** Transaction meta. */
@@ -38,8 +45,7 @@ public class TxMeta implements TransactionMeta {
private final Collection<TablePartitionId> enlistedPartitions;
/** Commit timestamp. */
- @Nullable
- private final HybridTimestamp commitTimestamp;
+ private final @Nullable HybridTimestamp commitTimestamp;
/**
* The constructor.
@@ -68,6 +74,21 @@ public class TxMeta implements TransactionMeta {
return commitTimestamp;
}
+ @Override
+ public TxMetaMessage toTransactionMetaMessage(ReplicaMessagesFactory
replicaMessagesFactory, TxMessagesFactory txMessagesFactory) {
+ var enlistedPartitionMessages = new
ArrayList<TablePartitionIdMessage>(enlistedPartitions.size());
+
+ for (TablePartitionId enlistedPartition : enlistedPartitions) {
+
enlistedPartitionMessages.add(toTablePartitionIdMessage(replicaMessagesFactory,
enlistedPartition));
+ }
+
+ return txMessagesFactory.txMetaMessage()
+ .txStateInt(txState.ordinal())
+ .commitTimestampLong(hybridTimestampToLong(commitTimestamp))
+ .enlistedPartitions(enlistedPartitionMessages)
+ .build();
+ }
+
@Override
public String toString() {
return S.toString(TxMeta.class, this);
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
index f489d4a88c..491378513f 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
@@ -17,13 +17,18 @@
package org.apache.ignite.internal.tx;
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
+import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
import static org.apache.ignite.internal.tx.TxState.ABANDONED;
import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
import java.util.Objects;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxStateMetaMessage;
import org.jetbrains.annotations.Nullable;
/**
@@ -34,17 +39,16 @@ public class TxStateMeta implements TransactionMeta {
private final TxState txState;
- private final String txCoordinatorId;
+ private final @Nullable String txCoordinatorId;
- /** Identifier of the replication group that manages a transaction state.
*/
- private final TablePartitionId commitPartitionId;
+ /** ID of the replication group that manages a transaction state. */
+ private final @Nullable TablePartitionId commitPartitionId;
- private final HybridTimestamp commitTimestamp;
+ private final @Nullable HybridTimestamp commitTimestamp;
- private final Long initialVacuumObservationTimestamp;
+ private final @Nullable Long initialVacuumObservationTimestamp;
- @Nullable
- private final Long cleanupCompletionTimestamp;
+ private final @Nullable Long cleanupCompletionTimestamp;
/**
* Constructor.
@@ -154,6 +158,21 @@ public class TxStateMeta implements TransactionMeta {
return cleanupCompletionTimestamp;
}
+ @Override
+ public TxStateMetaMessage toTransactionMetaMessage(
+ ReplicaMessagesFactory replicaMessagesFactory,
+ TxMessagesFactory txMessagesFactory
+ ) {
+ return txMessagesFactory.txStateMetaMessage()
+ .txStateInt(txState.ordinal())
+ .txCoordinatorId(txCoordinatorId)
+ .commitPartitionId(commitPartitionId == null ? null :
toTablePartitionIdMessage(replicaMessagesFactory, commitPartitionId))
+ .commitTimestampLong(hybridTimestampToLong(commitTimestamp))
+
.initialVacuumObservationTimestamp(initialVacuumObservationTimestamp)
+ .cleanupCompletionTimestamp(cleanupCompletionTimestamp)
+ .build();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java
index 674283240b..0a45671e39 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java
@@ -17,10 +17,15 @@
package org.apache.ignite.internal.tx;
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
+import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
import static org.apache.ignite.internal.tx.TxState.ABANDONED;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxStateMetaAbandonedMessage;
import org.apache.ignite.internal.util.FastTimestamps;
/**
@@ -36,9 +41,9 @@ public class TxStateMetaAbandoned extends TxStateMeta {
* Constructor.
*
* @param txCoordinatorId Transaction coordinator id.
- * @param commitPartitionId Commit partition replication group id.
+ * @param commitPartitionId Commit partition replication group ID.
*/
- TxStateMetaAbandoned(
+ public TxStateMetaAbandoned(
String txCoordinatorId,
TablePartitionId commitPartitionId
) {
@@ -56,6 +61,24 @@ public class TxStateMetaAbandoned extends TxStateMeta {
return lastAbandonedMarkerTs;
}
+ @Override
+ public TxStateMetaAbandonedMessage toTransactionMetaMessage(
+ ReplicaMessagesFactory replicaMessagesFactory,
+ TxMessagesFactory txMessagesFactory
+ ) {
+ TablePartitionId tablePartitionId = commitPartitionId();
+
+ return txMessagesFactory.txStateMetaAbandonedMessage()
+ .txStateInt(txState().ordinal())
+ .txCoordinatorId(txCoordinatorId())
+ .commitPartitionId(tablePartitionId == null ? null :
toTablePartitionIdMessage(replicaMessagesFactory, tablePartitionId))
+ .commitTimestampLong(hybridTimestampToLong(commitTimestamp()))
+
.initialVacuumObservationTimestamp(initialVacuumObservationTimestamp())
+ .cleanupCompletionTimestamp(cleanupCompletionTimestamp())
+ .lastAbandonedMarkerTs(lastAbandonedMarkerTs)
+ .build();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
index 13659d8070..ae1ae66bad 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
@@ -17,9 +17,15 @@
package org.apache.ignite.internal.tx;
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
+import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
+
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxStateMetaFinishingMessage;
import org.jetbrains.annotations.Nullable;
/**
@@ -56,6 +62,23 @@ public class TxStateMetaFinishing extends TxStateMeta {
throw new UnsupportedOperationException("Can't get commit timestamp
from FINISHING transaction state meta.");
}
+ @Override
+ public TxStateMetaFinishingMessage toTransactionMetaMessage(
+ ReplicaMessagesFactory replicaMessagesFactory,
+ TxMessagesFactory txMessagesFactory
+ ) {
+ TablePartitionId commitPartitionId = commitPartitionId();
+
+ return txMessagesFactory.txStateMetaFinishingMessage()
+ .txStateInt(txState().ordinal())
+ .txCoordinatorId(txCoordinatorId())
+ .commitPartitionId(commitPartitionId == null ? null :
toTablePartitionIdMessage(replicaMessagesFactory, commitPartitionId))
+ .commitTimestampLong(hybridTimestampToLong(commitTimestamp()))
+
.initialVacuumObservationTimestamp(initialVacuumObservationTimestamp())
+ .cleanupCompletionTimestamp(cleanupCompletionTimestamp())
+ .build();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
index 9796727c2c..db4035a7c2 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
@@ -18,11 +18,13 @@
package org.apache.ignite.internal.tx.impl;
import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -33,11 +35,13 @@ import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.network.ChannelType;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
+import org.apache.ignite.internal.tx.message.CleanupReplicatedInfoMessage;
import org.apache.ignite.internal.tx.message.TxCleanupMessage;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
@@ -50,7 +54,10 @@ import org.jetbrains.annotations.Nullable;
*/
public class TxCleanupRequestHandler {
/** Tx messages factory. */
- private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
+ private static final TxMessagesFactory TX_MESSAGES_FACTORY = new
TxMessagesFactory();
+
+ /** Replica messages factory. */
+ private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
/** Messaging service. */
private final MessagingService messagingService;
@@ -112,19 +119,22 @@ public class TxCleanupRequestHandler {
Map<TablePartitionId, CompletableFuture<?>> writeIntentSwitches = new
HashMap<>();
// These cleanups will all be local.
- Collection<ReplicationGroupId> groups = txCleanupMessage.groups();
+ List<TablePartitionIdMessage> groups = txCleanupMessage.groups();
if (groups != null) {
- trackPartitions(txCleanupMessage.txId(), groups, sender);
-
- for (ReplicationGroupId group : groups) {
- writeIntentSwitches.put((TablePartitionId) group,
- writeIntentSwitchProcessor.switchLocalWriteIntents(
- (TablePartitionId) group,
- txCleanupMessage.txId(),
- txCleanupMessage.commit(),
- txCleanupMessage.commitTimestamp()
- ).thenAccept(this::processWriteIntentSwitchResponse));
+ Set<TablePartitionId> groupSet = asTablePartitionIdSet(groups);
+
+ trackPartitions(txCleanupMessage.txId(), groupSet, sender);
+
+ for (TablePartitionId group : groupSet) {
+ CompletableFuture<Void> future =
writeIntentSwitchProcessor.switchLocalWriteIntents(
+ group,
+ txCleanupMessage.txId(),
+ txCleanupMessage.commit(),
+ txCleanupMessage.commitTimestamp()
+ ).thenAccept(this::processWriteIntentSwitchResponse);
+
+ writeIntentSwitches.put(group, future);
}
}
// First trigger the cleanup to properly release the locks if we know
all affected partitions on this node.
@@ -164,22 +174,22 @@ public class TxCleanupRequestHandler {
}
private NetworkMessage prepareResponse() {
- return FACTORY
+ return TX_MESSAGES_FACTORY
.txCleanupMessageResponse()
.timestampLong(clockService.nowLong())
.build();
}
private NetworkMessage prepareResponse(CleanupReplicatedInfo result) {
- return FACTORY
+ return TX_MESSAGES_FACTORY
.txCleanupMessageResponse()
- .result(result)
+ .result(toCleanupReplicatedInfoMessage(result))
.timestampLong(clockService.nowLong())
.build();
}
private NetworkMessage prepareErrorResponse(UUID txId, Throwable th) {
- return FACTORY
+ return TX_MESSAGES_FACTORY
.txCleanupMessageErrorResponse()
.txId(txId)
.throwable(th)
@@ -194,13 +204,8 @@ public class TxCleanupRequestHandler {
* @param groups Replication groups.
* @param sender Cleanup request sender, needed to send cleanup replicated
response.
*/
- private void trackPartitions(UUID txId, Collection<ReplicationGroupId>
groups, ClusterNode sender) {
- Set<TablePartitionId> partitions =
- groups.stream()
- .map(TablePartitionId.class::cast)
- .collect(toSet());
-
- writeIntentsReplicated.put(txId, new CleanupContext(sender,
partitions, partitions));
+ private void trackPartitions(UUID txId, Set<TablePartitionId> groups,
ClusterNode sender) {
+ writeIntentsReplicated.put(txId, new CleanupContext(sender, groups,
groups));
}
/**
@@ -266,4 +271,28 @@ public class TxCleanupRequestHandler {
this.initialPartitions = initialPartitions;
}
}
+
+ private static CleanupReplicatedInfoMessage
toCleanupReplicatedInfoMessage(CleanupReplicatedInfo info) {
+ Collection<TablePartitionId> partitions = info.partitions();
+ List<TablePartitionIdMessage> partitionMessages = new
ArrayList<>(partitions.size());
+
+ for (TablePartitionId partition : partitions) {
+
partitionMessages.add(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY,
partition));
+ }
+
+ return TX_MESSAGES_FACTORY.cleanupReplicatedInfoMessage()
+ .txId(info.txId())
+ .partitions(partitionMessages)
+ .build();
+ }
+
+ private static Set<TablePartitionId>
asTablePartitionIdSet(List<TablePartitionIdMessage> messages) {
+ var set = new HashSet<TablePartitionId>(messages.size());
+
+ for (int i = 0; i < messages.size(); i++) {
+ set.add(messages.get(i).asTablePartitionId());
+ }
+
+ return set;
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index e617d111f1..6334a5b691 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -32,12 +32,12 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import
org.apache.ignite.internal.tx.impl.TxManagerImpl.TransactionFailureHandler;
import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
+import org.apache.ignite.internal.tx.message.CleanupReplicatedInfoMessage;
import org.apache.ignite.internal.tx.message.TxCleanupMessageErrorResponse;
import org.apache.ignite.internal.tx.message.TxCleanupMessageResponse;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
@@ -87,11 +87,11 @@ public class TxCleanupRequestSender {
// The cleanup response is sent only in the success case,
hence no error is expected.
assert !(msg instanceof TxCleanupMessageErrorResponse) :
"Cleanup error response is not expected here.";
- CleanupReplicatedInfo result = ((TxCleanupMessageResponse)
msg).result();
+ CleanupReplicatedInfoMessage result =
((TxCleanupMessageResponse) msg).result();
assert result != null : "Result for the cleanup response
cannot be null.";
- onCleanupReplicated(result);
+ onCleanupReplicated(result.asCleanupReplicatedInfo());
}
});
}
@@ -237,9 +237,7 @@ public class TxCleanupRequestSender {
String node,
@Nullable Collection<TablePartitionId> partitions
) {
- Collection<ReplicationGroupId> enlistedPartitions =
(Collection<ReplicationGroupId>) (Collection<?>) partitions;
-
- return txMessageSender.cleanup(node, enlistedPartitions, txId, commit,
commitTimestamp)
+ return txMessageSender.cleanup(node, partitions, txId, commit,
commitTimestamp)
.handle((networkMessage, throwable) -> {
if (throwable != null) {
if
(TransactionFailureHandler.isRecoverable(throwable)) {
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index ed68f15543..deda198098 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -82,7 +82,6 @@ import
org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
@@ -590,7 +589,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
(unused, throwable) -> {
boolean verifiedCommit = throwable == null &&
commit;
- Map<ReplicationGroupId, String>
replicationGroupIds = enlistedGroups.entrySet().stream()
+ Map<TablePartitionId, String> replicationGroupIds
= enlistedGroups.entrySet().stream()
.collect(Collectors.toMap(
Entry::getKey,
entry ->
entry.getValue().get1().name()
@@ -617,7 +616,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
HybridTimestampTracker observableTimestampTracker,
TablePartitionId commitPartition,
boolean commit,
- Map<ReplicationGroupId, String> replicationGroupIds,
+ Map<TablePartitionId, String> replicationGroupIds,
UUID txId,
HybridTimestamp commitTimestamp,
CompletableFuture<TransactionMeta> txFinishFuture
@@ -690,7 +689,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
String primaryConsistentId,
Long enlistmentConsistencyToken,
boolean commit,
- Map<ReplicationGroupId, String> replicationGroupIds,
+ Map<TablePartitionId, String> replicationGroupIds,
UUID txId,
HybridTimestamp commitTimestamp,
CompletableFuture<TransactionMeta> txFinishFuture
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
index 558cbd3b0d..355958853b 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.tx.impl;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -29,10 +32,10 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TransactionResult;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
@@ -45,7 +48,7 @@ import org.jetbrains.annotations.Nullable;
*/
public class TxMessageSender {
/** Tx messages factory. */
- private static final TxMessagesFactory TX_MESSAGE_FACTORY = new
TxMessagesFactory();
+ private static final TxMessagesFactory TX_MESSAGES_FACTORY = new
TxMessagesFactory();
/** Replica messages factory. */
private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
@@ -99,7 +102,7 @@ public class TxMessageSender {
) {
return replicaService.invoke(
primaryConsistentId,
- TX_MESSAGE_FACTORY.writeIntentSwitchReplicaRequest()
+ TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest()
.groupId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, tablePartitionId))
.timestampLong(clockService.nowLong())
.txId(txId)
@@ -113,7 +116,7 @@ public class TxMessageSender {
* Sends cleanup request to the specified primary replica.
*
* @param primaryConsistentId Primary replica to process given cleanup
request.
- * @param replicationGroupIds Table partition ids.
+ * @param replicationGroupIds Table partition IDs.
* @param txId Transaction id.
* @param commit {@code True} if a commit requested.
* @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
@@ -121,19 +124,19 @@ public class TxMessageSender {
*/
public CompletableFuture<NetworkMessage> cleanup(
String primaryConsistentId,
- @Nullable Collection<ReplicationGroupId> replicationGroupIds,
+ @Nullable Collection<TablePartitionId> replicationGroupIds,
UUID txId,
boolean commit,
@Nullable HybridTimestamp commitTimestamp
) {
return messagingService.invoke(
primaryConsistentId,
- TX_MESSAGE_FACTORY.txCleanupMessage()
+ TX_MESSAGES_FACTORY.txCleanupMessage()
.txId(txId)
.commit(commit)
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))
.timestampLong(clockService.nowLong())
- .groups(replicationGroupIds)
+
.groups(toTablePartitionIdMessages(replicationGroupIds))
.build(),
transactionConfiguration.rpcTimeout().value());
}
@@ -153,7 +156,7 @@ public class TxMessageSender {
public CompletableFuture<TransactionResult> finish(
String primaryConsistentId,
TablePartitionId commitPartition,
- Map<ReplicationGroupId, String> replicationGroupIds,
+ Map<TablePartitionId, String> replicationGroupIds,
UUID txId,
Long consistencyToken,
boolean commit,
@@ -161,11 +164,11 @@ public class TxMessageSender {
) {
return replicaService.invoke(
primaryConsistentId,
- TX_MESSAGE_FACTORY.txFinishReplicaRequest()
+ TX_MESSAGES_FACTORY.txFinishReplicaRequest()
.txId(txId)
.timestampLong(clockService.nowLong())
.groupId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitPartition))
- .groups(replicationGroupIds)
+
.groups(toTablePartitionIdMessages(replicationGroupIds))
.commit(commit)
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))
.enlistmentConsistencyToken(consistencyToken)
@@ -189,7 +192,7 @@ public class TxMessageSender {
) {
return replicaService.invoke(
primaryConsistentId,
- TX_MESSAGE_FACTORY.txStateCommitPartitionRequest()
+ TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
.groupId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitGrpId))
.txId(txId)
.enlistmentConsistencyToken(consistencyToken)
@@ -211,7 +214,7 @@ public class TxMessageSender {
) {
return messagingService.invoke(
primaryConsistentId,
- TX_MESSAGE_FACTORY.txStateCoordinatorRequest()
+ TX_MESSAGES_FACTORY.txStateCoordinatorRequest()
.readTimestampLong(timestamp.longValue())
.txId(txId)
.build(),
@@ -233,7 +236,7 @@ public class TxMessageSender {
public CompletableFuture<ReplicaResponse> sendRecoveryCleanup(String
primaryConsistentId, TablePartitionId tablePartitionId) {
return replicaService.invoke(
primaryConsistentId,
- TX_MESSAGE_FACTORY.txCleanupRecoveryRequest()
+ TX_MESSAGES_FACTORY.txCleanupRecoveryRequest()
.groupId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, tablePartitionId))
.build()
);
@@ -242,4 +245,32 @@ public class TxMessageSender {
public MessagingService messagingService() {
return messagingService;
}
+
+ private static @Nullable List<TablePartitionIdMessage>
toTablePartitionIdMessages(
+ @Nullable Collection<TablePartitionId> tablePartitionIds
+ ) {
+ if (tablePartitionIds == null) {
+ return null;
+ }
+
+ var messages = new
ArrayList<TablePartitionIdMessage>(tablePartitionIds.size());
+
+ for (TablePartitionId tablePartitionId : tablePartitionIds) {
+ messages.add(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY,
tablePartitionId));
+ }
+
+ return messages;
+ }
+
+ private static Map<TablePartitionIdMessage, String>
toTablePartitionIdMessages(
+ Map<TablePartitionId, String> replicationGroupIds
+ ) {
+ var messages = new HashMap<TablePartitionIdMessage,
String>(replicationGroupIds.size());
+
+ for (Map.Entry<TablePartitionId, String> e :
replicationGroupIds.entrySet()) {
+ messages.put(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY,
e.getKey()), e.getValue());
+ }
+
+ return messages;
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfoMessage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfoMessage.java
new file mode 100644
index 0000000000..d3984f2a8c
--- /dev/null
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfoMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.message;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+
+/** Message for transferring a {@link CleanupReplicatedInfo}. */
+@Transferable(TxMessageGroup.CLEANUP_REPLICATED_INFO_MESSAGE)
+public interface CleanupReplicatedInfoMessage extends NetworkMessage {
+ /** Transaction ID. */
+ UUID txId();
+
+ /** Partitions. */
+ List<TablePartitionIdMessage> partitions();
+
+ /** Converts to {@link CleanupReplicatedInfo}. */
+ default CleanupReplicatedInfo asCleanupReplicatedInfo() {
+ List<TablePartitionIdMessage> partitionMessages = partitions();
+ List<TablePartitionId> partitions = new
ArrayList<>(partitionMessages.size());
+
+ for (int i = 0; i < partitionMessages.size(); i++) {
+ partitions.add(partitionMessages.get(i).asTablePartitionId());
+ }
+
+ return new CleanupReplicatedInfo(txId(), partitions);
+ }
+}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/TxMetaMessage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TransactionMetaMessage.java
similarity index 57%
copy from
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/TxMetaMessage.java
copy to
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TransactionMetaMessage.java
index 4dedc35220..934f595149 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/TxMetaMessage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TransactionMetaMessage.java
@@ -15,34 +15,24 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.partition.replicator.network.raft;
+package org.apache.ignite.internal.tx.message;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
-import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
-import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TxState;
import org.jetbrains.annotations.Nullable;
-/** Message for transferring a {@link TxMeta}. */
-@Transferable(PartitionReplicationMessageGroup.TX_META_MESSAGE)
-public interface TxMetaMessage extends NetworkMessage {
+/** Message for transferring a {@link TransactionMeta}. */
+public interface TransactionMetaMessage extends NetworkMessage {
/** Ordinal of {@link TxState} value. */
int txStateInt();
/** Commit timestamp in primitive representation, {@link
HybridTimestamp#NULL_HYBRID_TIMESTAMP} as {@code null}. */
long commitTimestampLong();
- /** List of enlisted partition groups. */
- List<TablePartitionIdMessage> enlistedPartitions();
-
/** Transaction state. */
default TxState txState() {
TxState state = TxState.fromOrdinal(txStateInt());
@@ -57,15 +47,8 @@ public interface TxMetaMessage extends NetworkMessage {
return nullableHybridTimestamp(commitTimestampLong());
}
- /** Converts to {@link TxMeta}. */
- default TxMeta asTxMeta() {
- List<TablePartitionIdMessage> enlistedPartitionMessages =
enlistedPartitions();
- var enlistedPartitions = new
ArrayList<TablePartitionId>(enlistedPartitionMessages.size());
-
- for (int i = 0; i < enlistedPartitionMessages.size(); i++) {
-
enlistedPartitions.add(enlistedPartitionMessages.get(i).asTablePartitionId());
- }
-
- return new TxMeta(txState(), enlistedPartitions, commitTimestamp());
+ /** Converts to {@link TransactionMeta}. */
+ default TransactionMeta asTransactionMeta() {
+ throw new AssertionError("Must be implemented by heirs.");
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java
index 605fa2066d..138d9be809 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java
@@ -19,12 +19,11 @@ package org.apache.ignite.internal.tx.message;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
-import java.util.Collection;
+import java.util.List;
import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.network.annotations.Marshallable;
import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.jetbrains.annotations.Nullable;
@@ -46,9 +45,7 @@ public interface TxCleanupMessage extends TimestampAware {
*
* @return Replication groups aggregated by expected primary replica nodes.
*/
- @Marshallable
- @Nullable
- Collection<ReplicationGroupId> groups();
+ @Nullable List<TablePartitionIdMessage> groups();
/**
* Returns {@code True} if a commit request.
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
index 39cbd7aae9..ef872a50b5 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.tx.message;
-import org.apache.ignite.internal.network.annotations.Marshallable;
import org.apache.ignite.internal.network.annotations.Transferable;
import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.jetbrains.annotations.Nullable;
@@ -27,8 +26,6 @@ import org.jetbrains.annotations.Nullable;
*/
@Transferable(TxMessageGroup.TX_CLEANUP_MSG_RESPONSE)
public interface TxCleanupMessageResponse extends TimestampAware {
-
- @Nullable
- @Marshallable
- CleanupReplicatedInfo result();
+ /** Result of a replicated cleanup request. */
+ @Nullable CleanupReplicatedInfoMessage result();
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
index a663f1e8a5..abecb5159f 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
@@ -22,10 +22,9 @@ import static
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimes
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.network.annotations.Marshallable;
import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.jetbrains.annotations.Nullable;
@@ -66,11 +65,6 @@ public interface TxFinishReplicaRequest extends
PrimaryReplicaRequest, Timestamp
return nullableHybridTimestamp(commitTimestampLong());
}
- /**
- * Returns enlisted partition groups aggregated by expected primary
replica nodes.
- *
- * @return Enlisted partition groups aggregated by expected primary
replica nodes.
- */
- @Marshallable
- Map<ReplicationGroupId, String> groups();
+ /** Enlisted partition groups aggregated by expected primary replica
nodes. */
+ Map<TablePartitionIdMessage, String> groups();
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
index 68fd6ccfdc..338702389a 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
@@ -94,4 +94,18 @@ public class TxMessageGroup {
*/
public static final short VACUUM_TX_STATE_COMMAND = 13;
+ /** Message type for {@link TxMetaMessage}. */
+ public static final short TX_META_MESSAGE = 14;
+
+ /** Message type for {@link TxStateMetaMessage}. */
+ public static final short TX_STATE_META_MESSAGE = 15;
+
+ /** Message type for {@link TxStateMetaAbandonedMessage}. */
+ public static final short TX_STATE_META_ABANDONED_MESSAGE = 16;
+
+ /** Message type for {@link TxStateMetaFinishingMessage}. */
+ public static final short TX_STATE_META_FINISHING_MESSAGE = 17;
+
+ /** Message type for {@link CleanupReplicatedInfoMessage}. */
+ public static final short CLEANUP_REPLICATED_INFO_MESSAGE = 18;
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/TxMetaMessage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMetaMessage.java
similarity index 60%
rename from
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/TxMetaMessage.java
rename to
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMetaMessage.java
index 4dedc35220..fc9f3a333b 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/TxMetaMessage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMetaMessage.java
@@ -15,48 +15,22 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.partition.replicator.network.raft;
-
-import static
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
+package org.apache.ignite.internal.tx.message;
import java.util.ArrayList;
import java.util.List;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.annotations.Transferable;
-import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TxMeta;
-import org.apache.ignite.internal.tx.TxState;
-import org.jetbrains.annotations.Nullable;
/** Message for transferring a {@link TxMeta}. */
-@Transferable(PartitionReplicationMessageGroup.TX_META_MESSAGE)
-public interface TxMetaMessage extends NetworkMessage {
- /** Ordinal of {@link TxState} value. */
- int txStateInt();
-
- /** Commit timestamp in primitive representation, {@link
HybridTimestamp#NULL_HYBRID_TIMESTAMP} as {@code null}. */
- long commitTimestampLong();
-
+@Transferable(TxMessageGroup.TX_META_MESSAGE)
+public interface TxMetaMessage extends TransactionMetaMessage {
/** List of enlisted partition groups. */
List<TablePartitionIdMessage> enlistedPartitions();
- /** Transaction state. */
- default TxState txState() {
- TxState state = TxState.fromOrdinal(txStateInt());
-
- assert state != null : txStateInt();
-
- return state;
- }
-
- /** Commit timestamp. */
- default @Nullable HybridTimestamp commitTimestamp() {
- return nullableHybridTimestamp(commitTimestampLong());
- }
-
/** Converts to {@link TxMeta}. */
default TxMeta asTxMeta() {
List<TablePartitionIdMessage> enlistedPartitionMessages =
enlistedPartitions();
@@ -68,4 +42,9 @@ public interface TxMetaMessage extends NetworkMessage {
return new TxMeta(txState(), enlistedPartitions, commitTimestamp());
}
+
+ @Override
+ default TransactionMeta asTransactionMeta() {
+ return asTxMeta();
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaAbandonedMessage.java
similarity index 51%
copy from
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
copy to
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaAbandonedMessage.java
index 5d5ce4bb2a..432ffbd98e 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaAbandonedMessage.java
@@ -17,18 +17,29 @@
package org.apache.ignite.internal.tx.message;
-import org.apache.ignite.internal.network.annotations.Marshallable;
import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.replicator.message.TimestampAware;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite.internal.tx.TransactionMeta;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.tx.TxStateMetaAbandoned;
-/**
- * Transaction state response.
- */
-@Transferable(TxMessageGroup.TX_STATE_RESPONSE)
-public interface TxStateResponse extends TimestampAware {
- @Marshallable
- @Nullable
- TransactionMeta txStateMeta();
+/** Message for transferring a {@link TxStateMetaAbandoned}. */
+@Transferable(TxMessageGroup.TX_STATE_META_ABANDONED_MESSAGE)
+public interface TxStateMetaAbandonedMessage extends TxStateMetaMessage {
+ /** Timestamp when the latest {@code ABANDONED} state set. */
+ long lastAbandonedMarkerTs();
+
+ /** Converts to {@link TxStateMetaAbandoned}. */
+ default TxStateMetaAbandoned asTxStateMetaAbandoned() {
+ TablePartitionIdMessage commitPartitionId = commitPartitionId();
+
+ return new TxStateMetaAbandoned(
+ txCoordinatorId(),
+ commitPartitionId == null ? null :
commitPartitionId.asTablePartitionId()
+ );
+ }
+
+ @Override
+ default TransactionMeta asTransactionMeta() {
+ return asTxStateMetaAbandoned();
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaFinishingMessage.java
similarity index 54%
copy from
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
copy to
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaFinishingMessage.java
index 5d5ce4bb2a..16ba5c6ea9 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaFinishingMessage.java
@@ -17,18 +17,26 @@
package org.apache.ignite.internal.tx.message;
-import org.apache.ignite.internal.network.annotations.Marshallable;
import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.replicator.message.TimestampAware;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite.internal.tx.TransactionMeta;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.tx.TxStateMetaFinishing;
-/**
- * Transaction state response.
- */
-@Transferable(TxMessageGroup.TX_STATE_RESPONSE)
-public interface TxStateResponse extends TimestampAware {
- @Marshallable
- @Nullable
- TransactionMeta txStateMeta();
+/** Message for transferring a {@link TxStateMetaFinishing}. */
+@Transferable(TxMessageGroup.TX_STATE_META_FINISHING_MESSAGE)
+public interface TxStateMetaFinishingMessage extends TxStateMetaMessage {
+ /** Converts to {@link TxStateMetaFinishing}. */
+ default TxStateMetaFinishing asTxStateMetaFinishing() {
+ TablePartitionIdMessage commitPartitionId = commitPartitionId();
+
+ return new TxStateMetaFinishing(
+ txCoordinatorId(),
+ commitPartitionId == null ? null :
commitPartitionId.asTablePartitionId()
+ );
+ }
+
+ @Override
+ default TransactionMeta asTransactionMeta() {
+ return asTxStateMetaFinishing();
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaMessage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaMessage.java
new file mode 100644
index 0000000000..48285fb635
--- /dev/null
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaMessage.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.message;
+
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.tx.TransactionMeta;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.jetbrains.annotations.Nullable;
+
+/** Message for transferring a {@link TxStateMeta}. */
+@Transferable(TxMessageGroup.TX_STATE_META_MESSAGE)
+public interface TxStateMetaMessage extends TransactionMetaMessage {
+ /** Transaction coordinator ID. */
+ @Nullable String txCoordinatorId();
+
+ /** ID of the replication group that manages a transaction state. */
+ @Nullable TablePartitionIdMessage commitPartitionId();
+
+ /** Initial vacuum observation timestamp. */
+ @Nullable Long initialVacuumObservationTimestamp();
+
+ /** Cleanup completion timestamp. */
+ @Nullable Long cleanupCompletionTimestamp();
+
+ /** Converts to {@link TxStateMeta}. */
+ default TxStateMeta asTxStateMeta() {
+ TablePartitionIdMessage commitPartitionId = commitPartitionId();
+
+ return new TxStateMeta(
+ txState(),
+ txCoordinatorId(),
+ commitPartitionId == null ? null :
commitPartitionId.asTablePartitionId(),
+ commitTimestamp(),
+ initialVacuumObservationTimestamp(),
+ cleanupCompletionTimestamp()
+ );
+ }
+
+ @Override
+ default TransactionMeta asTransactionMeta() {
+ return asTxStateMeta();
+ }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
index 5d5ce4bb2a..20c231e0de 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
@@ -17,10 +17,8 @@
package org.apache.ignite.internal.tx.message;
-import org.apache.ignite.internal.network.annotations.Marshallable;
import org.apache.ignite.internal.network.annotations.Transferable;
import org.apache.ignite.internal.replicator.message.TimestampAware;
-import org.apache.ignite.internal.tx.TransactionMeta;
import org.jetbrains.annotations.Nullable;
/**
@@ -28,7 +26,6 @@ import org.jetbrains.annotations.Nullable;
*/
@Transferable(TxMessageGroup.TX_STATE_RESPONSE)
public interface TxStateResponse extends TimestampAware {
- @Marshallable
- @Nullable
- TransactionMeta txStateMeta();
+ /** Transaction metadata. */
+ @Nullable TransactionMetaMessage txStateMeta();
}