This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new bbd4c4e01f2 IGNITE-21910 Implement write intent resolution primary 
replica path (primary replica side) (#7408)
bbd4c4e01f2 is described below

commit bbd4c4e01f2791cbbda7ed6adf2c4b44cf44ebc9
Author: Denis Chudov <[email protected]>
AuthorDate: Mon Jan 19 21:59:35 2026 +0400

    IGNITE-21910 Implement write intent resolution primary replica path 
(primary replica side) (#7408)
---
 .../ignite/internal/hlc/HybridTimestamp.java       |   2 +-
 .../apache/ignite/internal/util/IgniteUtils.java   |  24 ++
 ...xStateCommitPartitionReplicaRequestHandler.java |   1 +
 .../distributed/raft/TablePartitionProcessor.java  |  13 +-
 .../replicator/PartitionReplicaListener.java       | 200 +++++++++++++++-
 .../replication/PartitionReplicaListenerTest.java  | 251 +++++++++++++++++++++
 ...tdatedReadOnlyTransactionInternalException.java |  53 +++++
 .../org/apache/ignite/internal/tx/TxState.java     |  24 +-
 .../org/apache/ignite/internal/tx/TxStateMeta.java |   4 +
 .../ignite/internal/tx/TxStateMetaUnknown.java     |  49 ++++
 .../ignite/internal/tx/impl/TxManagerImpl.java     |   8 +-
 .../ignite/internal/tx/message/RowIdMessage.java   |  39 ++++
 .../ignite/internal/tx/message/TxMessageGroup.java |  13 ++
 .../tx/message/TxStateMetaUnknownMessage.java      |  39 ++++
 .../tx/message/TxStatePrimaryReplicaRequest.java   |  39 ++++
 .../org/apache/ignite/internal/tx/TxStateTest.java |  41 +++-
 16 files changed, 771 insertions(+), 29 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
index f46c3348911..988f69f72c8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
@@ -209,7 +209,7 @@ public final class HybridTimestamp implements 
Comparable<HybridTimestamp>, Seria
     }
 
     @Override
-    public boolean equals(Object o) {
+    public boolean equals(@Nullable Object o) {
         if (this == o) {
             return true;
         }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 7b68fdb01fa..db898d50d11 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -24,6 +24,7 @@ import static 
org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.lang.reflect.Array;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.nio.ByteBuffer;
@@ -1319,6 +1320,29 @@ public class IgniteUtils {
         }
     }
 
+    /**
+     * Makes array from the given arguments, if any argument is an array 
itself, its elements are added into result instead of it.
+     *
+     * @param arguments Arguments.
+     * @return Flat array.
+     */
+    public static Object[] flatArray(Object... arguments) {
+        List<Object> list = new ArrayList<>();
+
+        for (Object arg : arguments) {
+            if (arg != null && arg.getClass().isArray()) {
+                int length = Array.getLength(arg);
+                for (int i = 0; i < length; i++) {
+                    list.add(Array.get(arg, i));
+                }
+            } else {
+                list.add(arg);
+            }
+        }
+
+        return list.toArray();
+    }
+
     private static CompletableFuture<Void> startAsync(ComponentContext 
componentContext, Stream<? extends IgniteComponent> components) {
         return allOf(components
                 .filter(Objects::nonNull)
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
index de23a2cc628..f64c86af78e 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
@@ -191,6 +191,7 @@ public class TxStateCommitPartitionReplicaRequestHandler {
             }
         } else {
             // Recovery is not needed.
+            // TODO https://issues.apache.org/jira/browse/IGNITE-27494 Add 
UNKNOWN state handling.
             assert isFinalState(txMeta.txState()) : "Unexpected transaction 
state: " + txMeta;
 
             return completedFuture(txMeta);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
index 4b46f0dd9f7..a8e520e5a8e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
@@ -27,7 +27,6 @@ import static 
org.apache.ignite.internal.partition.replicator.raft.CommandResult
 import static 
org.apache.ignite.internal.partition.replicator.raft.CommandResult.EMPTY_NOT_APPLIED_RESULT;
 import static 
org.apache.ignite.internal.table.distributed.TableUtils.indexIdsAtRwTxBeginTs;
 import static 
org.apache.ignite.internal.table.distributed.TableUtils.indexIdsAtRwTxBeginTsOrNull;
-import static org.apache.ignite.internal.tx.TxState.COMMITTED;
 import static org.apache.ignite.internal.tx.TxState.PENDING;
 
 import java.util.HashSet;
@@ -531,11 +530,13 @@ public class TablePartitionProcessor implements 
RaftTableProcessor {
     }
 
     private void replicaTouch(UUID txId, UUID txCoordinatorId, HybridTimestamp 
commitTimestamp, boolean full) {
-        txManager.updateTxMeta(txId, old -> TxStateMeta.builder(old, full ? 
COMMITTED : PENDING)
-                .txCoordinatorId(txCoordinatorId)
-                .commitTimestamp(full ? commitTimestamp : null)
-                .build()
-        );
+        // Saving state is not needed for full transactions.
+        if (!full) {
+            txManager.updateTxMeta(txId, old -> TxStateMeta.builder(old, 
PENDING)
+                    .txCoordinatorId(txCoordinatorId)
+                    .build()
+            );
+        }
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index dfce5104dae..9419bbc2434 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.replicator;
 
+import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
@@ -33,11 +34,14 @@ import static 
org.apache.ignite.internal.partition.replicator.network.replicatio
 import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
 import static 
org.apache.ignite.internal.table.distributed.replicator.RemoteResourceIds.cursorId;
 import static org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
 import static org.apache.ignite.internal.tx.TxState.COMMITTED;
 import static org.apache.ignite.internal.tx.TxState.FINISHING;
 import static org.apache.ignite.internal.tx.TxState.PENDING;
+import static org.apache.ignite.internal.tx.TxState.UNKNOWN;
 import static org.apache.ignite.internal.tx.TxState.isFinalState;
 import static org.apache.ignite.internal.tx.TxStateMeta.builder;
+import static 
org.apache.ignite.internal.tx.TxStateMetaUnknown.txStateMetaUnknown;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptyCollectionCompletedFuture;
@@ -54,7 +58,6 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import static org.apache.ignite.lang.ErrorGroups.Replicator.CURSOR_CLOSE_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR;
-import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -182,6 +185,7 @@ import org.apache.ignite.internal.tx.LockException;
 import org.apache.ignite.internal.tx.LockKey;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.LockMode;
+import 
org.apache.ignite.internal.tx.OutdatedReadOnlyTransactionInternalException;
 import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxState;
@@ -191,6 +195,7 @@ import 
org.apache.ignite.internal.tx.impl.FullyQualifiedResourceId;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
 import 
org.apache.ignite.internal.tx.message.TableWriteIntentSwitchReplicaRequest;
+import org.apache.ignite.internal.tx.message.TxStatePrimaryReplicaRequest;
 import 
org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequestBase;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.CursorUtils;
@@ -612,6 +617,8 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             return 
processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest) 
request, opStartTsIfDirectRo);
         } else if (request instanceof ReadOnlyDirectMultiRowReplicaRequest) {
             return 
processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest) 
request, opStartTsIfDirectRo);
+        } else if (request instanceof TxStatePrimaryReplicaRequest) {
+            return 
processTxStatePrimaryReplicaRequest((TxStatePrimaryReplicaRequest) request);
         }
 
         // Unknown request.
@@ -1793,9 +1800,8 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             HybridTimestamp lwm = lowWatermark.getLowWatermark();
 
             if (lwm != null && opStartTimestamp.compareTo(lwm) < 0) {
-                throw new IgniteInternalException(
-                        TX_READ_ONLY_TOO_OLD_ERR,
-                        "Attempted to read data below the garbage collection 
watermark: [readTimestamp={}, gcTimestamp={}]",
+                throw new OutdatedReadOnlyTransactionInternalException(
+                        "Attempted to read data below the garbage collection 
watermark",
                         opStartTimestamp,
                         lowWatermark.getLowWatermark());
             }
@@ -2223,6 +2229,188 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         }
     }
 
+    private CompletableFuture<TransactionMeta> 
processTxStatePrimaryReplicaRequest(TxStatePrimaryReplicaRequest request) {
+        return completedFuture(txMetaFromStorage(
+                request.rowId().asRowId(),
+                request.txId(),
+                request.newestCommitTimestamp(),
+                request.readTimestamp()
+        ));
+    }
+
+    /**
+     * On primary replica, we can determine transaction state by checking the 
storage state having row id, transaction id,
+     * newest commit timestamp known to requesting replica and read timestamp.
+     * Non-primary replica must wait for safe time that is equal to or greater 
than the read timestamp it provides in the request.
+     * Given this, we can see in the storage doing read with the given read 
timestamp, and the return result:
+     *
+     * <ul>
+     *   <li>Short path (using {@link MvPartitionStorage#read(RowId, 
HybridTimestamp)}):
+     *     <ul>
+     *       <li>there is write intent having transaction id of the sought 
transaction: respond with {@code PENDING};</li>
+     *       <li>
+     *         there is the latest committed version and its timestamp (is 
greater than the newest commit timestamp from request or
+     *         newest commit timestamp is null), and less than or equal to the 
read timestamp. This means, the write intent for
+     *         this committed version was created before read timestamp, and 
requesting replica has seen it, so it is
+     *         committed write intent for sought transaction: respond with 
{@code COMMITTED}.
+     *       </li>
+     *     </ul>
+     *   </li>
+     *   <li>Otherwise we need to scan the storage. Possible outcomes:
+     *     <ul>
+     *       <li>
+     *         there is committed version and its timestamp (is greater than 
the newest commit timestamp
+     *         or newest commit timestamp is null) and less than or equal to 
the read timestamp. This means, the write intent for
+     *         this committed version was created before read timestamp, and 
requesting replica has seen it, so it is
+     *         committed write intent for sought transaction: respond with 
{@code COMMITTED};
+     *       </li>
+     *       <li>
+     *         if read timestamp is not {@link 
ClockService#after(HybridTimestamp, HybridTimestamp)} low watermark or
+     *         newest commit timestamp is not {@link 
ClockService#after(HybridTimestamp, HybridTimestamp)} low watermark then throw
+     *         {@link OutdatedReadOnlyTransactionInternalException};
+     *       </li>
+     *       <li>
+     *         there is only committed version(s) with timestamp greater than 
the newest commit timestamp and greater than
+     *         read timestamp. This means that state of the sought transaction 
is unrecoverable from storage state.
+     *         However, the write intent should be ignored by the requesting 
replica. Respond with {@code UNKNOWN};
+     *       </li>
+     *       <li>otherwise, if there is any committed version before read 
timestamp: respond with {@code ABORTED}.</li>
+     *     </ul>
+     *   </li>
+     * </ul>
+     *
+     * @param rowId Row id.
+     * @param txId Transaction id.
+     * @param newestCommitTimestamp Newest commit timestamp known to sender 
node.
+     * @param readTimestamp Read timestamp.
+     * @return Transaction meta calculated from the storage state.
+     */
+    private TransactionMeta txMetaFromStorage(
+            RowId rowId,
+            UUID txId,
+            @Nullable HybridTimestamp newestCommitTimestamp,
+            HybridTimestamp readTimestamp
+    ) {
+        ReadResult readResult = mvDataStorage.read(rowId, readTimestamp);
+
+        requireNonNull(readResult, "Read result must not be null.");
+
+        if (isWriteIntentBelongingToThisTx(readResult, txId)) {
+            return builder(PENDING).build();
+        } else if (isNewCommittedVersionBeforeReadTimestamp(readResult, 
newestCommitTimestamp, readTimestamp)) {
+            return 
builder(COMMITTED).commitTimestamp(readResult.commitTimestamp()).build();
+        } else {
+            // In this case we can't be sure if the write intent of sought 
transaction is committed or aborted, scan the storage.
+            // Consider the example:
+            // - wi is created at t1
+            // - read ts is t2
+            // - tx is finished at t3, it may be committed with commit ts t3, 
but wi is switched at t4
+            // - now is t5
+            // - backup replica waits for safe time >= t2, here on primary we 
read from storage using t2 and can't see version
+            // modification at t4, because there are two operations: addWrite 
(at t2) and commitWrite (at t4),
+            // so we need to scan the storage to see all versions up to now. 
Commit timestamp is checked on the requesting replica.
+            return txMetaFromStorageWithScan(rowId, txId, 
newestCommitTimestamp, readTimestamp);
+        }
+    }
+
+    private static boolean isWriteIntentBelongingToThisTx(ReadResult rr, UUID 
txId) {
+        return rr.isWriteIntent() && rr.transactionId() != null && 
rr.transactionId().equals(txId);
+    }
+
+    private static boolean isCommittedVersion(ReadResult rr) {
+        return !rr.isEmpty() && !rr.isWriteIntent();
+    }
+
+    private static boolean isNewCommittedVersionBeforeReadTimestamp(
+            ReadResult rr,
+            @Nullable HybridTimestamp newestCommitTimestamp,
+            HybridTimestamp readTimestamp
+    ) {
+        return isCommittedVersion(rr)
+                && (newestCommitTimestamp == null || 
rr.commitTimestamp().longValue() > newestCommitTimestamp.longValue())
+                && rr.commitTimestamp().longValue() <= 
readTimestamp.longValue();
+    }
+
+    private boolean isReadTimestampOutdated(HybridTimestamp readTimestamp, 
@Nullable HybridTimestamp newestCommitTimestamp) {
+        HybridTimestamp lwm = lowWatermark.getLowWatermark();
+
+        HybridTimestamp earliestDataAvailableTimestamp =  lwm == null ? 
HybridTimestamp.MIN_VALUE : lwm;
+
+        return !clockService.after(readTimestamp, 
earliestDataAvailableTimestamp)
+                || (newestCommitTimestamp != null && 
!clockService.after(newestCommitTimestamp, earliestDataAvailableTimestamp));
+    }
+
+    private TransactionMeta txMetaFromStorageWithScan(
+            RowId rowId,
+            UUID txId,
+            @Nullable HybridTimestamp newestCommitTimestamp,
+            HybridTimestamp readTimestamp
+    ) {
+        ReadResult anyCommittedAfterReadTs = null;
+        ReadResult wi = null;
+
+        try (Cursor<ReadResult> versions = mvDataStorage.scanVersions(rowId)) {
+            for (ReadResult rr : versions) {
+                if (rr != null && !rr.isEmpty()) {
+                    if (rr.isWriteIntent()) {
+                        wi = rr;
+                    } else {
+                        requireNonNull(rr.commitTimestamp(), "Committed read 
result must have commit timestamp");
+
+                        long commitTs = rr.commitTimestamp().longValue();
+
+                        // newestCommitTimestamp comes from requesting replica 
and is always less than read timestamp (if not null).
+                        if (newestCommitTimestamp == null || commitTs > 
newestCommitTimestamp.longValue()) {
+                            if (commitTs <= readTimestamp.longValue()) {
+                                // This means, the write intent for this 
committed version was created before read timestamp,
+                                // and requesting replica has seen it, so this 
is committed write intent for sought transaction.
+                                // Commit timestamp is checked on the 
requesting replica.
+                                return 
builder(COMMITTED).commitTimestamp(rr.commitTimestamp()).build();
+                            } else {
+                                anyCommittedAfterReadTs = rr;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        if (wi != null && isWriteIntentBelongingToThisTx(wi, txId)) {
+            return builder(PENDING).build();
+        } else if (isReadTimestampOutdated(readTimestamp, 
newestCommitTimestamp)) {
+            // In this case we can't be sure about anything that happened 
before read timestamp.
+            throw new 
OutdatedReadOnlyTransactionInternalException(readTimestamp, 
lowWatermark.getLowWatermark());
+        } else if (anyCommittedAfterReadTs != null) {
+            // This means that state of the sought transaction is 
unrecoverable from storage state because committed versions don't
+            // contain tx id.
+            // Consider the examples:
+            // Example 1:
+            // - wi created at t1 by tx1
+            // - read ts is t2, backup replica waits for t2
+            // - tx1 is committed at t5, commit ts is t5
+            // - commit write is replicated at t6
+            // - now is t7
+            // - on primary replica, we see committed version with commit ts t5
+            // Example 2:
+            // - wi1 created at t1 by tx1
+            // - read ts is t2, backup replica waits for t2
+            // - tx1 is aborted at t3
+            // - tx2 creates wi2 on primary at t4
+            // - tx2 is committed at t5, commit ts is t5
+            // - commit write is replicated at t6
+            // - now is t7
+            // - on primary replica, we see committed version with commit ts t5
+            // In 1st example tx1 was committed and in 2nd it was aborted but 
in both examples the state of the storage is the same.
+            // So the state of tx1 is unknown.
+            // However, the write intent should be ignored by the requesting 
replica because nothing was committed
+            // with commit timestamp less than read timestamp.
+            return txStateMetaUnknown();
+        } else {
+            // These can be only versions known to requesting replica.
+            return builder(ABORTED).build();
+        }
+    }
+
     private static <T> boolean allElementsAreNull(List<T> list) {
         for (T element : list) {
             if (element != null) {
@@ -3313,7 +3501,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
      *         read time).
      */
     private static Boolean canReadFromWriteIntent(UUID txId, TransactionMeta 
txMeta, @Nullable HybridTimestamp timestamp) {
-        assert isFinalState(txMeta.txState()) || txMeta.txState() == PENDING
+        assert isFinalState(txMeta.txState()) || txMeta.txState() == PENDING 
|| txMeta.txState() == UNKNOWN
                 : format("Unexpected state defined by write intent resolution 
[txId={}, txMeta={}].", txId, txMeta);
 
         if (txMeta.txState() == COMMITTED) {
@@ -3321,7 +3509,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
 
             return readLatest || txMeta.commitTimestamp().compareTo(timestamp) 
<= 0;
         } else {
-            // Either ABORTED or PENDING.
+            // Either ABORTED, PENDING or UNKNOWN. When UNKNOWN, write intent 
is never readable, see state description.
             return false;
         }
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 975e8934e39..dee8e3fee0e 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -41,6 +41,7 @@ import static 
org.apache.ignite.internal.partition.replicator.network.replicatio
 import static 
org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_UPSERT_ALL;
 import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
 import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static 
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
@@ -50,10 +51,12 @@ import static org.apache.ignite.internal.tx.TxState.ABORTED;
 import static org.apache.ignite.internal.tx.TxState.COMMITTED;
 import static org.apache.ignite.internal.tx.TxState.FINISHING;
 import static org.apache.ignite.internal.tx.TxState.PENDING;
+import static org.apache.ignite.internal.tx.TxState.UNKNOWN;
 import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
 import static 
org.apache.ignite.internal.tx.test.TxStateMetaTestUtils.assertTxStateMetaIsSame;
 import static org.apache.ignite.internal.util.ArrayUtils.asList;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.flatArray;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
@@ -128,6 +131,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lowwatermark.LowWatermark;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.ClusterNodeResolver;
@@ -220,6 +224,7 @@ import 
org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.tx.LockManager;
+import 
org.apache.ignite.internal.tx.OutdatedReadOnlyTransactionInternalException;
 import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TransactionResult;
 import org.apache.ignite.internal.tx.TxManager;
@@ -236,6 +241,7 @@ import 
org.apache.ignite.internal.tx.message.TransactionMetaMessage;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
 import org.apache.ignite.internal.tx.message.TxStateCoordinatorRequest;
+import org.apache.ignite.internal.tx.message.TxStatePrimaryReplicaRequest;
 import org.apache.ignite.internal.tx.message.TxStateResponse;
 import org.apache.ignite.internal.tx.test.TestTransactionIds;
 import org.apache.ignite.internal.type.NativeTypes;
@@ -2773,6 +2779,251 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         assertThat(processWithPrimacy(request), 
willThrow(PrimaryReplicaMissException.class));
     }
 
+    @ParameterizedTest
+    @MethodSource("prepareVersionsParameters")
+    void processTxStatePrimaryReplicaRequestTest(
+            boolean addVersionBeforeNewestCommitTs,
+            boolean addVersionAfterNewestCommitTsBeforeReadTs,
+            boolean addVersionEqualToReadTs,
+            boolean addVersionAfterReadTs,
+            boolean addWriteIntent,
+            boolean writeIntentHasThisTxId,
+            boolean outdatedReadTs,
+            boolean newestCommitTsIsPresent,
+            IgniteBiTuple<Class<Exception>, TransactionMeta> expected
+    ) {
+        UUID thisTxId = newTxId();
+        UUID otherTxId = newTxId();
+
+        RowId rowId = new RowId(PART_ID, 1, 1);
+        BinaryRow row = binaryRow(1);
+
+        prepareVersions(
+                rowId,
+                row,
+                thisTxId,
+                otherTxId,
+                addVersionBeforeNewestCommitTs,
+                addVersionAfterNewestCommitTsBeforeReadTs,
+                addVersionEqualToReadTs,
+                addVersionAfterReadTs,
+                addWriteIntent,
+                writeIntentHasThisTxId,
+                newestCommitTsIsPresent
+        );
+
+        HybridTimestamp readTs = outdatedReadTs
+                ? new HybridTimestamp(clockService.maxClockSkewMillis() - 1, 0)
+                : new HybridTimestamp(100, 0);
+        HybridTimestamp newestCommitTs = newestCommitTsIsPresent ? new 
HybridTimestamp(50, 0) : null;
+
+        TxStatePrimaryReplicaRequest request = 
TX_MESSAGES_FACTORY.txStatePrimaryReplicaRequest()
+                .groupId(zonePartitionIdMessage(grpId))
+                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
+                .tableId(TABLE_ID)
+                
.rowId(TX_MESSAGES_FACTORY.rowIdMessage().partitionId(rowId.partitionId()).uuid(rowId.uuid()).build())
+                .txId(thisTxId)
+                .newestCommitTimestamp(newestCommitTs)
+                .readTimestamp(readTs)
+                .build();
+
+        CompletableFuture<ReplicaResult> fut = processWithPrimacy(request);
+
+        if (expected.get1() != null) {
+            try {
+                assertThrowsWithCause(fut::join, expected.get1());
+            } catch (AssertionError e) {
+                log.info("Actual state: " + fut.join().result());
+                throw e;
+            }
+        } else {
+            assertThat(fut, willCompleteSuccessfully());
+            TransactionMeta expectedMeta = expected.get2();
+            ReplicaResult res = fut.join();
+
+            TransactionMeta meta = (TransactionMeta) res.result();
+            assertEquals(expectedMeta.txState(), meta.txState());
+
+            if (expectedMeta.txState() == COMMITTED) {
+                assertEquals(expectedMeta.commitTimestamp(), 
meta.commitTimestamp());
+            }
+        }
+    }
+
+    private void prepareVersions(
+            RowId rowId,
+            BinaryRow row,
+            UUID thisTxId,
+            UUID otherTxId,
+            boolean addVersionBeforeNewestCommitTs,
+            boolean addVersionAfterNewestCommitTsBeforeReadTs,
+            boolean addVersionEqualToReadTs,
+            boolean addVersionAfterReadTs,
+            boolean addWriteIntent,
+            boolean writeIntentHasThisTxId,
+            boolean newestCommitTsIsPresent
+    ) {
+        if (newestCommitTsIsPresent) {
+            if (addVersionBeforeNewestCommitTs) {
+                addWriteAndCommit(rowId, row, otherTxId, new 
HybridTimestamp(20, 0));
+            }
+
+            addWriteAndCommit(rowId, row, otherTxId, new HybridTimestamp(50, 
0));
+        }
+
+        if (addVersionAfterNewestCommitTsBeforeReadTs) {
+            addWriteAndCommit(rowId, row, otherTxId, new HybridTimestamp(70, 
0));
+        }
+
+        if (addVersionEqualToReadTs) {
+            addWriteAndCommit(rowId, row, otherTxId, new HybridTimestamp(100, 
0));
+        }
+
+        if (addVersionAfterReadTs) {
+            addWriteAndCommit(rowId, row, otherTxId, new HybridTimestamp(150, 
0));
+        }
+
+        if (addWriteIntent) {
+            UUID txId = writeIntentHasThisTxId ? thisTxId : otherTxId;
+            testMvPartitionStorage.addWrite(rowId, row, txId, 1, 1);
+        }
+    }
+
+    private void addWriteAndCommit(RowId rowId, BinaryRow row, UUID txId, 
HybridTimestamp commitTs) {
+        testMvPartitionStorage.addWrite(rowId, row, txId, ZONE_ID, PART_ID);
+        testMvPartitionStorage.commitWrite(rowId, commitTs, txId);
+    }
+
+    private static Stream<Arguments> prepareVersionsParameters() {
+        boolean[] noCommittedVersions = { false, false, false, false };
+        boolean[] committedVersions = { true, true, true, true };
+        boolean[] commitTsBeforeReadTs = { false, true, false, false };
+        boolean[] commitTsBeforeReadTsWithEarlyHistory = { true, true, false, 
false };
+        boolean[] commitTsEqualToReadTs = { false, false, true, false };
+        boolean[] commitTsAfterReadTs = { false, false, false, true };
+        boolean[] commitTsBeforeAndAfterReadTs = { false, true, false, true };
+
+        boolean[] noWriteIntent = { false, false };
+        boolean[] writeIntentThisTx = { true, true };
+        boolean[] writeIntentAnotherTx = { true, false };
+
+        boolean roOk = false;
+        boolean roOutdated = true;
+
+        boolean newestAbsent = false;
+        boolean newestPresent = true;
+
+        var outdatedError = new 
IgniteBiTuple<>(OutdatedReadOnlyTransactionInternalException.class, null);
+        var pending = expectedTxStateArg(PENDING, null);
+        var aborted = expectedTxStateArg(ABORTED, null);
+        var committedBeforeReadTs = expectedTxStateArg(COMMITTED, new 
HybridTimestamp(70, 0));
+        var committedEqualToReadTs = expectedTxStateArg(COMMITTED, new 
HybridTimestamp(100, 0));
+        var unknown = expectedTxStateArg(UNKNOWN, null);
+
+        return Stream.of(
+                argumentSet(
+                        "outdated ro txn, no committed versions",
+                        flatArray(noCommittedVersions, noWriteIntent, 
roOutdated, newestAbsent, outdatedError)
+                ),
+                argumentSet(
+                        "outdated ro txn, no committed versions, write intent 
of another tx",
+                        flatArray(noCommittedVersions, writeIntentAnotherTx, 
roOutdated, newestAbsent, outdatedError)
+                ),
+                argumentSet(
+                        "outdated ro txn, committed versions",
+                        flatArray(committedVersions, noWriteIntent, 
roOutdated, newestAbsent, outdatedError)
+                ),
+                argumentSet(
+                        "empty result",
+                        flatArray(noCommittedVersions, noWriteIntent, roOk, 
newestAbsent, aborted)
+                ),
+                argumentSet(
+                        "write intent of this tx, no committed versions",
+                        flatArray(noCommittedVersions, writeIntentThisTx, 
roOk, newestAbsent, pending)
+                ),
+                argumentSet(
+                        "write intent of another tx, no committed versions",
+                        flatArray(noCommittedVersions, writeIntentAnotherTx, 
roOk, newestAbsent, aborted)
+                ),
+                argumentSet(
+                        "committed write intent, commit ts before read ts",
+                        flatArray(commitTsBeforeReadTs, noWriteIntent, roOk, 
newestAbsent, committedBeforeReadTs)
+                ),
+                argumentSet(
+                        "committed write intent, commit ts before read ts, 
early history",
+                        flatArray(commitTsBeforeReadTsWithEarlyHistory, 
noWriteIntent, roOk, newestAbsent, committedBeforeReadTs)
+                ),
+                argumentSet(
+                        "committed write intent, commit ts equal to read ts",
+                        flatArray(commitTsEqualToReadTs, noWriteIntent, roOk, 
newestAbsent, committedEqualToReadTs)
+                ),
+                argumentSet(
+                        "committed write intent, commit ts after read ts",
+                        flatArray(commitTsAfterReadTs, noWriteIntent, roOk, 
newestAbsent, unknown)
+                ),
+                argumentSet(
+                        "committed write intent, commit ts before and after 
read ts",
+                        flatArray(commitTsBeforeAndAfterReadTs, noWriteIntent, 
roOk, newestAbsent, committedBeforeReadTs)
+                ),
+                argumentSet(
+                        "newest commitTs present, committed write intent, 
commit ts before read ts",
+                        flatArray(commitTsBeforeReadTs, noWriteIntent, roOk, 
newestPresent, committedBeforeReadTs)
+                ),
+                argumentSet(
+                        "newest commitTs present, early history, committed 
write intent, commit ts before read ts",
+                        flatArray(commitTsBeforeReadTsWithEarlyHistory, 
noWriteIntent, roOk, newestPresent, committedBeforeReadTs)
+                ),
+                argumentSet(
+                        "newest commitTs present, committed write intent, 
commit ts equal to read ts",
+                        flatArray(commitTsEqualToReadTs, noWriteIntent, roOk, 
newestPresent, committedEqualToReadTs)
+                ),
+                argumentSet(
+                        "newest commitTs present, committed write intent, 
commit ts after read ts",
+                        flatArray(commitTsAfterReadTs, noWriteIntent, roOk, 
newestPresent, unknown)
+                ),
+                argumentSet(
+                        "newest commitTs present, committed write intent, 
commit ts before and after read ts",
+                        flatArray(commitTsBeforeAndAfterReadTs, noWriteIntent, 
roOk, newestPresent, committedBeforeReadTs)
+                ),
+                argumentSet(
+                        "newest commitTs present, write intent of this tx",
+                        flatArray(noCommittedVersions, writeIntentThisTx, 
roOk, newestPresent, pending)
+                ),
+                argumentSet(
+                        "newest commitTs present, write intent of another tx",
+                        flatArray(noCommittedVersions, writeIntentAnotherTx, 
roOk, newestPresent, aborted)
+                ),
+                argumentSet(
+                        "newest commitTs present, write intent of another tx, 
commit ts before read ts",
+                        flatArray(commitTsBeforeReadTs, writeIntentAnotherTx, 
roOk, newestPresent, committedBeforeReadTs)
+                ),
+                argumentSet(
+                        "newest commitTs present, early history, write intent 
of another tx, commit ts before read ts",
+                        flatArray(commitTsBeforeReadTsWithEarlyHistory, 
writeIntentAnotherTx, roOk, newestPresent, committedBeforeReadTs)
+                ),
+                argumentSet(
+                        "newest commitTs present, write intent of another tx, 
commit ts equal to read ts",
+                        flatArray(commitTsEqualToReadTs, writeIntentAnotherTx, 
roOk, newestPresent, committedEqualToReadTs)
+                ),
+                argumentSet(
+                        "newest commitTs present, write intent of another tx, 
commit ts after read ts",
+                        flatArray(commitTsAfterReadTs, writeIntentAnotherTx, 
roOk, newestPresent, unknown)
+                ),
+                argumentSet(
+                        "newest commitTs present, write intent of another tx, 
commit ts after before and after read ts",
+                        flatArray(commitTsBeforeAndAfterReadTs, 
writeIntentAnotherTx, roOk, newestPresent,
+                                committedBeforeReadTs)
+                )
+        );
+    }
+
+    private static IgniteBiTuple<Class<Exception>, TransactionMeta> 
expectedTxStateArg(
+            TxState txState,
+            @Nullable HybridTimestamp commitTs
+    ) {
+        return new IgniteBiTuple<>(null, 
TxStateMeta.builder(txState).commitTimestamp(commitTs).build());
+    }
+
     private CompletableFuture<ReplicaResult> processWithPrimacy(ReplicaRequest 
request) {
         return primacyEngine.validatePrimacy(request)
                 .thenCompose(primacy -> 
partitionReplicaListener.process(request, primacy, localNode.id()));
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/OutdatedReadOnlyTransactionInternalException.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/OutdatedReadOnlyTransactionInternalException.java
new file mode 100644
index 00000000000..7f5c7a8d817
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/OutdatedReadOnlyTransactionInternalException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Exception thrown when read-only transaction appears to be outdated due to 
its read timestamp that is less than low watermark.
+ */
+public class OutdatedReadOnlyTransactionInternalException extends 
IgniteInternalException {
+    private static final long serialVersionUID = 3136265772703827255L;
+
+    /**
+     * Constructor.
+     *
+     * @param readTimestamp Read timestamp.
+     * @param lwm Low watermark.
+     */
+    public OutdatedReadOnlyTransactionInternalException(HybridTimestamp 
readTimestamp, HybridTimestamp lwm) {
+        this("Outdated read-only transaction", readTimestamp, lwm);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param message Message.
+     * @param readTimestamp Read timestamp.
+     * @param lwm Low watermark.
+     */
+    public OutdatedReadOnlyTransactionInternalException(String message, 
HybridTimestamp readTimestamp, @Nullable HybridTimestamp lwm) {
+        super(TX_READ_ONLY_TOO_OLD_ERR, format("{} [readTimestamp={}, 
gcTimestamp={}].", message, readTimestamp, lwm));
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
index 2bad2fdab19..6ca2351d075 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
@@ -53,15 +53,24 @@ public enum TxState {
      * State that is assigned to a transaction due to absence of coordinator. 
It is temporary and can be changed to
      * {@link TxState#COMMITTED} or {@link TxState#ABORTED} after recovery or 
successful write intent resolution.
      */
-    ABANDONED(4);
+    ABANDONED(4),
+
+    /**
+     * Unknown transaction state. It may be used during transaction state 
resolution, shouldn't be used for either volatile
+     * or persistent transaction meta. It means that the state of the sought 
transaction is unrecoverable from current storage state,
+     * and it's possible if there is no corresponding committed version, so 
the write intent that triggered this resolution
+     * should be ignored. Any transitions from or to this state are forbidden.
+     */
+    UNKNOWN(5);
 
     private static final boolean[][] TRANSITION_MATRIX = {
-            { false, true,  true, true,  true,  true },
-            { false, true,  true,  true,  true,  true },
-            { false, false, false, true,  true,  true },
-            { false, false, false, true,  false, false },
-            { false, false, false, false, true,  false },
-            { false,  false,  true,  true,  true,  true }
+            { false, true,  true,  true,  true,  true,  false },
+            { false, true,  true,  true,  true,  true,  false },
+            { false, false, false, true,  true,  true,  false },
+            { false, false, false, true,  false, false, false },
+            { false, false, false, false, true,  false, false },
+            { false, false, true,  true,  true,  true,  false },
+            { false, false, false, false, false, false, false }
     };
 
     /**
@@ -109,6 +118,7 @@ public enum TxState {
             case 2: return ABORTED;
             case 3: return COMMITTED;
             case 4: return ABANDONED;
+            case 5: return UNKNOWN;
             default:
                 throw new IllegalArgumentException("No enum constant from id: 
" + id);
         }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
index 9e73b91f687..ddff0619475 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
@@ -21,7 +21,9 @@ import static java.util.Objects.requireNonNull;
 import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
 import static org.apache.ignite.internal.tx.TxState.ABANDONED;
 import static org.apache.ignite.internal.tx.TxState.FINISHING;
+import static org.apache.ignite.internal.tx.TxState.UNKNOWN;
 import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
+import static 
org.apache.ignite.internal.tx.TxStateMetaUnknown.txStateMetaUnknown;
 import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
 
 import java.util.UUID;
@@ -360,6 +362,8 @@ public class TxStateMeta implements TransactionMeta {
                 return new TxStateMetaFinishing(txCoordinatorId, 
commitPartitionId, isFinishedDueToTimeout, txLabel);
             } else if (txState == ABANDONED) {
                 return new TxStateMetaAbandoned(txCoordinatorId, 
commitPartitionId, tx, txLabel);
+            } else if (txState == UNKNOWN) {
+                return txStateMetaUnknown();
             } else {
                 return new TxStateMeta(
                         txState,
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaUnknown.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaUnknown.java
new file mode 100644
index 00000000000..e1baa73159e
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaUnknown.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxStateMetaMessage;
+
+/**
+ * Unknown transaction state meta.
+ */
+public class TxStateMetaUnknown extends TxStateMeta {
+    private static final long serialVersionUID = -2549857341422406570L;
+
+    public TxStateMetaUnknown() {
+        super(TxState.UNKNOWN, null, null, null, null, null);
+    }
+
+    public static TxStateMetaUnknown txStateMetaUnknown() {
+        return new TxStateMetaUnknown();
+    }
+
+    @Override
+    public TxStateMetaBuilder mutate() {
+        throw new AssertionError("UNKNOWN transaction state is immutable.");
+    }
+
+    @Override
+    public TxStateMetaMessage toTransactionMetaMessage(ReplicaMessagesFactory 
replicaMessagesFactory, TxMessagesFactory txMessagesFactory) {
+        return txMessagesFactory
+                .txStateMetaUnknownMessage()
+                .build();
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 690ef8f8c19..374b4e11cbf 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -40,7 +40,6 @@ import static 
org.apache.ignite.internal.tx.TxStateMeta.builder;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
-import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -71,7 +70,6 @@ import 
org.apache.ignite.internal.failure.handlers.NoOpFailureHandler;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.HybridTimestampTracker;
-import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.lowwatermark.LowWatermark;
@@ -101,6 +99,7 @@ import org.apache.ignite.internal.tx.InternalTxOptions;
 import org.apache.ignite.internal.tx.LocalRwTxCounter;
 import org.apache.ignite.internal.tx.LockManager;
 import 
org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException;
+import 
org.apache.ignite.internal.tx.OutdatedReadOnlyTransactionInternalException;
 import org.apache.ignite.internal.tx.PartitionEnlistment;
 import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
 import org.apache.ignite.internal.tx.TransactionMeta;
@@ -513,9 +512,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
 
         boolean lockAcquired = lowWatermark.tryLock(txId, readTimestamp);
         if (!lockAcquired) {
-            throw new IgniteInternalException(
-                    TX_READ_ONLY_TOO_OLD_ERR,
-                    "Attempted to read data below the garbage collection 
watermark: [readTimestamp={}, gcTimestamp={}]",
+            throw new OutdatedReadOnlyTransactionInternalException(
+                    "Attempted to read data below the garbage collection 
watermark",
                     readTimestamp,
                     lowWatermark.getLowWatermark());
         }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/RowIdMessage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/RowIdMessage.java
new file mode 100644
index 00000000000..9c928d2a367
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/RowIdMessage.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.message;
+
+import static 
org.apache.ignite.internal.tx.message.TxMessageGroup.ROW_ID_MESSAGE;
+
+import java.util.UUID;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.storage.RowId;
+
+/**
+ * Message for transferring {@link RowId}.
+ */
+@Transferable(ROW_ID_MESSAGE)
+public interface RowIdMessage extends NetworkMessage {
+    int partitionId();
+
+    UUID uuid();
+
+    default RowId asRowId() {
+        return new RowId(partitionId(), uuid());
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
index 8e7e4a3179b..095f67dd6ef 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
@@ -122,4 +122,17 @@ public class TxMessageGroup {
      * Message type for {@link TableWriteIntentSwitchReplicaRequest}.
      */
     public static final short TABLE_WRITE_INTENT_SWITCH_REQUEST = 21;
+
+    /**
+     * Message type for {@link TxStatePrimaryReplicaRequest}.
+     */
+    public static final short TX_STATE_PRIMARY_REPLICA_REQUEST = 22;
+
+    /**
+     * Message type for {@link RowIdMessage}.
+     */
+    public static final short ROW_ID_MESSAGE = 23;
+
+    /** Message type for {@link TxStateMetaUnknownMessage}. */
+    public static final short TX_STATE_META_UNKNOWN_MESSAGE = 24;
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaUnknownMessage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaUnknownMessage.java
new file mode 100644
index 00000000000..0255df33303
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaUnknownMessage.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.message;
+
+import static 
org.apache.ignite.internal.tx.message.TxMessageGroup.TX_STATE_META_UNKNOWN_MESSAGE;
+
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.tx.TransactionMeta;
+import org.apache.ignite.internal.tx.TxStateMetaUnknown;
+
+/**
+ * Message for transferring a {@link TxStateMetaUnknown}.
+ */
+@Transferable(TX_STATE_META_UNKNOWN_MESSAGE)
+public interface TxStateMetaUnknownMessage extends TxStateMetaMessage {
+    default TransactionMeta asTxStateMetaUnknown() {
+        return new TxStateMetaUnknown();
+    }
+
+    @Override
+    default TransactionMeta asTransactionMeta() {
+        return asTxStateMetaUnknown();
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStatePrimaryReplicaRequest.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStatePrimaryReplicaRequest.java
new file mode 100644
index 00000000000..5562b193add
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStatePrimaryReplicaRequest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.message;
+
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import org.apache.ignite.internal.replicator.message.TableAware;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Transaction state request for primary replica path.
+ */
+@Transferable(TxMessageGroup.TX_STATE_PRIMARY_REPLICA_REQUEST)
+public interface TxStatePrimaryReplicaRequest extends PrimaryReplicaRequest, 
TableAware {
+    UUID txId();
+
+    RowIdMessage rowId();
+
+    @Nullable HybridTimestamp newestCommitTimestamp();
+
+    HybridTimestamp readTimestamp();
+}
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxStateTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxStateTest.java
index 6d84c43513f..6bd7d8a87b2 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxStateTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxStateTest.java
@@ -37,7 +37,15 @@ public class TxStateTest {
     @Test
     void testStates() {
         assertThat(TxState.values(),
-                arrayContaining(TxState.PENDING, TxState.FINISHING, 
TxState.ABORTED, TxState.COMMITTED, TxState.ABANDONED));
+                arrayContaining(
+                        TxState.PENDING,
+                        TxState.FINISHING,
+                        TxState.ABORTED,
+                        TxState.COMMITTED,
+                        TxState.ABANDONED,
+                        TxState.UNKNOWN
+                )
+        );
     }
 
     @Test
@@ -46,6 +54,7 @@ public class TxStateTest {
         assertFalse(TxState.isFinalState(TxState.PENDING));
         assertFalse(TxState.isFinalState(TxState.FINISHING));
         assertFalse(TxState.isFinalState(TxState.ABANDONED));
+        assertFalse(TxState.isFinalState(TxState.UNKNOWN));
 
         // Final.
         assertTrue(TxState.isFinalState(TxState.ABORTED));
@@ -59,6 +68,9 @@ public class TxStateTest {
 
     @Test
     void testTransitionsFromNull() {
+        // Not allowed.
+        assertFalse(TxState.checkTransitionCorrectness(null, TxState.UNKNOWN));
+
         // Allowed.
         assertTrue(TxState.checkTransitionCorrectness(null, TxState.PENDING));
         assertTrue(TxState.checkTransitionCorrectness(null, TxState.ABORTED));
@@ -69,6 +81,9 @@ public class TxStateTest {
 
     @Test
     void testTransitionsFromPending() {
+        // Not allowed.
+        assertFalse(TxState.checkTransitionCorrectness(TxState.PENDING, 
TxState.UNKNOWN));
+
         // Allowed.
         assertTrue(TxState.checkTransitionCorrectness(TxState.PENDING, 
TxState.PENDING));
         assertTrue(TxState.checkTransitionCorrectness(TxState.PENDING, 
TxState.FINISHING));
@@ -82,6 +97,7 @@ public class TxStateTest {
         // Not allowed.
         assertFalse(TxState.checkTransitionCorrectness(TxState.FINISHING, 
TxState.PENDING));
         assertFalse(TxState.checkTransitionCorrectness(TxState.FINISHING, 
TxState.FINISHING));
+        assertFalse(TxState.checkTransitionCorrectness(TxState.FINISHING, 
TxState.UNKNOWN));
 
         // Allowed.
         assertTrue(TxState.checkTransitionCorrectness(TxState.FINISHING, 
TxState.ABORTED));
@@ -96,6 +112,7 @@ public class TxStateTest {
         assertFalse(TxState.checkTransitionCorrectness(TxState.ABORTED, 
TxState.FINISHING));
         assertFalse(TxState.checkTransitionCorrectness(TxState.ABORTED, 
TxState.COMMITTED));
         assertFalse(TxState.checkTransitionCorrectness(TxState.ABORTED, 
TxState.ABANDONED));
+        assertFalse(TxState.checkTransitionCorrectness(TxState.ABORTED, 
TxState.UNKNOWN));
 
         // Allowed.
         assertTrue(TxState.checkTransitionCorrectness(TxState.ABORTED, 
TxState.ABORTED));
@@ -108,6 +125,7 @@ public class TxStateTest {
         assertFalse(TxState.checkTransitionCorrectness(TxState.COMMITTED, 
TxState.FINISHING));
         assertFalse(TxState.checkTransitionCorrectness(TxState.COMMITTED, 
TxState.ABORTED));
         assertFalse(TxState.checkTransitionCorrectness(TxState.COMMITTED, 
TxState.ABANDONED));
+        assertFalse(TxState.checkTransitionCorrectness(TxState.COMMITTED, 
TxState.UNKNOWN));
 
         // Allowed.
         assertTrue(TxState.checkTransitionCorrectness(TxState.COMMITTED, 
TxState.COMMITTED));
@@ -120,6 +138,7 @@ public class TxStateTest {
     void testTransitionsFromAbandoned() {
         // Not allowed.
         assertFalse(TxState.checkTransitionCorrectness(TxState.ABANDONED, 
TxState.PENDING));
+        assertFalse(TxState.checkTransitionCorrectness(TxState.ABANDONED, 
TxState.UNKNOWN));
 
         // Allowed.
         assertTrue(TxState.checkTransitionCorrectness(TxState.ABANDONED, 
TxState.FINISHING));
@@ -128,13 +147,28 @@ public class TxStateTest {
         assertTrue(TxState.checkTransitionCorrectness(TxState.ABANDONED, 
TxState.ABANDONED));
     }
 
+    /**
+     * Transition from UNKNOWN to any state is not allowed.
+     */
+    @Test
+    void testTransitionsFromUnknown() {
+        // Not allowed.
+        assertFalse(TxState.checkTransitionCorrectness(TxState.UNKNOWN, 
TxState.PENDING));
+        assertFalse(TxState.checkTransitionCorrectness(TxState.UNKNOWN, 
TxState.FINISHING));
+        assertFalse(TxState.checkTransitionCorrectness(TxState.UNKNOWN, 
TxState.ABORTED));
+        assertFalse(TxState.checkTransitionCorrectness(TxState.UNKNOWN, 
TxState.COMMITTED));
+        assertFalse(TxState.checkTransitionCorrectness(TxState.UNKNOWN, 
TxState.ABANDONED));
+        assertFalse(TxState.checkTransitionCorrectness(TxState.UNKNOWN, 
TxState.UNKNOWN));
+    }
+
     private static Stream<Arguments> txStateIds() {
         return Stream.of(
                 arguments(TxState.PENDING, 0),
                 arguments(TxState.FINISHING, 1),
                 arguments(TxState.ABORTED, 2),
                 arguments(TxState.COMMITTED, 3),
-                arguments(TxState.ABANDONED, 4)
+                arguments(TxState.ABANDONED, 4),
+                arguments(TxState.UNKNOWN, 5)
         );
     }
 
@@ -149,12 +183,11 @@ public class TxStateTest {
     @MethodSource("txStateIds")
     void testFromId(TxState expectedEnumEntry, int id) {
         assertEquals(expectedEnumEntry, TxState.fromId(id));
-
     }
 
     @Test
     void testFromIdThrows() {
         assertThrows(IllegalArgumentException.class, () -> TxState.fromId(-1));
-        assertThrows(IllegalArgumentException.class, () -> TxState.fromId(5));
+        assertThrows(IllegalArgumentException.class, () -> TxState.fromId(6));
     }
 }

Reply via email to