This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new bbd4c4e01f2 IGNITE-21910 Implement write intent resolution primary
replica path (primary replica side) (#7408)
bbd4c4e01f2 is described below
commit bbd4c4e01f2791cbbda7ed6adf2c4b44cf44ebc9
Author: Denis Chudov <[email protected]>
AuthorDate: Mon Jan 19 21:59:35 2026 +0400
IGNITE-21910 Implement write intent resolution primary replica path
(primary replica side) (#7408)
---
.../ignite/internal/hlc/HybridTimestamp.java | 2 +-
.../apache/ignite/internal/util/IgniteUtils.java | 24 ++
...xStateCommitPartitionReplicaRequestHandler.java | 1 +
.../distributed/raft/TablePartitionProcessor.java | 13 +-
.../replicator/PartitionReplicaListener.java | 200 +++++++++++++++-
.../replication/PartitionReplicaListenerTest.java | 251 +++++++++++++++++++++
...tdatedReadOnlyTransactionInternalException.java | 53 +++++
.../org/apache/ignite/internal/tx/TxState.java | 24 +-
.../org/apache/ignite/internal/tx/TxStateMeta.java | 4 +
.../ignite/internal/tx/TxStateMetaUnknown.java | 49 ++++
.../ignite/internal/tx/impl/TxManagerImpl.java | 8 +-
.../ignite/internal/tx/message/RowIdMessage.java | 39 ++++
.../ignite/internal/tx/message/TxMessageGroup.java | 13 ++
.../tx/message/TxStateMetaUnknownMessage.java | 39 ++++
.../tx/message/TxStatePrimaryReplicaRequest.java | 39 ++++
.../org/apache/ignite/internal/tx/TxStateTest.java | 41 +++-
16 files changed, 771 insertions(+), 29 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
index f46c3348911..988f69f72c8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
@@ -209,7 +209,7 @@ public final class HybridTimestamp implements
Comparable<HybridTimestamp>, Seria
}
@Override
- public boolean equals(Object o) {
+ public boolean equals(@Nullable Object o) {
if (this == o) {
return true;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 7b68fdb01fa..db898d50d11 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -24,6 +24,7 @@ import static
org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import java.io.IOException;
import java.lang.management.ManagementFactory;
+import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
@@ -1319,6 +1320,29 @@ public class IgniteUtils {
}
}
+ /**
+  * Builds a flat array from the given arguments.
+  *
+  * <p>Every argument that is itself an array (of any component type, including primitive arrays) is replaced by its
+  * elements; any other argument, including {@code null}, is added as is. Only one level is flattened: an array found
+  * inside an array argument is added as a single element. Elements of primitive arrays are boxed by
+  * {@link Array#get(Object, int)}.
+  *
+  * @param arguments Arguments; a {@code null} arguments array is treated as empty.
+  * @return Flat array, never {@code null}.
+  */
+ public static Object[] flatArray(Object... arguments) {
+     if (arguments == null) {
+         return new Object[0];
+     }
+
+     // Lower bound of the result size: every argument contributes at least... one element unless it is an empty array.
+     List<Object> list = new ArrayList<>(arguments.length);
+
+     for (Object arg : arguments) {
+         if (arg != null && arg.getClass().isArray()) {
+             int length = Array.getLength(arg);
+
+             for (int i = 0; i < length; i++) {
+                 list.add(Array.get(arg, i));
+             }
+         } else {
+             list.add(arg);
+         }
+     }
+
+     return list.toArray();
+ }
+
private static CompletableFuture<Void> startAsync(ComponentContext
componentContext, Stream<? extends IgniteComponent> components) {
return allOf(components
.filter(Objects::nonNull)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
index de23a2cc628..f64c86af78e 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
@@ -191,6 +191,7 @@ public class TxStateCommitPartitionReplicaRequestHandler {
}
} else {
// Recovery is not needed.
+ // TODO https://issues.apache.org/jira/browse/IGNITE-27494 Add
UNKNOWN state handling.
assert isFinalState(txMeta.txState()) : "Unexpected transaction
state: " + txMeta;
return completedFuture(txMeta);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
index 4b46f0dd9f7..a8e520e5a8e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
@@ -27,7 +27,6 @@ import static
org.apache.ignite.internal.partition.replicator.raft.CommandResult
import static
org.apache.ignite.internal.partition.replicator.raft.CommandResult.EMPTY_NOT_APPLIED_RESULT;
import static
org.apache.ignite.internal.table.distributed.TableUtils.indexIdsAtRwTxBeginTs;
import static
org.apache.ignite.internal.table.distributed.TableUtils.indexIdsAtRwTxBeginTsOrNull;
-import static org.apache.ignite.internal.tx.TxState.COMMITTED;
import static org.apache.ignite.internal.tx.TxState.PENDING;
import java.util.HashSet;
@@ -531,11 +530,13 @@ public class TablePartitionProcessor implements
RaftTableProcessor {
}
private void replicaTouch(UUID txId, UUID txCoordinatorId, HybridTimestamp
commitTimestamp, boolean full) {
- txManager.updateTxMeta(txId, old -> TxStateMeta.builder(old, full ?
COMMITTED : PENDING)
- .txCoordinatorId(txCoordinatorId)
- .commitTimestamp(full ? commitTimestamp : null)
- .build()
- );
+ // Only non-full transactions get a PENDING tx meta (with the coordinator id) recorded here; nothing is saved for
+ // full transactions, so commitTimestamp is no longer used by this method. NOTE(review): consider dropping it.
+ if (!full) {
+ txManager.updateTxMeta(txId, old -> TxStateMeta.builder(old,
PENDING)
+ .txCoordinatorId(txCoordinatorId)
+ .build()
+ );
+ }
}
/**
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index dfce5104dae..9419bbc2434 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed.replicator;
+import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
@@ -33,11 +34,14 @@ import static
org.apache.ignite.internal.partition.replicator.network.replicatio
import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
import static
org.apache.ignite.internal.table.distributed.replicator.RemoteResourceIds.cursorId;
import static org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.apache.ignite.internal.tx.TxState.COMMITTED;
import static org.apache.ignite.internal.tx.TxState.FINISHING;
import static org.apache.ignite.internal.tx.TxState.PENDING;
+import static org.apache.ignite.internal.tx.TxState.UNKNOWN;
import static org.apache.ignite.internal.tx.TxState.isFinalState;
import static org.apache.ignite.internal.tx.TxStateMeta.builder;
+import static
org.apache.ignite.internal.tx.TxStateMetaUnknown.txStateMetaUnknown;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static
org.apache.ignite.internal.util.CompletableFutures.emptyCollectionCompletedFuture;
@@ -54,7 +58,6 @@ import static
org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import static org.apache.ignite.lang.ErrorGroups.Replicator.CURSOR_CLOSE_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR;
-import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -182,6 +185,7 @@ import org.apache.ignite.internal.tx.LockException;
import org.apache.ignite.internal.tx.LockKey;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.LockMode;
+import
org.apache.ignite.internal.tx.OutdatedReadOnlyTransactionInternalException;
import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
@@ -191,6 +195,7 @@ import
org.apache.ignite.internal.tx.impl.FullyQualifiedResourceId;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
import
org.apache.ignite.internal.tx.message.TableWriteIntentSwitchReplicaRequest;
+import org.apache.ignite.internal.tx.message.TxStatePrimaryReplicaRequest;
import
org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequestBase;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.CursorUtils;
@@ -612,6 +617,8 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
return
processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest)
request, opStartTsIfDirectRo);
} else if (request instanceof ReadOnlyDirectMultiRowReplicaRequest) {
return
processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest)
request, opStartTsIfDirectRo);
+ } else if (request instanceof TxStatePrimaryReplicaRequest) {
+ return
processTxStatePrimaryReplicaRequest((TxStatePrimaryReplicaRequest) request);
}
// Unknown request.
@@ -1793,9 +1800,8 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
HybridTimestamp lwm = lowWatermark.getLowWatermark();
if (lwm != null && opStartTimestamp.compareTo(lwm) < 0) {
- throw new IgniteInternalException(
- TX_READ_ONLY_TOO_OLD_ERR,
- "Attempted to read data below the garbage collection
watermark: [readTimestamp={}, gcTimestamp={}]",
+ throw new OutdatedReadOnlyTransactionInternalException(
+ "Attempted to read data below the garbage collection
watermark",
opStartTimestamp,
lowWatermark.getLowWatermark());
}
@@ -2223,6 +2229,188 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
}
}
+ /**
+  * Handles {@link TxStatePrimaryReplicaRequest}: computes the transaction meta from the local storage state.
+  *
+  * @param request Request with row id, transaction id, newest known commit timestamp and read timestamp.
+  * @return Already completed future with the transaction meta derived from the storage.
+  */
+ private CompletableFuture<TransactionMeta> processTxStatePrimaryReplicaRequest(TxStatePrimaryReplicaRequest request) {
+     RowId rowId = request.rowId().asRowId();
+     UUID txId = request.txId();
+     HybridTimestamp newestCommitTs = request.newestCommitTimestamp();
+     HybridTimestamp readTs = request.readTimestamp();
+
+     TransactionMeta txMeta = txMetaFromStorage(rowId, txId, newestCommitTs, readTs);
+
+     return completedFuture(txMeta);
+ }
+
+ /**
+  * Determines the state of a transaction on the primary replica from the local storage state alone, using the row id,
+  * the transaction id, the newest commit timestamp known to the requesting replica and the read timestamp.
+  * A non-primary replica must wait for safe time that is equal to or greater than the read timestamp it provides in
+  * the request. Given this, it is enough to read the storage with the given read timestamp and interpret the result:
+  *
+  * <ul>
+  *     <li>Short path (using {@link MvPartitionStorage#read(RowId, HybridTimestamp)}):
+  *         <ul>
+  *             <li>there is a write intent with the id of the sought transaction: respond with {@code PENDING};</li>
+  *             <li>
+  *                 there is a latest committed version whose timestamp is greater than the newest commit timestamp
+  *                 from the request (or the newest commit timestamp is null) and less than or equal to the read
+  *                 timestamp. This means that the write intent for this committed version was created before the read
+  *                 timestamp and the requesting replica has seen it, so it is the committed write intent of the sought
+  *                 transaction: respond with {@code COMMITTED}.
+  *             </li>
+  *         </ul>
+  *     </li>
+  *     <li>Otherwise the storage is scanned. Possible outcomes, checked in this order:
+  *         <ul>
+  *             <li>
+  *                 there is a committed version whose timestamp is greater than the newest commit timestamp (or the
+  *                 newest commit timestamp is null) and less than or equal to the read timestamp: respond with
+  *                 {@code COMMITTED}, for the same reason as above;
+  *             </li>
+  *             <li>there is a write intent with the id of the sought transaction: respond with {@code PENDING};</li>
+  *             <li>
+  *                 if the read timestamp is not {@link ClockService#after(HybridTimestamp, HybridTimestamp)} the low
+  *                 watermark, or the newest commit timestamp (when present) is not
+  *                 {@link ClockService#after(HybridTimestamp, HybridTimestamp)} the low watermark, throw
+  *                 {@link OutdatedReadOnlyTransactionInternalException};
+  *             </li>
+  *             <li>
+  *                 there are committed version(s) with timestamp greater than both the newest commit timestamp and the
+  *                 read timestamp: the state of the sought transaction cannot be recovered from the storage state, but
+  *                 the write intent must be ignored by the requesting replica. Respond with {@code UNKNOWN};
+  *             </li>
+  *             <li>
+  *                 otherwise (only versions already known to the requesting replica, or no versions at all): respond
+  *                 with {@code ABORTED}.
+  *             </li>
+  *         </ul>
+  *     </li>
+  * </ul>
+  *
+  * @param rowId Row id.
+  * @param txId Transaction id.
+  * @param newestCommitTimestamp Newest commit timestamp known to sender node.
+  * @param readTimestamp Read timestamp.
+  * @return Transaction meta calculated from the storage state.
+  * @throws OutdatedReadOnlyTransactionInternalException If the storage scan is needed and the read timestamp or the
+  *         newest commit timestamp is not after the low watermark.
+  */
+ private TransactionMeta txMetaFromStorage(
+         RowId rowId,
+         UUID txId,
+         @Nullable HybridTimestamp newestCommitTimestamp,
+         HybridTimestamp readTimestamp
+ ) {
+     ReadResult readResult = mvDataStorage.read(rowId, readTimestamp);
+
+     requireNonNull(readResult, "Read result must not be null.");
+
+     if (isWriteIntentBelongingToThisTx(readResult, txId)) {
+         return builder(PENDING).build();
+     }
+
+     if (isNewCommittedVersionBeforeReadTimestamp(readResult, newestCommitTimestamp, readTimestamp)) {
+         return builder(COMMITTED).commitTimestamp(readResult.commitTimestamp()).build();
+     }
+
+     // In this case we can't be sure if the write intent of the sought transaction is committed or aborted, so the
+     // storage is scanned. Consider the example:
+     // - wi is created at t1
+     // - read ts is t2
+     // - tx is finished at t3, it may be committed with commit ts t3, but wi is switched at t4
+     // - now is t5
+     // - backup replica waits for safe time >= t2; here on primary we read from storage using t2 and can't see the
+     //   version modification at t4, because there are two operations: addWrite (at t1) and commitWrite (at t4),
+     // so we need to scan the storage to see all versions up to now. Commit timestamp is checked on the requesting
+     // replica.
+     return txMetaFromStorageWithScan(rowId, txId, newestCommitTimestamp, readTimestamp);
+ }
+
+ /**
+  * Checks whether the read result is a write intent created by the given transaction.
+  *
+  * @param rr Read result.
+  * @param txId Id of the sought transaction.
+  * @return {@code true} only for a write intent whose transaction id equals {@code txId}.
+  */
+ private static boolean isWriteIntentBelongingToThisTx(ReadResult rr, UUID txId) {
+     if (!rr.isWriteIntent()) {
+         return false;
+     }
+
+     UUID writeIntentTxId = rr.transactionId();
+
+     return writeIntentTxId != null && writeIntentTxId.equals(txId);
+ }
+
+ /**
+  * Checks whether the read result is a committed version, i.e. it is neither empty nor a write intent.
+  *
+  * @param rr Read result.
+  * @return {@code true} for a non-empty result that is not a write intent.
+  */
+ private static boolean isCommittedVersion(ReadResult rr) {
+     if (rr.isEmpty()) {
+         return false;
+     }
+
+     return !rr.isWriteIntent();
+ }
+
+ /**
+  * Checks whether the read result is a committed version that the requesting replica does not know yet (its commit
+  * timestamp is after {@code newestCommitTimestamp}, or that timestamp is absent) and that is visible at
+  * {@code readTimestamp} (commit timestamp is not after it).
+  *
+  * @param rr Read result.
+  * @param newestCommitTimestamp Newest commit timestamp known to the requesting replica, may be {@code null}.
+  * @param readTimestamp Read timestamp.
+  * @return {@code true} if both conditions hold for a committed version.
+  */
+ private static boolean isNewCommittedVersionBeforeReadTimestamp(
+         ReadResult rr,
+         @Nullable HybridTimestamp newestCommitTimestamp,
+         HybridTimestamp readTimestamp
+ ) {
+     if (!isCommittedVersion(rr)) {
+         return false;
+     }
+
+     long commitTs = rr.commitTimestamp().longValue();
+
+     boolean unknownToRequester = newestCommitTimestamp == null || commitTs > newestCommitTimestamp.longValue();
+
+     return unknownToRequester && commitTs <= readTimestamp.longValue();
+ }
+
+ /**
+  * Checks whether the read timestamp, or the newest commit timestamp when present, is not after the current low
+  * watermark ({@link HybridTimestamp#MIN_VALUE} is used when no low watermark is set).
+  *
+  * @param readTimestamp Read timestamp.
+  * @param newestCommitTimestamp Newest commit timestamp known to the requesting replica, may be {@code null}.
+  * @return {@code true} if either timestamp is not after the low watermark.
+  */
+ private boolean isReadTimestampOutdated(HybridTimestamp readTimestamp, @Nullable HybridTimestamp newestCommitTimestamp) {
+     HybridTimestamp currentLwm = lowWatermark.getLowWatermark();
+     HybridTimestamp earliestAvailable = currentLwm != null ? currentLwm : HybridTimestamp.MIN_VALUE;
+
+     if (!clockService.after(readTimestamp, earliestAvailable)) {
+         return true;
+     }
+
+     return newestCommitTimestamp != null && !clockService.after(newestCommitTimestamp, earliestAvailable);
+ }
+
+ /**
+  * Scans all versions of the row to determine the transaction state when a single read at the read timestamp is not
+  * enough.
+  *
+  * @param rowId Row id.
+  * @param txId Id of the sought transaction.
+  * @param newestCommitTimestamp Newest commit timestamp known to the requesting replica, may be {@code null}.
+  * @param readTimestamp Read timestamp.
+  * @return {@code COMMITTED}, {@code PENDING}, {@code UNKNOWN} or {@code ABORTED} meta.
+  * @throws OutdatedReadOnlyTransactionInternalException If the read timestamp or the newest commit timestamp is not
+  *         after the low watermark.
+  */
+ private TransactionMeta txMetaFromStorageWithScan(
+         RowId rowId,
+         UUID txId,
+         @Nullable HybridTimestamp newestCommitTimestamp,
+         HybridTimestamp readTimestamp
+ ) {
+     ReadResult writeIntent = null;
+     boolean committedAfterReadTsSeen = false;
+
+     try (Cursor<ReadResult> versions = mvDataStorage.scanVersions(rowId)) {
+         for (ReadResult version : versions) {
+             if (version == null || version.isEmpty()) {
+                 continue;
+             }
+
+             if (version.isWriteIntent()) {
+                 writeIntent = version;
+                 continue;
+             }
+
+             HybridTimestamp versionCommitTs = requireNonNull(
+                     version.commitTimestamp(),
+                     "Committed read result must have commit timestamp"
+             );
+
+             long commitTs = versionCommitTs.longValue();
+
+             // newestCommitTimestamp comes from requesting replica and is always less than read timestamp (if not null).
+             boolean unknownToRequester = newestCommitTimestamp == null || commitTs > newestCommitTimestamp.longValue();
+
+             if (!unknownToRequester) {
+                 continue;
+             }
+
+             if (commitTs > readTimestamp.longValue()) {
+                 committedAfterReadTsSeen = true;
+             } else {
+                 // The write intent for this committed version was created before read timestamp, and requesting
+                 // replica has seen it, so this is the committed write intent of the sought transaction.
+                 // Commit timestamp is checked on the requesting replica.
+                 return builder(COMMITTED).commitTimestamp(versionCommitTs).build();
+             }
+         }
+     }
+
+     if (writeIntent != null && isWriteIntentBelongingToThisTx(writeIntent, txId)) {
+         return builder(PENDING).build();
+     }
+
+     if (isReadTimestampOutdated(readTimestamp, newestCommitTimestamp)) {
+         // In this case we can't be sure about anything that happened before read timestamp.
+         // NOTE(review): the low watermark is read again for the message, so it may differ from the value checked.
+         throw new OutdatedReadOnlyTransactionInternalException(readTimestamp, lowWatermark.getLowWatermark());
+     }
+
+     if (committedAfterReadTsSeen) {
+         // The state of the sought transaction is unrecoverable from storage state because committed versions don't
+         // contain tx id. Consider the examples:
+         // Example 1:
+         // - wi created at t1 by tx1
+         // - read ts is t2, backup replica waits for t2
+         // - tx1 is committed at t5, commit ts is t5
+         // - commit write is replicated at t6
+         // - now is t7
+         // - on primary replica, we see committed version with commit ts t5
+         // Example 2:
+         // - wi1 created at t1 by tx1
+         // - read ts is t2, backup replica waits for t2
+         // - tx1 is aborted at t3
+         // - tx2 creates wi2 on primary at t4
+         // - tx2 is committed at t5, commit ts is t5
+         // - commit write is replicated at t6
+         // - now is t7
+         // - on primary replica, we see committed version with commit ts t5
+         // In the 1st example tx1 was committed and in the 2nd it was aborted, but the storage state is the same in
+         // both, so the state of tx1 is unknown. However, the write intent should be ignored by the requesting
+         // replica because nothing was committed with commit timestamp less than read timestamp.
+         return txStateMetaUnknown();
+     }
+
+     // These can be only versions known to requesting replica (or no versions at all).
+     return builder(ABORTED).build();
+ }
+
private static <T> boolean allElementsAreNull(List<T> list) {
for (T element : list) {
if (element != null) {
@@ -3313,7 +3501,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
* read time).
*/
private static Boolean canReadFromWriteIntent(UUID txId, TransactionMeta
txMeta, @Nullable HybridTimestamp timestamp) {
- assert isFinalState(txMeta.txState()) || txMeta.txState() == PENDING
+ assert isFinalState(txMeta.txState()) || txMeta.txState() == PENDING
|| txMeta.txState() == UNKNOWN
: format("Unexpected state defined by write intent resolution
[txId={}, txMeta={}].", txId, txMeta);
if (txMeta.txState() == COMMITTED) {
@@ -3321,7 +3509,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
return readLatest || txMeta.commitTimestamp().compareTo(timestamp)
<= 0;
} else {
- // Either ABORTED or PENDING.
+ // Either ABORTED, PENDING or UNKNOWN. When UNKNOWN, write intent
is never readable, see state description.
return false;
}
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 975e8934e39..dee8e3fee0e 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -41,6 +41,7 @@ import static
org.apache.ignite.internal.partition.replicator.network.replicatio
import static
org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_UPSERT_ALL;
import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
@@ -50,10 +51,12 @@ import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.apache.ignite.internal.tx.TxState.COMMITTED;
import static org.apache.ignite.internal.tx.TxState.FINISHING;
import static org.apache.ignite.internal.tx.TxState.PENDING;
+import static org.apache.ignite.internal.tx.TxState.UNKNOWN;
import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
import static
org.apache.ignite.internal.tx.test.TxStateMetaTestUtils.assertTxStateMetaIsSame;
import static org.apache.ignite.internal.util.ArrayUtils.asList;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.flatArray;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
@@ -128,6 +131,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterNodeResolver;
@@ -220,6 +224,7 @@ import
org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.tx.LockManager;
+import
org.apache.ignite.internal.tx.OutdatedReadOnlyTransactionInternalException;
import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TransactionResult;
import org.apache.ignite.internal.tx.TxManager;
@@ -236,6 +241,7 @@ import
org.apache.ignite.internal.tx.message.TransactionMetaMessage;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
import org.apache.ignite.internal.tx.message.TxStateCoordinatorRequest;
+import org.apache.ignite.internal.tx.message.TxStatePrimaryReplicaRequest;
import org.apache.ignite.internal.tx.message.TxStateResponse;
import org.apache.ignite.internal.tx.test.TestTransactionIds;
import org.apache.ignite.internal.type.NativeTypes;
@@ -2773,6 +2779,251 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertThat(processWithPrimacy(request),
willThrow(PrimaryReplicaMissException.class));
}
+ /**
+  * Checks transaction state resolution by {@link TxStatePrimaryReplicaRequest} for various storage states of a row.
+  * Committed versions are created at timestamps 20, 50, 70, 100 and 150 (depending on the flags), the read timestamp
+  * is 100 (or below the max clock skew when outdated) and the newest known commit timestamp is 50 (or absent).
+  */
+ @ParameterizedTest
+ @MethodSource("prepareVersionsParameters")
+ void processTxStatePrimaryReplicaRequestTest(
+         boolean addVersionBeforeNewestCommitTs,
+         boolean addVersionAfterNewestCommitTsBeforeReadTs,
+         boolean addVersionEqualToReadTs,
+         boolean addVersionAfterReadTs,
+         boolean addWriteIntent,
+         boolean writeIntentHasThisTxId,
+         boolean outdatedReadTs,
+         boolean newestCommitTsIsPresent,
+         IgniteBiTuple<Class<Exception>, TransactionMeta> expected
+ ) {
+     UUID thisTxId = newTxId();
+     UUID otherTxId = newTxId();
+
+     RowId rowId = new RowId(PART_ID, 1, 1);
+     BinaryRow row = binaryRow(1);
+
+     prepareVersions(
+             rowId,
+             row,
+             thisTxId,
+             otherTxId,
+             addVersionBeforeNewestCommitTs,
+             addVersionAfterNewestCommitTsBeforeReadTs,
+             addVersionEqualToReadTs,
+             addVersionAfterReadTs,
+             addWriteIntent,
+             writeIntentHasThisTxId,
+             newestCommitTsIsPresent
+     );
+
+     HybridTimestamp readTs = outdatedReadTs
+             ? new HybridTimestamp(clockService.maxClockSkewMillis() - 1, 0)
+             : new HybridTimestamp(100, 0);
+     HybridTimestamp newestCommitTs = newestCommitTsIsPresent ? new HybridTimestamp(50, 0) : null;
+
+     TxStatePrimaryReplicaRequest request = TX_MESSAGES_FACTORY.txStatePrimaryReplicaRequest()
+             .groupId(zonePartitionIdMessage(grpId))
+             .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
+             .tableId(TABLE_ID)
+             .rowId(TX_MESSAGES_FACTORY.rowIdMessage().partitionId(rowId.partitionId()).uuid(rowId.uuid()).build())
+             .txId(thisTxId)
+             .newestCommitTimestamp(newestCommitTs)
+             .readTimestamp(readTs)
+             .build();
+
+     CompletableFuture<ReplicaResult> fut = processWithPrimacy(request);
+
+     if (expected.get1() != null) {
+         try {
+             assertThrowsWithCause(fut::join, expected.get1());
+         } catch (AssertionError e) {
+             // join() on a future that failed with an unexpected exception would throw and hide the assertion error,
+             // so the actual outcome is extracted without rethrowing.
+             if (fut.isCompletedExceptionally()) {
+                 log.info("Actual failure: " + fut.handle((res, err) -> err).join());
+             } else {
+                 log.info("Actual state: " + fut.join().result());
+             }
+
+             throw e;
+         }
+     } else {
+         assertThat(fut, willCompleteSuccessfully());
+         TransactionMeta expectedMeta = expected.get2();
+         ReplicaResult res = fut.join();
+
+         TransactionMeta meta = (TransactionMeta) res.result();
+         assertEquals(expectedMeta.txState(), meta.txState());
+
+         if (expectedMeta.txState() == COMMITTED) {
+             assertEquals(expectedMeta.commitTimestamp(), meta.commitTimestamp());
+         }
+     }
+ }
+
+ /**
+  * Fills the test storage with versions of the given row according to the flags. Committed versions are created by
+  * {@code otherTxId} at timestamps 20 (only together with 50), 50 (only when the newest commit timestamp is present),
+  * 70, 100 and 150; the write intent, if any, is added last.
+  */
+ private void prepareVersions(
+         RowId rowId,
+         BinaryRow row,
+         UUID thisTxId,
+         UUID otherTxId,
+         boolean addVersionBeforeNewestCommitTs,
+         boolean addVersionAfterNewestCommitTsBeforeReadTs,
+         boolean addVersionEqualToReadTs,
+         boolean addVersionAfterReadTs,
+         boolean addWriteIntent,
+         boolean writeIntentHasThisTxId,
+         boolean newestCommitTsIsPresent
+ ) {
+     if (newestCommitTsIsPresent) {
+         if (addVersionBeforeNewestCommitTs) {
+             addWriteAndCommit(rowId, row, otherTxId, new HybridTimestamp(20, 0));
+         }
+
+         addWriteAndCommit(rowId, row, otherTxId, new HybridTimestamp(50, 0));
+     }
+
+     if (addVersionAfterNewestCommitTsBeforeReadTs) {
+         addWriteAndCommit(rowId, row, otherTxId, new HybridTimestamp(70, 0));
+     }
+
+     if (addVersionEqualToReadTs) {
+         addWriteAndCommit(rowId, row, otherTxId, new HybridTimestamp(100, 0));
+     }
+
+     if (addVersionAfterReadTs) {
+         addWriteAndCommit(rowId, row, otherTxId, new HybridTimestamp(150, 0));
+     }
+
+     if (addWriteIntent) {
+         UUID txId = writeIntentHasThisTxId ? thisTxId : otherTxId;
+
+         // Same commit partition arguments as in addWriteAndCommit, instead of arbitrary hard-coded values.
+         testMvPartitionStorage.addWrite(rowId, row, txId, ZONE_ID, PART_ID);
+     }
+ }
+
+ /**
+  * Adds a write intent of the given transaction for the row and immediately commits it with the given timestamp.
+  */
+ private void addWriteAndCommit(RowId rowId, BinaryRow row, UUID txId, HybridTimestamp commitTimestamp) {
+     testMvPartitionStorage.addWrite(rowId, row, txId, ZONE_ID, PART_ID);
+
+     testMvPartitionStorage.commitWrite(rowId, commitTimestamp, txId);
+ }
+
+ /**
+  * Parameters for {@link #processTxStatePrimaryReplicaRequestTest}. Each argument set is flattened from:
+  * four committed-version flags (before newest commit ts, after newest commit ts and before read ts, equal to read ts,
+  * after read ts), two write intent flags (add write intent, write intent has this tx id), the outdated read ts flag,
+  * the newest commit ts presence flag and the expected outcome.
+  */
+ private static Stream<Arguments> prepareVersionsParameters() {
+     boolean[] noCommittedVersions = { false, false, false, false };
+     boolean[] committedVersions = { true, true, true, true };
+     boolean[] commitTsBeforeReadTs = { false, true, false, false };
+     boolean[] commitTsBeforeReadTsWithEarlyHistory = { true, true, false, false };
+     boolean[] commitTsEqualToReadTs = { false, false, true, false };
+     boolean[] commitTsAfterReadTs = { false, false, false, true };
+     boolean[] commitTsBeforeAndAfterReadTs = { false, true, false, true };
+
+     boolean[] noWriteIntent = { false, false };
+     boolean[] writeIntentThisTx = { true, true };
+     boolean[] writeIntentAnotherTx = { true, false };
+
+     boolean roOk = false;
+     boolean roOutdated = true;
+
+     boolean newestAbsent = false;
+     boolean newestPresent = true;
+
+     var outdatedError = new IgniteBiTuple<>(OutdatedReadOnlyTransactionInternalException.class, null);
+     var pending = expectedTxStateArg(PENDING, null);
+     var aborted = expectedTxStateArg(ABORTED, null);
+     var committedBeforeReadTs = expectedTxStateArg(COMMITTED, new HybridTimestamp(70, 0));
+     var committedEqualToReadTs = expectedTxStateArg(COMMITTED, new HybridTimestamp(100, 0));
+     var unknown = expectedTxStateArg(UNKNOWN, null);
+
+     return Stream.of(
+             argumentSet(
+                     "outdated ro txn, no committed versions",
+                     flatArray(noCommittedVersions, noWriteIntent, roOutdated, newestAbsent, outdatedError)
+             ),
+             argumentSet(
+                     "outdated ro txn, no committed versions, write intent of another tx",
+                     flatArray(noCommittedVersions, writeIntentAnotherTx, roOutdated, newestAbsent, outdatedError)
+             ),
+             argumentSet(
+                     "outdated ro txn, committed versions",
+                     flatArray(committedVersions, noWriteIntent, roOutdated, newestAbsent, outdatedError)
+             ),
+             argumentSet(
+                     "empty result",
+                     flatArray(noCommittedVersions, noWriteIntent, roOk, newestAbsent, aborted)
+             ),
+             argumentSet(
+                     "write intent of this tx, no committed versions",
+                     flatArray(noCommittedVersions, writeIntentThisTx, roOk, newestAbsent, pending)
+             ),
+             argumentSet(
+                     "write intent of another tx, no committed versions",
+                     flatArray(noCommittedVersions, writeIntentAnotherTx, roOk, newestAbsent, aborted)
+             ),
+             argumentSet(
+                     "committed write intent, commit ts before read ts",
+                     flatArray(commitTsBeforeReadTs, noWriteIntent, roOk, newestAbsent, committedBeforeReadTs)
+             ),
+             argumentSet(
+                     "committed write intent, commit ts before read ts, early history",
+                     flatArray(commitTsBeforeReadTsWithEarlyHistory, noWriteIntent, roOk, newestAbsent, committedBeforeReadTs)
+             ),
+             argumentSet(
+                     "committed write intent, commit ts equal to read ts",
+                     flatArray(commitTsEqualToReadTs, noWriteIntent, roOk, newestAbsent, committedEqualToReadTs)
+             ),
+             argumentSet(
+                     "committed write intent, commit ts after read ts",
+                     flatArray(commitTsAfterReadTs, noWriteIntent, roOk, newestAbsent, unknown)
+             ),
+             argumentSet(
+                     "committed write intent, commit ts before and after read ts",
+                     flatArray(commitTsBeforeAndAfterReadTs, noWriteIntent, roOk, newestAbsent, committedBeforeReadTs)
+             ),
+             argumentSet(
+                     "newest commitTs present, committed write intent, commit ts before read ts",
+                     flatArray(commitTsBeforeReadTs, noWriteIntent, roOk, newestPresent, committedBeforeReadTs)
+             ),
+             argumentSet(
+                     "newest commitTs present, early history, committed write intent, commit ts before read ts",
+                     flatArray(commitTsBeforeReadTsWithEarlyHistory, noWriteIntent, roOk, newestPresent, committedBeforeReadTs)
+             ),
+             argumentSet(
+                     "newest commitTs present, committed write intent, commit ts equal to read ts",
+                     flatArray(commitTsEqualToReadTs, noWriteIntent, roOk, newestPresent, committedEqualToReadTs)
+             ),
+             argumentSet(
+                     "newest commitTs present, committed write intent, commit ts after read ts",
+                     flatArray(commitTsAfterReadTs, noWriteIntent, roOk, newestPresent, unknown)
+             ),
+             argumentSet(
+                     "newest commitTs present, committed write intent, commit ts before and after read ts",
+                     flatArray(commitTsBeforeAndAfterReadTs, noWriteIntent, roOk, newestPresent, committedBeforeReadTs)
+             ),
+             argumentSet(
+                     "newest commitTs present, write intent of this tx",
+                     flatArray(noCommittedVersions, writeIntentThisTx, roOk, newestPresent, pending)
+             ),
+             argumentSet(
+                     "newest commitTs present, write intent of another tx",
+                     flatArray(noCommittedVersions, writeIntentAnotherTx, roOk, newestPresent, aborted)
+             ),
+             argumentSet(
+                     "newest commitTs present, write intent of another tx, commit ts before read ts",
+                     flatArray(commitTsBeforeReadTs, writeIntentAnotherTx, roOk, newestPresent, committedBeforeReadTs)
+             ),
+             argumentSet(
+                     "newest commitTs present, early history, write intent of another tx, commit ts before read ts",
+                     flatArray(commitTsBeforeReadTsWithEarlyHistory, writeIntentAnotherTx, roOk, newestPresent, committedBeforeReadTs)
+             ),
+             argumentSet(
+                     "newest commitTs present, write intent of another tx, commit ts equal to read ts",
+                     flatArray(commitTsEqualToReadTs, writeIntentAnotherTx, roOk, newestPresent, committedEqualToReadTs)
+             ),
+             argumentSet(
+                     "newest commitTs present, write intent of another tx, commit ts after read ts",
+                     flatArray(commitTsAfterReadTs, writeIntentAnotherTx, roOk, newestPresent, unknown)
+             ),
+             argumentSet(
+                     "newest commitTs present, write intent of another tx, commit ts before and after read ts",
+                     flatArray(commitTsBeforeAndAfterReadTs, writeIntentAnotherTx, roOk, newestPresent, committedBeforeReadTs)
+             )
+     );
+ }
+
+ /**
+  * Creates an expected successful outcome: no exception class and a transaction meta with the given state and commit
+  * timestamp.
+  */
+ private static IgniteBiTuple<Class<Exception>, TransactionMeta> expectedTxStateArg(
+         TxState txState,
+         @Nullable HybridTimestamp commitTs
+ ) {
+     TransactionMeta expectedMeta = TxStateMeta.builder(txState)
+             .commitTimestamp(commitTs)
+             .build();
+
+     return new IgniteBiTuple<>(null, expectedMeta);
+ }
+
private CompletableFuture<ReplicaResult> processWithPrimacy(ReplicaRequest
request) {
return primacyEngine.validatePrimacy(request)
.thenCompose(primacy ->
partitionReplicaListener.process(request, primacy, localNode.id()));
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/OutdatedReadOnlyTransactionInternalException.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/OutdatedReadOnlyTransactionInternalException.java
new file mode 100644
index 00000000000..7f5c7a8d817
--- /dev/null
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/OutdatedReadOnlyTransactionInternalException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Exception thrown when a read-only transaction is outdated because its read timestamp is below the low watermark.
+ *
+ * <p>Always carries the {@code TX_READ_ONLY_TOO_OLD_ERR} error code.
+ */
+public class OutdatedReadOnlyTransactionInternalException extends IgniteInternalException {
+    private static final long serialVersionUID = 3136265772703827255L;
+
+    /**
+     * Creates an exception with the default {@code "Outdated read-only transaction"} message prefix.
+     *
+     * @param readTimestamp Read timestamp of the outdated transaction.
+     * @param lwm Low watermark; may be {@code null} (same contract as the delegate constructor).
+     */
+    public OutdatedReadOnlyTransactionInternalException(HybridTimestamp readTimestamp, @Nullable HybridTimestamp lwm) {
+        this("Outdated read-only transaction", readTimestamp, lwm);
+    }
+
+    /**
+     * Creates an exception with a custom message prefix.
+     *
+     * <p>The final message is formatted as {@code "<message> [readTimestamp=<readTimestamp>, gcTimestamp=<lwm>]."}.
+     *
+     * @param message Message prefix.
+     * @param readTimestamp Read timestamp of the outdated transaction.
+     * @param lwm Low watermark, reported as {@code gcTimestamp}; may be {@code null}.
+     */
+    public OutdatedReadOnlyTransactionInternalException(String message, HybridTimestamp readTimestamp, @Nullable HybridTimestamp lwm) {
+        super(TX_READ_ONLY_TOO_OLD_ERR, format("{} [readTimestamp={}, gcTimestamp={}].", message, readTimestamp, lwm));
+    }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
index 2bad2fdab19..6ca2351d075 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
@@ -53,15 +53,24 @@ public enum TxState {
* State that is assigned to a transaction due to absence of coordinator.
It is temporary and can be changed to
* {@link TxState#COMMITTED} or {@link TxState#ABORTED} after recovery or
successful write intent resolution.
*/
- ABANDONED(4);
+ ABANDONED(4),
+
+    /**
+     * Unknown transaction state. It may be used during transaction state resolution, but must not be stored in either
+     * volatile or persistent transaction meta. It means that the state of the sought transaction cannot be recovered
+     * from the current storage state. This is possible when there is no corresponding committed version; in that case
+     * the write intent that triggered the resolution should be ignored. Any transitions from or to this state are
+     * forbidden.
+     */
+    UNKNOWN(5);
private static final boolean[][] TRANSITION_MATRIX = {
- { false, true, true, true, true, true },
- { false, true, true, true, true, true },
- { false, false, false, true, true, true },
- { false, false, false, true, false, false },
- { false, false, false, false, true, false },
- { false, false, true, true, true, true }
+ { false, true, true, true, true, true, false },
+ { false, true, true, true, true, true, false },
+ { false, false, false, true, true, true, false },
+ { false, false, false, true, false, false, false },
+ { false, false, false, false, true, false, false },
+ { false, false, true, true, true, true, false },
+ { false, false, false, false, false, false, false }
};
/**
@@ -109,6 +118,7 @@ public enum TxState {
case 2: return ABORTED;
case 3: return COMMITTED;
case 4: return ABANDONED;
+ case 5: return UNKNOWN;
default:
throw new IllegalArgumentException("No enum constant from id:
" + id);
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
index 9e73b91f687..ddff0619475 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
@@ -21,7 +21,9 @@ import static java.util.Objects.requireNonNull;
import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
import static org.apache.ignite.internal.tx.TxState.ABANDONED;
import static org.apache.ignite.internal.tx.TxState.FINISHING;
+import static org.apache.ignite.internal.tx.TxState.UNKNOWN;
import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
+import static
org.apache.ignite.internal.tx.TxStateMetaUnknown.txStateMetaUnknown;
import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
import java.util.UUID;
@@ -360,6 +362,8 @@ public class TxStateMeta implements TransactionMeta {
return new TxStateMetaFinishing(txCoordinatorId,
commitPartitionId, isFinishedDueToTimeout, txLabel);
} else if (txState == ABANDONED) {
return new TxStateMetaAbandoned(txCoordinatorId,
commitPartitionId, tx, txLabel);
+ } else if (txState == UNKNOWN) {
+ return txStateMetaUnknown();
} else {
return new TxStateMeta(
txState,
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaUnknown.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaUnknown.java
new file mode 100644
index 00000000000..e1baa73159e
--- /dev/null
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaUnknown.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxStateMetaMessage;
+
+/**
+ * Transaction state meta for the {@link TxState#UNKNOWN} state.
+ *
+ * <p>Passes {@code null} for every super-constructor argument other than the state. Instances cannot be mutated:
+ * {@link #mutate()} always fails with an {@link AssertionError}.
+ */
+public class TxStateMetaUnknown extends TxStateMeta {
+    private static final long serialVersionUID = -2549857341422406570L;
+
+    /**
+     * Creates a meta in the {@link TxState#UNKNOWN} state; all remaining super-constructor arguments are {@code null}.
+     */
+    public TxStateMetaUnknown() {
+        super(TxState.UNKNOWN, null, null, null, null, null);
+    }
+
+    /**
+     * Static factory for {@link TxStateMetaUnknown}.
+     *
+     * @return A new meta in the {@link TxState#UNKNOWN} state.
+     */
+    public static TxStateMetaUnknown txStateMetaUnknown() {
+        return new TxStateMetaUnknown();
+    }
+
+    /**
+     * Always fails: no builder is ever handed out for the UNKNOWN state.
+     *
+     * @throws AssertionError Always.
+     */
+    @Override
+    public TxStateMetaBuilder mutate() {
+        throw new AssertionError("UNKNOWN transaction state is immutable.");
+    }
+
+    @Override
+    public TxStateMetaMessage toTransactionMetaMessage(
+            ReplicaMessagesFactory replicaMessagesFactory,
+            TxMessagesFactory txMessagesFactory
+    ) {
+        // No values are set on the builder: the receiving side rebuilds the meta without reading any.
+        TxStateMetaMessage unknownMessage = txMessagesFactory.txStateMetaUnknownMessage().build();
+
+        return unknownMessage;
+    }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 690ef8f8c19..374b4e11cbf 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -40,7 +40,6 @@ import static
org.apache.ignite.internal.tx.TxStateMeta.builder;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
-import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
import java.util.ArrayList;
import java.util.Collection;
@@ -71,7 +70,6 @@ import
org.apache.ignite.internal.failure.handlers.NoOpFailureHandler;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
-import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
@@ -101,6 +99,7 @@ import org.apache.ignite.internal.tx.InternalTxOptions;
import org.apache.ignite.internal.tx.LocalRwTxCounter;
import org.apache.ignite.internal.tx.LockManager;
import
org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException;
+import
org.apache.ignite.internal.tx.OutdatedReadOnlyTransactionInternalException;
import org.apache.ignite.internal.tx.PartitionEnlistment;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
import org.apache.ignite.internal.tx.TransactionMeta;
@@ -513,9 +512,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
boolean lockAcquired = lowWatermark.tryLock(txId, readTimestamp);
if (!lockAcquired) {
- throw new IgniteInternalException(
- TX_READ_ONLY_TOO_OLD_ERR,
- "Attempted to read data below the garbage collection
watermark: [readTimestamp={}, gcTimestamp={}]",
+ throw new OutdatedReadOnlyTransactionInternalException(
+ "Attempted to read data below the garbage collection
watermark",
readTimestamp,
lowWatermark.getLowWatermark());
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/RowIdMessage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/RowIdMessage.java
new file mode 100644
index 00000000000..9c928d2a367
--- /dev/null
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/RowIdMessage.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.message;
+
+import static
org.apache.ignite.internal.tx.message.TxMessageGroup.ROW_ID_MESSAGE;
+
+import java.util.UUID;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.storage.RowId;
+
+/**
+ * Message for transferring {@link RowId}.
+ *
+ * <p>NOTE(review): accessor declaration order may define the layout generated by the {@code @Transferable} processor;
+ * confirm before reordering.
+ */
+@Transferable(ROW_ID_MESSAGE)
+public interface RowIdMessage extends NetworkMessage {
+    /** Partition ID component; passed as the first {@link RowId} constructor argument by {@link #asRowId()}. */
+    int partitionId();
+
+    /** UUID component; passed as the second {@link RowId} constructor argument by {@link #asRowId()}. */
+    UUID uuid();
+
+    /**
+     * Converts this message back into a {@link RowId}.
+     *
+     * @return A new {@link RowId} built from {@link #partitionId()} and {@link #uuid()}.
+     */
+    default RowId asRowId() {
+        return new RowId(partitionId(), uuid());
+    }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
index 8e7e4a3179b..095f67dd6ef 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
@@ -122,4 +122,17 @@ public class TxMessageGroup {
* Message type for {@link TableWriteIntentSwitchReplicaRequest}.
*/
public static final short TABLE_WRITE_INTENT_SWITCH_REQUEST = 21;
+
+    // NOTE(review): these IDs are referenced from @Transferable annotations (e.g. RowIdMessage); presumably they must
+    // stay unique within this group and stable across versions, so never renumber them.
+
+    /**
+     * Message type for {@link TxStatePrimaryReplicaRequest}.
+     */
+    public static final short TX_STATE_PRIMARY_REPLICA_REQUEST = 22;
+
+    /**
+     * Message type for {@link RowIdMessage}.
+     */
+    public static final short ROW_ID_MESSAGE = 23;
+
+    /**
+     * Message type for {@link TxStateMetaUnknownMessage}.
+     */
+    public static final short TX_STATE_META_UNKNOWN_MESSAGE = 24;
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaUnknownMessage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaUnknownMessage.java
new file mode 100644
index 00000000000..0255df33303
--- /dev/null
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaUnknownMessage.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.message;
+
+import static
org.apache.ignite.internal.tx.message.TxMessageGroup.TX_STATE_META_UNKNOWN_MESSAGE;
+
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.tx.TransactionMeta;
+import org.apache.ignite.internal.tx.TxStateMetaUnknown;
+
+/**
+ * Network message representing a {@link TxStateMetaUnknown}.
+ *
+ * <p>The interface declares no accessors of its own; conversion always yields a fresh UNKNOWN meta.
+ */
+@Transferable(TX_STATE_META_UNKNOWN_MESSAGE)
+public interface TxStateMetaUnknownMessage extends TxStateMetaMessage {
+    /**
+     * Converts this message into a transaction meta.
+     *
+     * @return A new {@link TxStateMetaUnknown} instance.
+     */
+    default TransactionMeta asTxStateMetaUnknown() {
+        return TxStateMetaUnknown.txStateMetaUnknown();
+    }
+
+    @Override
+    default TransactionMeta asTransactionMeta() {
+        // Nothing is read from the message: every UNKNOWN meta is built the same way.
+        return asTxStateMetaUnknown();
+    }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStatePrimaryReplicaRequest.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStatePrimaryReplicaRequest.java
new file mode 100644
index 00000000000..5562b193add
--- /dev/null
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStatePrimaryReplicaRequest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.message;
+
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import org.apache.ignite.internal.replicator.message.TableAware;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Transaction state request for primary replica path.
+ *
+ * <p>NOTE(review): judging by the accessors, the request presumably resolves a write intent of {@link #txId()} found on
+ * {@link #rowId()} by a read at {@link #readTimestamp()}; confirm against the sender. Accessor declaration order may
+ * define the layout generated by the {@code @Transferable} processor, so do not reorder without checking.
+ */
+@Transferable(TxMessageGroup.TX_STATE_PRIMARY_REPLICA_REQUEST)
+public interface TxStatePrimaryReplicaRequest extends PrimaryReplicaRequest, TableAware {
+    /** ID of the transaction whose state is requested (presumably the write intent's transaction). */
+    UUID txId();
+
+    /** Row ID, transferred as a {@link RowIdMessage}; presumably the row holding the write intent being resolved. */
+    RowIdMessage rowId();
+
+    /**
+     * Newest commit timestamp; may be {@code null}.
+     * NOTE(review): presumably the newest committed version's timestamp for the row, {@code null} if there is none.
+     */
+    @Nullable HybridTimestamp newestCommitTimestamp();
+
+    /** Read timestamp (not annotated {@code @Nullable}, unlike {@link #newestCommitTimestamp()}). */
+    HybridTimestamp readTimestamp();
+}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxStateTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxStateTest.java
index 6d84c43513f..6bd7d8a87b2 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxStateTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxStateTest.java
@@ -37,7 +37,15 @@ public class TxStateTest {
@Test
void testStates() {
assertThat(TxState.values(),
- arrayContaining(TxState.PENDING, TxState.FINISHING,
TxState.ABORTED, TxState.COMMITTED, TxState.ABANDONED));
+ arrayContaining(
+ TxState.PENDING,
+ TxState.FINISHING,
+ TxState.ABORTED,
+ TxState.COMMITTED,
+ TxState.ABANDONED,
+ TxState.UNKNOWN
+ )
+ );
}
@Test
@@ -46,6 +54,7 @@ public class TxStateTest {
assertFalse(TxState.isFinalState(TxState.PENDING));
assertFalse(TxState.isFinalState(TxState.FINISHING));
assertFalse(TxState.isFinalState(TxState.ABANDONED));
+ assertFalse(TxState.isFinalState(TxState.UNKNOWN));
// Final.
assertTrue(TxState.isFinalState(TxState.ABORTED));
@@ -59,6 +68,9 @@ public class TxStateTest {
@Test
void testTransitionsFromNull() {
+ // Not allowed.
+ assertFalse(TxState.checkTransitionCorrectness(null, TxState.UNKNOWN));
+
// Allowed.
assertTrue(TxState.checkTransitionCorrectness(null, TxState.PENDING));
assertTrue(TxState.checkTransitionCorrectness(null, TxState.ABORTED));
@@ -69,6 +81,9 @@ public class TxStateTest {
@Test
void testTransitionsFromPending() {
+ // Not allowed.
+ assertFalse(TxState.checkTransitionCorrectness(TxState.PENDING,
TxState.UNKNOWN));
+
// Allowed.
assertTrue(TxState.checkTransitionCorrectness(TxState.PENDING,
TxState.PENDING));
assertTrue(TxState.checkTransitionCorrectness(TxState.PENDING,
TxState.FINISHING));
@@ -82,6 +97,7 @@ public class TxStateTest {
// Not allowed.
assertFalse(TxState.checkTransitionCorrectness(TxState.FINISHING,
TxState.PENDING));
assertFalse(TxState.checkTransitionCorrectness(TxState.FINISHING,
TxState.FINISHING));
+ assertFalse(TxState.checkTransitionCorrectness(TxState.FINISHING,
TxState.UNKNOWN));
// Allowed.
assertTrue(TxState.checkTransitionCorrectness(TxState.FINISHING,
TxState.ABORTED));
@@ -96,6 +112,7 @@ public class TxStateTest {
assertFalse(TxState.checkTransitionCorrectness(TxState.ABORTED,
TxState.FINISHING));
assertFalse(TxState.checkTransitionCorrectness(TxState.ABORTED,
TxState.COMMITTED));
assertFalse(TxState.checkTransitionCorrectness(TxState.ABORTED,
TxState.ABANDONED));
+ assertFalse(TxState.checkTransitionCorrectness(TxState.ABORTED,
TxState.UNKNOWN));
// Allowed.
assertTrue(TxState.checkTransitionCorrectness(TxState.ABORTED,
TxState.ABORTED));
@@ -108,6 +125,7 @@ public class TxStateTest {
assertFalse(TxState.checkTransitionCorrectness(TxState.COMMITTED,
TxState.FINISHING));
assertFalse(TxState.checkTransitionCorrectness(TxState.COMMITTED,
TxState.ABORTED));
assertFalse(TxState.checkTransitionCorrectness(TxState.COMMITTED,
TxState.ABANDONED));
+ assertFalse(TxState.checkTransitionCorrectness(TxState.COMMITTED,
TxState.UNKNOWN));
// Allowed.
assertTrue(TxState.checkTransitionCorrectness(TxState.COMMITTED,
TxState.COMMITTED));
@@ -120,6 +138,7 @@ public class TxStateTest {
void testTransitionsFromAbandoned() {
// Not allowed.
assertFalse(TxState.checkTransitionCorrectness(TxState.ABANDONED,
TxState.PENDING));
+ assertFalse(TxState.checkTransitionCorrectness(TxState.ABANDONED,
TxState.UNKNOWN));
// Allowed.
assertTrue(TxState.checkTransitionCorrectness(TxState.ABANDONED,
TxState.FINISHING));
@@ -128,13 +147,28 @@ public class TxStateTest {
assertTrue(TxState.checkTransitionCorrectness(TxState.ABANDONED,
TxState.ABANDONED));
}
+    /**
+     * Checks that no transition out of {@link TxState#UNKNOWN} is allowed, including the transition to itself.
+     */
+    @Test
+    void testTransitionsFromUnknown() {
+        // Not allowed: every target state, UNKNOWN included.
+        for (TxState target : TxState.values()) {
+            assertFalse(TxState.checkTransitionCorrectness(TxState.UNKNOWN, target), "UNKNOWN -> " + target);
+        }
+    }
+
private static Stream<Arguments> txStateIds() {
return Stream.of(
arguments(TxState.PENDING, 0),
arguments(TxState.FINISHING, 1),
arguments(TxState.ABORTED, 2),
arguments(TxState.COMMITTED, 3),
- arguments(TxState.ABANDONED, 4)
+ arguments(TxState.ABANDONED, 4),
+ arguments(TxState.UNKNOWN, 5)
);
}
@@ -149,12 +183,11 @@ public class TxStateTest {
@MethodSource("txStateIds")
void testFromId(TxState expectedEnumEntry, int id) {
assertEquals(expectedEnumEntry, TxState.fromId(id));
-
}
@Test
void testFromIdThrows() {
assertThrows(IllegalArgumentException.class, () -> TxState.fromId(-1));
- assertThrows(IllegalArgumentException.class, () -> TxState.fromId(5));
+ assertThrows(IllegalArgumentException.class, () -> TxState.fromId(6));
}
}