This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new fb3c5e2033 IGNITE-15927 One-phase commit - Fixes #2329.
fb3c5e2033 is described below
commit fb3c5e2033629f9b5b92e23d9bb999b08d6c3c7b
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Mon Aug 7 18:31:46 2023 +0300
IGNITE-15927 One-phase commit - Fixes #2329.
Signed-off-by: Alexey Scherbakov <[email protected]>
---
.../ignite/internal/util/ExceptionUtils.java | 8 +-
...eFiveFunction.java => IgnitePentaFunction.java} | 2 +-
...eTetraFunction.java => IgniteQuadFunction.java} | 2 +-
.../ignite/internal/replicator/ReplicaManager.java | 25 ++-
.../ignite/internal/replicator/ReplicaService.java | 2 +-
.../runner/app/ItIgniteNodeRestartTest.java | 1 +
.../ItAbstractInternalTableScanTest.java | 4 +-
.../table/distributed/StorageUpdateHandler.java | 40 +++--
.../distributed/command/PartitionCommand.java | 5 +
.../distributed/raft/PartitionDataStorage.java | 9 +
.../table/distributed/raft/PartitionListener.java | 27 ++-
.../SnapshotAwarePartitionDataStorage.java | 8 +
.../request/ReadWriteMultiRowReplicaRequest.java | 5 +
.../ReadWriteScanRetrieveBatchReplicaRequest.java | 4 +
.../request/ReadWriteSingleRowReplicaRequest.java | 5 +
.../request/ReadWriteSwapRowReplicaRequest.java | 5 +
.../replicator/PartitionReplicaListener.java | 183 ++++++++++++-------
.../distributed/storage/InternalTableImpl.java | 195 ++++++++++++---------
.../apache/ignite/internal/table/TxLocalTest.java | 56 ++++--
.../internal/table/distributed/IndexBaseTest.java | 7 +-
.../distributed/StorageUpdateHandlerTest.java | 2 +
.../distributed/TestPartitionDataStorage.java | 5 +
.../ignite/internal/table/TxAbstractTest.java | 31 +++-
.../table/impl/DummyInternalTableImpl.java | 79 ++++-----
.../ignite/internal/tx/InternalTransaction.java | 1 -
.../internal/tx/impl/ReadWriteTransactionImpl.java | 29 +--
.../apache/ignite/internal/tx/TxManagerTest.java | 5 +-
27 files changed, 471 insertions(+), 274 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
index 30dec96249..0f66d776cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
@@ -43,7 +43,7 @@ import org.apache.ignite.lang.IgniteCheckedException;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.lang.IgniteTetraFunction;
+import org.apache.ignite.lang.IgniteQuadFunction;
import org.apache.ignite.lang.IgniteTriFunction;
import org.apache.ignite.lang.TraceableException;
import org.jetbrains.annotations.Nullable;
@@ -392,7 +392,7 @@ public final class ExceptionUtils {
* @return New exception with the given cause.
*/
public static <T extends Exception> T withCause(
- IgniteTetraFunction<UUID, Integer, String, Throwable, T> supplier,
+ IgniteQuadFunction<UUID, Integer, String, Throwable, T> supplier,
int defaultCode,
String message,
Throwable t
@@ -432,7 +432,7 @@ public final class ExceptionUtils {
* @return New exception with the given cause.
*/
public static <T extends Exception> T withCauseAndCode(
- IgniteTetraFunction<UUID, Integer, String, Throwable, T> supplier,
+ IgniteQuadFunction<UUID, Integer, String, Throwable, T> supplier,
int code,
String message,
Throwable t
@@ -450,7 +450,7 @@ public final class ExceptionUtils {
* @return New
*/
private static <T extends Exception> T withCauseInternal(
- IgniteTetraFunction<UUID, Integer, String, Throwable, T> supplier,
+ IgniteQuadFunction<UUID, Integer, String, Throwable, T> supplier,
int defaultCode,
Throwable t
) {
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteFiveFunction.java b/modules/core/src/main/java/org/apache/ignite/lang/IgnitePentaFunction.java
similarity index 97%
rename from modules/core/src/main/java/org/apache/ignite/lang/IgniteFiveFunction.java
rename to modules/core/src/main/java/org/apache/ignite/lang/IgnitePentaFunction.java
index 2372f91e26..fa491b0fac 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteFiveFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgnitePentaFunction.java
@@ -36,7 +36,7 @@ import java.util.function.Function;
* @see Function
*/
@FunctionalInterface
-public interface IgniteFiveFunction<T, U, V, M, N, R> {
+public interface IgnitePentaFunction<T, U, V, M, N, R> {
/**
* Applies this function to the given arguments.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteTetraFunction.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteQuadFunction.java
old mode 100755
new mode 100644
similarity index 97%
rename from modules/core/src/main/java/org/apache/ignite/lang/IgniteTetraFunction.java
rename to modules/core/src/main/java/org/apache/ignite/lang/IgniteQuadFunction.java
index ecbece826b..ed6dc6d39f
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteTetraFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteQuadFunction.java
@@ -35,7 +35,7 @@ import java.util.function.Function;
* @see Function
*/
@FunctionalInterface
-public interface IgniteTetraFunction<T, U, V, M, R> {
+public interface IgniteQuadFunction<T, U, V, M, R> {
/**
* Applies this function to the given arguments.
*
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index a6db95fbb9..37b1950182 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -226,20 +226,27 @@ public class ReplicaManager implements IgniteComponent {
return;
}
+ HybridTimestamp sendTimestamp = null;
+
+ if (requestTimestamp != null) {
+ sendTimestamp = clock.update(requestTimestamp);
+ }
+
// replicaFut is always completed here.
Replica replica = replicaFut.join();
CompletableFuture<?> result = replica.processRequest(request);
+ HybridTimestamp finalSendTimestamp = sendTimestamp;
result.handle((res, ex) -> {
NetworkMessage msg;
if (ex == null) {
- msg = prepareReplicaResponse(requestTimestamp, res);
+ msg = prepareReplicaResponse(finalSendTimestamp, res);
} else {
LOG.warn("Failed to process replica request [request={}]",
ex, request);
- msg = prepareReplicaErrorResponse(requestTimestamp, ex);
+ msg = prepareReplicaErrorResponse(finalSendTimestamp, ex);
}
clusterNetSvc.messagingService().respond(senderConsistentId,
msg, correlationId);
@@ -361,7 +368,7 @@ public class ReplicaManager implements IgniteComponent {
/**
* Internal method for starting a replica.
*
- * @param replicaGrpId Replication group id.
+ * @param replicaGrpId Replication group id.
* @param whenReplicaReady Future that completes when the replica become
ready.
* @param listener Replica listener.
* @param raftClient Topology aware Raft client.
@@ -558,12 +565,12 @@ public class ReplicaManager implements IgniteComponent {
/**
* Prepares replica response.
*/
- private NetworkMessage prepareReplicaResponse(HybridTimestamp
requestTimestamp, Object result) {
- if (requestTimestamp != null) {
+ private NetworkMessage prepareReplicaResponse(@Nullable HybridTimestamp
sendTimestamp, Object result) {
+ if (sendTimestamp != null) {
return REPLICA_MESSAGES_FACTORY
.timestampAwareReplicaResponse()
.result(result)
- .timestampLong(clock.update(requestTimestamp).longValue())
+ .timestampLong(sendTimestamp.longValue())
.build();
} else {
return REPLICA_MESSAGES_FACTORY
@@ -576,12 +583,12 @@ public class ReplicaManager implements IgniteComponent {
/**
* Prepares replica error response.
*/
- private NetworkMessage prepareReplicaErrorResponse(HybridTimestamp
requestTimestamp, Throwable ex) {
- if (requestTimestamp != null) {
+ private NetworkMessage prepareReplicaErrorResponse(@Nullable
HybridTimestamp sendTimestamp, Throwable ex) {
+ if (sendTimestamp != null) {
return REPLICA_MESSAGES_FACTORY
.errorTimestampAwareReplicaResponse()
.throwable(ex)
- .timestampLong(clock.update(requestTimestamp).longValue())
+ .timestampLong(sendTimestamp.longValue())
.build();
} else {
return REPLICA_MESSAGES_FACTORY
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index 2360a20a8b..712122fb4c 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -48,7 +48,7 @@ import org.apache.ignite.network.NetworkMessage;
*/
public class ReplicaService {
/** Network timeout. */
- private static final int RPC_TIMEOUT = 3000;
+ private static final long RPC_TIMEOUT = 3000;
/** Message service. */
private final MessagingService messagingService;
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index c7b6996dd8..ef6b508582 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -951,6 +951,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
* checks that the table created before node stop, is not available when
majority if lost.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-20137")
public void testOneNodeRestartWithGap() throws InterruptedException {
IgniteImpl ignite = startNode(0);
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index dabd1d3d2d..69f4accb17 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -268,7 +268,7 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
}
});
- gotExceptionLatch.await();
+ assertTrue(gotExceptionLatch.await(10_000, TimeUnit.MILLISECONDS));
assertEquals(gotException.get().getCause().getClass(),
StorageException.class);
@@ -352,7 +352,7 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
}
});
- gotExceptionLatch.await();
+ assertTrue(gotExceptionLatch.await(10_000, TimeUnit.MILLISECONDS));
assertEquals(gotException.get().getClass(),
IllegalStateException.class);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 146ff6620a..0d9ff37d99 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -103,13 +103,15 @@ public class StorageUpdateHandler {
* @param commitPartitionId Commit partition id.
* @param row Row.
* @param onApplication Callback on application.
+ * @param commitTs Commit timestamp to use on autocommit.
*/
public void handleUpdate(
UUID txId,
UUID rowUuid,
TablePartitionId commitPartitionId,
@Nullable BinaryRow row,
- @Nullable Consumer<RowId> onApplication
+ @Nullable Consumer<RowId> onApplication,
+ @Nullable HybridTimestamp commitTs
) {
indexUpdateHandler.waitIndexes();
@@ -120,11 +122,16 @@ public class StorageUpdateHandler {
locker.lock(rowId);
- BinaryRow oldRow = storage.addWrite(rowId, row, txId, commitTblId,
commitPartId);
+ if (commitTs != null) {
+ storage.addWriteCommitted(rowId, row, commitTs);
+ } else {
+ BinaryRow oldRow = storage.addWrite(rowId, row, txId,
commitTblId, commitPartId);
- if (oldRow != null) {
- // Previous uncommitted row should be removed from indexes.
- tryRemovePreviousWritesIndex(rowId, oldRow);
+ if (oldRow != null) {
+ assert commitTs == null : String.format("Expecting explicit txn: [txId=%s]", txId);
+ // Previous uncommitted row should be removed from indexes.
+ tryRemovePreviousWritesIndex(rowId, oldRow);
+ }
}
indexUpdateHandler.addToIndexes(row, rowId);
@@ -145,13 +152,15 @@ public class StorageUpdateHandler {
* @param txId Transaction id.
* @param rowsToUpdate Collection of rows to update.
* @param commitPartitionId Commit partition id.
- * @param onReplication On replication callback.
+ * @param onApplication Callback on application.
+ * @param commitTs Commit timestamp to use on autocommit.
*/
public void handleUpdateAll(
UUID txId,
Map<UUID, BinaryRowMessage> rowsToUpdate,
TablePartitionId commitPartitionId,
- @Nullable Consumer<Collection<RowId>> onReplication
+ @Nullable Consumer<Collection<RowId>> onApplication,
+ @Nullable HybridTimestamp commitTs
) {
indexUpdateHandler.waitIndexes();
@@ -171,19 +180,24 @@ public class StorageUpdateHandler {
locker.lock(rowId);
- BinaryRow oldRow = storage.addWrite(rowId, row, txId,
commitTblId, commitPartId);
+ if (commitTs != null) {
+ storage.addWriteCommitted(rowId, row, commitTs);
+ } else {
+ BinaryRow oldRow = storage.addWrite(rowId, row, txId,
commitTblId, commitPartId);
- if (oldRow != null) {
- // Previous uncommitted row should be removed from
indexes.
- tryRemovePreviousWritesIndex(rowId, oldRow);
+ if (oldRow != null) {
+ assert commitTs == null : String.format("Expecting explicit txn: [txId=%s]", txId);
+ // Previous uncommitted row should be removed from
indexes.
+ tryRemovePreviousWritesIndex(rowId, oldRow);
+ }
}
rowIds.add(rowId);
indexUpdateHandler.addToIndexes(row, rowId);
}
- if (onReplication != null) {
- onReplication.accept(rowIds);
+ if (onApplication != null) {
+ onApplication.accept(rowIds);
}
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
index d28ed66ea4..36b776256f 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
@@ -28,4 +28,9 @@ public interface PartitionCommand extends
SafeTimePropagatingCommand {
* Returns a transaction id.
*/
UUID txId();
+
+ /**
+ * Returns {@code true} if a command represents a full (including all
keys) transaction.
+ */
+ boolean full();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
index 8d9f3426d5..945e414282 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
@@ -130,6 +130,15 @@ public interface PartitionDataStorage extends
ManuallyCloseable {
@Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row, UUID
txId, int commitTableId, int commitPartitionId)
throws TxIdMismatchException, StorageException;
+ /**
+ * Write and commit the row in one step.
+ *
+ * @param rowId Row id.
+ * @param row Row (null to remove existing)
+ * @param commitTs Commit timestamp.
+ */
+ void addWriteCommitted(RowId rowId, @Nullable BinaryRow row,
HybridTimestamp commitTs);
+
/**
* Aborts a pending update of the ongoing uncommitted transaction. Invoked
during rollback.
*
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 41c1ee0d3c..d2cdede161 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -233,10 +233,14 @@ public class PartitionListener implements
RaftGroupListener {
storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(),
cmd.tablePartitionId().asTablePartitionId(), cmd.row(),
rowId -> {
- txsPendingRowIds.computeIfAbsent(cmd.txId(), entry -> new
TreeSet<>()).add(rowId);
+ // Cleanup is not required for one-phase transactions.
+ if (!cmd.full()) {
+ txsPendingRowIds.computeIfAbsent(cmd.txId(), entry ->
new TreeSet<>()).add(rowId);
+ }
storage.lastApplied(commandIndex, commandTerm);
- }
+ },
+ cmd.full() ? cmd.safeTime() : null
);
}
@@ -253,13 +257,18 @@ public class PartitionListener implements
RaftGroupListener {
return;
}
- storageUpdateHandler.handleUpdateAll(cmd.txId(), cmd.rowsToUpdate(),
cmd.tablePartitionId().asTablePartitionId(), rowIds -> {
- for (RowId rowId : rowIds) {
- txsPendingRowIds.computeIfAbsent(cmd.txId(), entry0 -> new
TreeSet<>()).add(rowId);
- }
+ storageUpdateHandler.handleUpdateAll(cmd.txId(), cmd.rowsToUpdate(),
cmd.tablePartitionId().asTablePartitionId(),
+ rowIds -> {
+ // Cleanup is not required for one-phase transactions.
+ if (!cmd.full()) {
+ for (RowId rowId : rowIds) {
+ txsPendingRowIds.computeIfAbsent(cmd.txId(),
entry0 -> new TreeSet<>()).add(rowId);
+ }
+ }
- storage.lastApplied(commandIndex, commandTerm);
- });
+ storage.lastApplied(commandIndex, commandTerm);
+ },
+ cmd.full() ? cmd.safeTime() : null);
}
/**
@@ -365,7 +374,7 @@ public class PartitionListener implements RaftGroupListener
{
*
* @param cmd Command.
* @param commandIndex RAFT index of the command.
- * @param commandTerm RAFT term of the command.
+ * @param commandTerm RAFT term of the command.
*/
private void handleSafeTimeSyncCommand(SafeTimeSyncCommand cmd, long
commandIndex, long commandTerm) {
// Skips the write command because the storage has already executed it.
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
index 09144845e0..f1dbcc4404 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
@@ -117,6 +117,14 @@ public class SnapshotAwarePartitionDataStorage implements
PartitionDataStorage {
return partitionStorage.addWrite(rowId, row, txId, commitTableId,
commitPartitionId);
}
+ @Override
+ public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row,
HybridTimestamp commitTs)
+ throws TxIdMismatchException, StorageException {
+ handleSnapshotInterference(rowId);
+
+ partitionStorage.addWriteCommitted(rowId, row, commitTs);
+ }
+
@Override
public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException
{
handleSnapshotInterference(rowId);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
index f2330dbdc3..8888ee65b6 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
@@ -34,4 +34,9 @@ public interface ReadWriteMultiRowReplicaRequest extends
MultipleRowReplicaReque
*/
@Marshallable
TablePartitionId commitPartitionId();
+
+ /**
+ * Return {@code true} if this is a full transaction.
+ */
+ boolean full();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteScanRetrieveBatchReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteScanRetrieveBatchReplicaRequest.java
index 39c29b1e48..6de6825362 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteScanRetrieveBatchReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteScanRetrieveBatchReplicaRequest.java
@@ -25,4 +25,8 @@ import org.apache.ignite.network.annotations.Transferable;
*/
@Transferable(TableMessageGroup.RW_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST)
public interface ReadWriteScanRetrieveBatchReplicaRequest extends
ScanRetrieveBatchReplicaRequest, ReadWriteReplicaRequest {
+ /**
+ * Return {@code true} if this is a full transaction.
+ */
+ boolean full();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
index bd8b1344fd..b9e5edf122 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
@@ -34,4 +34,9 @@ public interface ReadWriteSingleRowReplicaRequest extends
SingleRowReplicaReques
*/
@Marshallable
TablePartitionId commitPartitionId();
+
+ /**
+ * Return {@code true} if this is a full transaction.
+ */
+ boolean full();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
index 1018ad2a99..1f3f325f95 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
@@ -34,4 +34,9 @@ public interface ReadWriteSwapRowReplicaRequest extends
SwapRowReplicaRequest, R
*/
@Marshallable
TablePartitionId commitPartitionId();
+
+ /**
+ * Return {@code true} if this is a full transaction.
+ */
+ boolean full();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 1979ef7730..5533971e1f 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -147,6 +147,7 @@ import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteExceptionUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.ClusterNode;
@@ -334,19 +335,31 @@ public class PartitionReplicaListener implements
ReplicaListener {
if (request instanceof ReadWriteSingleRowReplicaRequest) {
var req = (ReadWriteSingleRowReplicaRequest) request;
- return appendTxCommand(req.transactionId(), req.requestType(), ()
-> processSingleEntryAction(req));
+ return appendTxCommand(req.transactionId(), req.requestType(),
req.full(), () -> processSingleEntryAction(req));
} else if (request instanceof ReadWriteMultiRowReplicaRequest) {
var req = (ReadWriteMultiRowReplicaRequest) request;
- return appendTxCommand(req.transactionId(), req.requestType(), ()
-> processMultiEntryAction(req));
+ return appendTxCommand(req.transactionId(), req.requestType(),
req.full(), () -> processMultiEntryAction(req));
} else if (request instanceof ReadWriteSwapRowReplicaRequest) {
var req = (ReadWriteSwapRowReplicaRequest) request;
- return appendTxCommand(req.transactionId(), req.requestType(), ()
-> processTwoEntriesAction(req));
+ return appendTxCommand(req.transactionId(), req.requestType(),
req.full(), () -> processTwoEntriesAction(req));
} else if (request instanceof
ReadWriteScanRetrieveBatchReplicaRequest) {
var req = (ReadWriteScanRetrieveBatchReplicaRequest) request;
- return appendTxCommand(req.transactionId(), RequestType.RW_SCAN,
() -> processScanRetrieveBatchAction(req));
+ // Implicit RW scan can be committed locally on a last batch or
error.
+ return appendTxCommand(req.transactionId(), RequestType.RW_SCAN,
false, () -> processScanRetrieveBatchAction(req)).handle(
+ (rows, err) -> {
+ if (req.full() && (err != null || rows.size() <
req.batchSize())) {
+ releaseTxLocks(req.transactionId());
+ }
+
+ if (err != null) {
+ IgniteExceptionUtils.sneakyThrow(err);
+ }
+
+ return rows;
+ });
} else if (request instanceof ReadWriteScanCloseReplicaRequest) {
processScanCloseAction((ReadWriteScanCloseReplicaRequest) request);
@@ -1338,29 +1351,37 @@ public class PartitionReplicaListener implements
ReplicaListener {
*
* @param txId Transaction id.
* @param cmdType Command type.
+ * @param full {@code True} if a full transaction and can be immediately
committed.
* @param op Operation closure.
* @param <T> Type of execution result.
* @return A future object representing the result of the given operation.
*/
- private <T> CompletableFuture<T> appendTxCommand(UUID txId, RequestType
cmdType, Supplier<CompletableFuture<T>> op) {
+ private <T> CompletableFuture<T> appendTxCommand(UUID txId, RequestType
cmdType, boolean full, Supplier<CompletableFuture<T>> op) {
var fut = new CompletableFuture<T>();
- txCleanupReadyFutures.compute(txId, (id, txOps) -> {
- if (txOps == null) {
- txOps = new TxCleanupReadyFutureList();
- }
+ if (!full) {
+ txCleanupReadyFutures.compute(txId, (id, txOps) -> {
+ if (txOps == null) {
+ txOps = new TxCleanupReadyFutureList();
+ }
- if (txOps.state == TxState.ABORTED || txOps.state ==
TxState.COMMITED) {
- fut.completeExceptionally(new TransactionException(TX_FAILED_READ_WRITE_OPERATION_ERR, "Transaction is already finished."));
- } else {
- txOps.futures.computeIfAbsent(cmdType, type -> new
ArrayList<>()).add(fut);
- }
+ if (txOps.state == TxState.ABORTED || txOps.state ==
TxState.COMMITED) {
+ fut.completeExceptionally(
+ new TransactionException(TX_FAILED_READ_WRITE_OPERATION_ERR, "Transaction is already finished."));
+ } else {
+ txOps.futures.computeIfAbsent(cmdType, type -> new
ArrayList<>()).add(fut);
+ }
- return txOps;
- });
+ return txOps;
+ });
+ }
if (!fut.isDone()) {
op.get().whenComplete((v, th) -> {
+ if (full) { // Fast unlock.
+ releaseTxLocks(txId);
+ }
+
if (th != null) {
fut.completeExceptionally(th);
} else {
@@ -1487,6 +1508,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
private CompletableFuture<Object>
processMultiEntryAction(ReadWriteMultiRowReplicaRequest request) {
UUID txId = request.transactionId();
TablePartitionId committedPartitionId = request.commitPartitionId();
+ boolean full = request.full();
assert committedPartitionId != null || request.requestType() ==
RequestType.RW_GET_ALL
: "Commit partition is null [type=" + request.requestType() +
']';
@@ -1554,7 +1576,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return completedFuture(result);
}
- return
applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowIdsToDelete,
txId))
+ return
applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowIdsToDelete,
txId, full))
.thenApply(ignored -> result);
});
}
@@ -1590,7 +1612,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
CompletableFuture<Object> raftFut =
rowIdsToDelete.isEmpty() ? completedFuture(null)
- :
applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowIdsToDelete,
txId));
+ :
applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowIdsToDelete,
txId, full));
return raftFut.thenApply(ignored -> result);
});
@@ -1655,7 +1677,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return allOf(insertLockFuts)
.thenCompose(ignored -> applyUpdateAllCommand(
- updateAllCommand(committedPartitionId,
convertedMap, txId)))
+ updateAllCommand(committedPartitionId,
convertedMap, txId, full)))
.thenApply(ignored -> {
// Release short term locks.
for (CompletableFuture<IgniteBiTuple<RowId,
Collection<Lock>>> insertLockFut : insertLockFuts) {
@@ -1699,7 +1721,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return completedFuture(null);
}
- return
applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowsToUpdate,
txId))
+ return
applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowsToUpdate,
txId, full))
.thenApply(ignored -> {
// Release short term locks.
for (CompletableFuture<IgniteBiTuple<RowId,
Collection<Lock>>> rowIdFut : rowIdFuts) {
@@ -1749,23 +1771,38 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Raft future, see {@link
#applyCmdWithExceptionHandling(Command)}.
*/
private CompletableFuture<Object> applyUpdateCommand(UpdateCommand cmd) {
- storageUpdateHandler.handleUpdate(
- cmd.txId(),
- cmd.rowUuid(),
- cmd.tablePartitionId().asTablePartitionId(),
- cmd.row(),
- rowId -> txsPendingRowIds.compute(cmd.txId(), (k, v) -> {
- if (v == null) {
- v = new TreeSet<>();
- }
+ if (!cmd.full()) {
+ storageUpdateHandler.handleUpdate(
+ cmd.txId(),
+ cmd.rowUuid(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ cmd.row(),
+ rowId -> txsPendingRowIds.compute(cmd.txId(), (k, v) -> {
+ if (v == null) {
+ v = new TreeSet<>();
+ }
- v.add(rowId);
+ v.add(rowId);
- return v;
- })
- );
+ return v;
+ }),
+ null);
+ }
- return applyCmdWithExceptionHandling(cmd);
+ return applyCmdWithExceptionHandling(cmd).thenApply(res -> {
+ // Try to avoid double write if an entry is already replicated.
+ if (cmd.full() && cmd.safeTime().compareTo(safeTime.current()) >
0) {
+ storageUpdateHandler.handleUpdate(
+ cmd.txId(),
+ cmd.rowUuid(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ cmd.row(),
+ null,
+ cmd.safeTime());
+ }
+
+ return res;
+ });
}
/**
@@ -1775,21 +1812,35 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Raft future, see {@link
#applyCmdWithExceptionHandling(Command)}.
*/
private CompletableFuture<Object> applyUpdateAllCommand(UpdateAllCommand
cmd) {
- storageUpdateHandler.handleUpdateAll(
- cmd.txId(),
- cmd.rowsToUpdate(),
- cmd.tablePartitionId().asTablePartitionId(),
- rowIds -> txsPendingRowIds.compute(cmd.txId(), (k, v) -> {
- if (v == null) {
- v = new TreeSet<>();
- }
+ if (!cmd.full()) {
+ storageUpdateHandler.handleUpdateAll(
+ cmd.txId(),
+ cmd.rowsToUpdate(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ rowIds -> txsPendingRowIds.compute(cmd.txId(), (k, v) -> {
+ if (v == null) {
+ v = new TreeSet<>();
+ }
- v.addAll(rowIds);
+ v.addAll(rowIds);
- return v;
- }));
+ return v;
+ }),
+ null);
+ }
+
+ return applyCmdWithExceptionHandling(cmd).thenApply(res -> {
+ if (cmd.full() && cmd.safeTime().compareTo(safeTime.current()) >
0) {
+ storageUpdateHandler.handleUpdateAll(
+ cmd.txId(),
+ cmd.rowsToUpdate(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ null,
+ cmd.safeTime());
+ }
- return applyCmdWithExceptionHandling(cmd);
+ return res;
+ });
}
/**
@@ -1802,6 +1853,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
UUID txId = request.transactionId();
BinaryRow searchRow = request.binaryRow();
TablePartitionId commitPartitionId = request.commitPartitionId();
+ boolean full = request.full();
assert commitPartitionId != null || request.requestType() ==
RequestType.RW_GET :
"Commit partition is null [type=" + request.requestType() +
']';
@@ -1825,7 +1877,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return takeLocksForDelete(row, rowId, txId)
.thenCompose(ignored -> applyUpdateCommand(
- updateCommand(commitPartitionId,
rowId.uuid(), null, txId)))
+ updateCommand(commitPartitionId,
rowId.uuid(), null, txId, full)))
.thenApply(ignored -> true);
});
}
@@ -1837,7 +1889,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return takeLocksForDelete(row, rowId, txId)
.thenCompose(ignored -> applyUpdateCommand(
- updateCommand(commitPartitionId,
rowId.uuid(), null, txId)))
+ updateCommand(commitPartitionId,
rowId.uuid(), null, txId, full)))
.thenApply(ignored -> row);
});
}
@@ -1854,7 +1906,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return applyUpdateCommand(
- updateCommand(commitPartitionId,
validatedRowId.uuid(), null, txId))
+ updateCommand(commitPartitionId,
validatedRowId.uuid(), null, txId, full))
.thenApply(ignored -> true);
});
});
@@ -1869,7 +1921,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return takeLocksForInsert(searchRow, rowId0, txId)
.thenCompose(rowIdLock -> applyUpdateCommand(
- updateCommand(commitPartitionId,
rowId0.uuid(), searchRow, txId))
+ updateCommand(commitPartitionId,
rowId0.uuid(), searchRow, txId, full))
.thenApply(ignored -> rowIdLock))
.thenApply(rowIdLock -> {
// Release short term locks.
@@ -1891,7 +1943,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return lockFut
.thenCompose(rowIdLock -> applyUpdateCommand(
- updateCommand(commitPartitionId,
rowId0.uuid(), searchRow, txId))
+ updateCommand(commitPartitionId,
rowId0.uuid(), searchRow, txId, full))
.thenApply(ignored -> rowIdLock))
.thenApply(rowIdLock -> {
// Release short term locks.
@@ -1913,7 +1965,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return lockFut
.thenCompose(rowIdLock -> applyUpdateCommand(
- updateCommand(commitPartitionId,
rowId0.uuid(), searchRow, txId))
+ updateCommand(commitPartitionId,
rowId0.uuid(), searchRow, txId, full))
.thenApply(ignored -> rowIdLock))
.thenApply(rowIdLock -> {
// Release short term locks.
@@ -1931,7 +1983,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return takeLocksForUpdate(searchRow, rowId, txId)
.thenCompose(rowIdLock -> applyUpdateCommand(
- updateCommand(commitPartitionId,
rowId.uuid(), searchRow, txId))
+ updateCommand(commitPartitionId,
rowId.uuid(), searchRow, txId, full))
.thenApply(ignored -> rowIdLock))
.thenApply(rowIdLock -> {
// Release short term locks.
@@ -1949,7 +2001,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return takeLocksForUpdate(searchRow, rowId, txId)
.thenCompose(rowLock -> applyUpdateCommand(
- updateCommand(commitPartitionId,
rowId.uuid(), searchRow, txId))
+ updateCommand(commitPartitionId,
rowId.uuid(), searchRow, txId, full))
.thenApply(ignored -> rowLock))
.thenApply(rowIdLock -> {
// Release short term locks.
@@ -2111,7 +2163,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return applyUpdateCommand(
- updateCommand(commitPartitionId,
validatedRowId.get1().uuid(), newRow, txId))
+ updateCommand(commitPartitionId,
validatedRowId.get1().uuid(), newRow, txId,
+ request.full()))
.thenApply(ignored -> validatedRowId)
.thenApply(rowIdLock -> {
// Release short term locks.
@@ -2348,17 +2401,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
});
}
- /**
- * Compounds a RAFT group unique name.
- *
- * @param tblId Table identifier.
- * @param partition Number of table partitions.
- * @return A RAFT group name.
- */
- private String partitionRaftGroupName(UUID tblId, int partition) {
- return tblId + "_part_" + partition;
- }
-
/**
* Method to construct {@link UpdateCommand} object.
*
@@ -2366,13 +2408,15 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param rowUuid Row UUID.
* @param row Row.
* @param txId Transaction ID.
+ * @param full {@code True} if this is a full transaction.
* @return Constructed {@link UpdateCommand} object.
*/
- private UpdateCommand updateCommand(TablePartitionId tablePartId, UUID rowUuid, @Nullable BinaryRow row, UUID txId) {
+ private UpdateCommand updateCommand(TablePartitionId tablePartId, UUID rowUuid, @Nullable BinaryRow row, UUID txId, boolean full) {
UpdateCommandBuilder bldr = MSG_FACTORY.updateCommand()
.tablePartitionId(tablePartitionId(tablePartId))
.rowUuid(rowUuid)
.txId(txId)
+ .full(full)
.safeTimeLong(hybridClock.nowLong());
if (row != null) {
@@ -2393,14 +2437,17 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param tablePartId {@link TablePartitionId} object to construct {@link
UpdateCommand} object with.
* @param rowsToUpdate All {@link BinaryRow}s represented as {@link
ByteBuffer}s to be updated.
* @param txId Transaction ID.
+ * @param full {@code True} if full transaction.
* @return Constructed {@link UpdateAllCommand} object.
*/
- private UpdateAllCommand updateAllCommand(TablePartitionId tablePartId, Map<UUID, BinaryRowMessage> rowsToUpdate, UUID txId) {
+ private UpdateAllCommand updateAllCommand(TablePartitionId tablePartId, Map<UUID, BinaryRowMessage> rowsToUpdate, UUID txId,
+ boolean full) {
return MSG_FACTORY.updateAllCommand()
.tablePartitionId(tablePartitionId(tablePartId))
.rowsToUpdate(rowsToUpdate)
.txId(txId)
.safeTimeLong(hybridClock.nowLong())
+ .full(full)
.build();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index ce3b671430..792a5202a8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -21,6 +21,9 @@ import static
it.unimi.dsi.fastutil.ints.Int2ObjectMaps.emptyMap;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.table.distributed.storage.RowBatch.allResultFutures;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITED;
+import static org.apache.ignite.internal.tx.TxState.PENDING;
import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
@@ -90,10 +93,10 @@ import
org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.lang.IgniteFiveFunction;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgnitePentaFunction;
import org.apache.ignite.lang.IgniteStringFormatter;
-import org.apache.ignite.lang.IgniteTetraFunction;
+import org.apache.ignite.lang.IgniteTriFunction;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
@@ -217,14 +220,14 @@ public class InternalTableImpl implements InternalTable {
* Enlists a single row into a transaction.
*
* @param row The row.
- * @param tx The transaction.
- * @param op Replica requests factory.
+ * @param tx The transaction, not null if explicit.
+ * @param fac Replica requests factory.
* @return The future.
*/
private <R> CompletableFuture<R> enlistInTx(
BinaryRowEx row,
@Nullable InternalTransaction tx,
- IgniteTetraFunction<TablePartitionId, InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> op
+ IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> fac
) {
// Check whether proposed tx is read-only. Complete future
exceptionally if true.
// Attempting to enlist a read-only in a read-write transaction does
not corrupt the transaction itself, thus read-write transaction
@@ -251,9 +254,9 @@ public class InternalTableImpl implements InternalTable {
CompletableFuture<R> fut;
if (primaryReplicaAndTerm != null) {
- TablePartitionId commitPart = tx.commitPartition();
+ assert !implicit;
- ReplicaRequest request = op.apply(commitPart, tx0, partGroupId,
primaryReplicaAndTerm.get2());
+ ReplicaRequest request = fac.apply(tx, partGroupId,
primaryReplicaAndTerm.get2());
try {
fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(), request);
@@ -263,15 +266,10 @@ public class InternalTableImpl implements InternalTable {
throw new TransactionException("Failed to invoke the replica
request.");
}
} else {
- fut = enlistWithRetry(
- tx0,
- partId,
- (commitPart, term) -> op.apply(commitPart, tx0,
partGroupId, term),
- ATTEMPTS_TO_ENLIST_PARTITION
- );
+ fut = enlistWithRetry(tx0, partId, term -> fac.apply(tx0,
partGroupId, term), ATTEMPTS_TO_ENLIST_PARTITION);
}
- return postEnlist(fut, implicit, tx0);
+ return postEnlist(fut, false, tx0, implicit);
}
/**
@@ -279,14 +277,14 @@ public class InternalTableImpl implements InternalTable {
*
* @param keyRows Rows.
* @param tx The transaction.
- * @param op Replica requests factory.
+ * @param fac Replica requests factory.
* @param reducer Transform reducer.
* @return The future.
*/
private <T> CompletableFuture<T> enlistInTx(
Collection<BinaryRowEx> keyRows,
@Nullable InternalTransaction tx,
- IgniteFiveFunction<TablePartitionId, Collection<BinaryRow>, InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> op,
+ IgnitePentaFunction<Collection<BinaryRow>, InternalTransaction, ReplicationGroupId, Long, Boolean, ReplicaRequest> fac,
Function<Collection<RowBatch>, CompletableFuture<T>> reducer
) {
// Check whether proposed tx is read-only. Complete future
exceptionally if true.
@@ -313,6 +311,8 @@ public class InternalTableImpl implements InternalTable {
Int2ObjectMap<RowBatch> rowBatchByPartitionId =
toRowBatchByPartitionId(keyRows);
+ boolean singlePart = rowBatchByPartitionId.size() == 1;
+
for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch :
rowBatchByPartitionId.int2ObjectEntrySet()) {
int partitionId = partitionRowBatch.getIntKey();
RowBatch rowBatch = partitionRowBatch.getValue();
@@ -324,9 +324,8 @@ public class InternalTableImpl implements InternalTable {
CompletableFuture<Object> fut;
if (primaryReplicaAndTerm != null) {
- TablePartitionId commitPart = tx.commitPartition();
-
- ReplicaRequest request = op.apply(commitPart,
rowBatch.requestedRows, tx0, partGroupId, primaryReplicaAndTerm.get2());
+ assert !implicit;
+ ReplicaRequest request = fac.apply(rowBatch.requestedRows,
tx0, partGroupId, primaryReplicaAndTerm.get2(), false);
try {
fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(),
request);
@@ -339,7 +338,7 @@ public class InternalTableImpl implements InternalTable {
fut = enlistWithRetry(
tx0,
partitionId,
- (commitPart, term) -> op.apply(commitPart,
rowBatch.requestedRows, tx0, partGroupId, term),
+ term -> fac.apply(rowBatch.requestedRows, tx0,
partGroupId, term, implicit && singlePart),
ATTEMPTS_TO_ENLIST_PARTITION
);
}
@@ -349,7 +348,7 @@ public class InternalTableImpl implements InternalTable {
CompletableFuture<T> fut =
reducer.apply(rowBatchByPartitionId.values());
- return postEnlist(fut, implicit, tx0);
+ return postEnlist(fut, implicit && !singlePart, tx0, implicit && singlePart);
}
/**
@@ -364,6 +363,7 @@ public class InternalTableImpl implements InternalTable {
* @param upperBound Upper search bound.
* @param flags Control flags. See {@link
org.apache.ignite.internal.storage.index.SortedIndexStorage} constants.
* @param columnsToInclude Row projection.
+ * @param implicit {@code True} if the implicit txn.
* @return Batch of retrieved rows.
*/
private CompletableFuture<Collection<BinaryRow>> enlistCursorInTx(
@@ -376,7 +376,8 @@ public class InternalTableImpl implements InternalTable {
@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound,
int flags,
- @Nullable BitSet columnsToInclude
+ @Nullable BitSet columnsToInclude,
+ boolean implicit
) {
TablePartitionId partGroupId = new TablePartitionId(tableId, partId);
@@ -395,6 +396,7 @@ public class InternalTableImpl implements InternalTable {
.upperBoundPrefix(binaryTupleMessage(upperBound))
.flags(flags)
.columnsToInclude(columnsToInclude)
+ .full(implicit) // Intent for one phase commit.
.batchSize(batchSize);
if (primaryReplicaAndTerm != null) {
@@ -408,10 +410,10 @@ public class InternalTableImpl implements InternalTable {
throw new TransactionException("Failed to invoke the replica
request.");
}
} else {
- fut = enlistWithRetry(tx, partId, (commitPart, term) ->
requestBuilder.term(term).build(), ATTEMPTS_TO_ENLIST_PARTITION);
+ fut = enlistWithRetry(tx, partId, term ->
requestBuilder.term(term).build(), ATTEMPTS_TO_ENLIST_PARTITION);
}
- return postEnlist(fut, false, tx);
+ return postEnlist(fut, false, tx, false);
}
private @Nullable BinaryTupleMessage binaryTupleMessage(@Nullable
BinaryTupleReader binaryTuple) {
@@ -430,14 +432,14 @@ public class InternalTableImpl implements InternalTable {
*
* @param tx Internal transaction.
* @param partId Partition number.
- * @param requestFunction Function to create replica request with new raft
term.
+ * @param mapFunc Function to create replica request with new raft term.
* @param attempts Number of attempts.
* @return The future.
*/
private <R> CompletableFuture<R> enlistWithRetry(
InternalTransaction tx,
int partId,
- BiFunction<TablePartitionId, Long, ReplicaRequest> requestFunction,
+ Function<Long, ReplicaRequest> mapFunc,
int attempts
) {
CompletableFuture<R> result = new CompletableFuture<>();
@@ -447,7 +449,7 @@ public class InternalTableImpl implements InternalTable {
try {
return replicaSvc.invoke(
primaryReplicaAndTerm.get1(),
- requestFunction.apply(tx.commitPartition(), primaryReplicaAndTerm.get2())
+ mapFunc.apply(primaryReplicaAndTerm.get2())
);
} catch (PrimaryReplicaMissException e) {
throw new TransactionException(e);
@@ -466,7 +468,7 @@ public class InternalTableImpl implements InternalTable {
.handle((res0, e) -> {
if (e != null) {
if (e.getCause() instanceof
PrimaryReplicaMissException && attempts > 0) {
- return enlistWithRetry(tx, partId,
requestFunction, attempts - 1).handle((r2, e2) -> {
+ return enlistWithRetry(tx, partId, mapFunc,
attempts - 1).handle((r2, e2) -> {
if (e2 != null) {
return result.completeExceptionally(e2);
} else {
@@ -488,13 +490,22 @@ public class InternalTableImpl implements InternalTable {
* Performs post enlist operation.
*
* @param fut The future.
- * @param implicit {@code true} for implicit tx.
+ * @param autoCommit {@code True} for auto commit.
* @param tx0 The transaction.
+ * @param full If this is full transaction.
* @param <T> Operation return type.
* @return The future.
*/
- private <T> CompletableFuture<T> postEnlist(CompletableFuture<T> fut, boolean implicit, InternalTransaction tx0) {
+ private <T> CompletableFuture<T> postEnlist(CompletableFuture<T> fut, boolean autoCommit, InternalTransaction tx0, boolean full) {
+ assert !(autoCommit && full) : "Invalid combination of flags";
+
return fut.handle((BiFunction<T, Throwable, CompletableFuture<T>>) (r,
e) -> {
+ if (full) { // Full txn is already finished remotely. Just update local state.
+ // TODO: IGNITE-17638 TestOnly code, let's consider using Txn state map instead of states.
+ txManager.changeState(tx0.id(), PENDING, e == null ? COMMITED : ABORTED);
+ return e != null ? failedFuture(wrapReplicationException(e)) : completedFuture(r);
+ }
+
if (e != null) {
RuntimeException e0 = wrapReplicationException(e);
@@ -508,7 +519,7 @@ public class InternalTableImpl implements InternalTable {
} else {
tx0.enlistResultFuture(fut);
- if (implicit) {
+ if (autoCommit) {
return tx0.commitAsync()
.exceptionally(ex -> {
throw wrapReplicationException(ex);
@@ -531,14 +542,15 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
keyRow,
tx,
- (commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.binaryRowMessage(serializeBinaryRow(keyRow))
- .commitPartitionId(commitPart)
+ .commitPartitionId(txo.commitPartition())
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_GET)
.timestampLong(clock.nowLong())
+ .full(tx == null)
.build()
);
}
@@ -578,14 +590,15 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
keyRows,
tx,
- (commitPart, keyRows0, txo, groupId, term) ->
tableMessagesFactory.readWriteMultiRowReplicaRequest()
+ (keyRows0, txo, groupId, term, full) ->
tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
.binaryRowMessages(serializeBinaryRows(keyRows0))
- .commitPartitionId(commitPart)
+ .commitPartitionId(txo.commitPartition())
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_GET_ALL)
.timestampLong(clock.nowLong())
+ .full(full)
.build(),
InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder
);
@@ -640,14 +653,15 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
row,
tx,
- (commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
- .commitPartitionId(commitPart)
+ .commitPartitionId(txo.commitPartition())
.binaryRowMessage(serializeBinaryRow(row))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_UPSERT)
.timestampLong(clock.nowLong())
+ .full(tx == null)
.build());
}
@@ -671,11 +685,11 @@ public class InternalTableImpl implements InternalTable {
CompletableFuture<Void> fut = enlistWithRetry(
tx,
partition,
- (commitPart, term) -> upsertAllInternal(commitPart, rows, tx,
partGroupId, term),
+ term -> upsertAllInternal(rows, tx, partGroupId, term, true),
ATTEMPTS_TO_ENLIST_PARTITION
);
- return postEnlist(fut, true, tx);
+ return postEnlist(fut, false, tx, true); // Will be committed in one
RTT.
}
/** {@inheritDoc} */
@@ -684,14 +698,15 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
row,
tx,
- (commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
- .commitPartitionId(commitPart)
+ .commitPartitionId(txo.commitPartition())
.binaryRowMessage(serializeBinaryRow(row))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_GET_AND_UPSERT)
.timestampLong(clock.nowLong())
+ .full(tx == null)
.build()
);
}
@@ -702,14 +717,15 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
row,
tx,
- (commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
- .commitPartitionId(commitPart)
+ .commitPartitionId(txo.commitPartition())
.binaryRowMessage(serializeBinaryRow(row))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_INSERT)
.timestampLong(clock.nowLong())
+ .full(tx == null)
.build()
);
}
@@ -720,14 +736,15 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
rows,
tx,
- (commitPart, keyRows0, txo, groupId, term) ->
tableMessagesFactory.readWriteMultiRowReplicaRequest()
+ (keyRows0, txo, groupId, term, full) ->
tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
- .commitPartitionId(commitPart)
+ .commitPartitionId(txo.commitPartition())
.binaryRowMessages(serializeBinaryRows(keyRows0))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_INSERT_ALL)
.timestampLong(clock.nowLong())
+ .full(full)
.build(),
InternalTableImpl::collectMultiRowsResponsesWithoutRestoreOrder
);
@@ -739,14 +756,15 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
row,
tx,
- (commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
- .commitPartitionId(commitPart)
+ .commitPartitionId(txo.commitPartition())
.binaryRowMessage(serializeBinaryRow(row))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_REPLACE_IF_EXIST)
.timestampLong(clock.nowLong())
+ .full(tx == null)
.build()
);
}
@@ -757,15 +775,16 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
newRow,
tx,
- (commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSwapRowReplicaRequest()
+ (txo, groupId, term) ->
tableMessagesFactory.readWriteSwapRowReplicaRequest()
.groupId(groupId)
- .commitPartitionId(commitPart)
+ .commitPartitionId(txo.commitPartition())
.oldBinaryRowMessage(serializeBinaryRow(oldRow))
.binaryRowMessage(serializeBinaryRow(newRow))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_REPLACE)
.timestampLong(clock.nowLong())
+ .full(tx == null)
.build()
);
}
@@ -776,14 +795,15 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
row,
tx,
- (commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
- .commitPartitionId(commitPart)
+ .commitPartitionId(txo.commitPartition())
.binaryRowMessage(serializeBinaryRow(row))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_GET_AND_REPLACE)
.timestampLong(clock.nowLong())
+ .full(tx == null)
.build()
);
}
@@ -794,14 +814,15 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
keyRow,
tx,
- (commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
- .commitPartitionId(commitPart)
+ .commitPartitionId(txo.commitPartition())
.binaryRowMessage(serializeBinaryRow(keyRow))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_DELETE)
.timestampLong(clock.nowLong())
+ .full(tx == null)
.build()
);
}
@@ -812,14 +833,15 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
oldRow,
tx,
- (commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
- .commitPartitionId(commitPart)
+ .commitPartitionId(txo.commitPartition())
.binaryRowMessage(serializeBinaryRow(oldRow))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_DELETE_EXACT)
.timestampLong(clock.nowLong())
+ .full(tx == null)
.build()
);
}
@@ -830,14 +852,15 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
row,
tx,
- (commitPart, txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
- .commitPartitionId(commitPart)
+ .commitPartitionId(txo.commitPartition())
.binaryRowMessage(serializeBinaryRow(row))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_GET_AND_DELETE)
.timestampLong(clock.nowLong())
+ .full(tx == null)
.build()
);
}
@@ -848,14 +871,15 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
rows,
tx,
- (commitPart, keyRows0, txo, groupId, term) ->
tableMessagesFactory.readWriteMultiRowReplicaRequest()
+ (keyRows0, txo, groupId, term, full) ->
tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
- .commitPartitionId(commitPart)
+ .commitPartitionId(txo.commitPartition())
.binaryRowMessages(serializeBinaryRows(keyRows0))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_DELETE_ALL)
.timestampLong(clock.nowLong())
+ .full(full)
.build(),
InternalTableImpl::collectMultiRowsResponsesWithoutRestoreOrder
);
@@ -870,14 +894,15 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
rows,
tx,
- (commitPart, keyRows0, txo, groupId, term) ->
tableMessagesFactory.readWriteMultiRowReplicaRequest()
+ (keyRows0, txo, groupId, term, full) ->
tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
- .commitPartitionId(commitPart)
+ .commitPartitionId(txo.commitPartition())
.binaryRowMessages(serializeBinaryRows(keyRows0))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_DELETE_EXACT_ALL)
.timestampLong(clock.nowLong())
+ .full(full)
.build(),
InternalTableImpl::collectMultiRowsResponsesWithoutRestoreOrder
);
@@ -958,7 +983,7 @@ public class InternalTableImpl implements InternalTable {
return replicaSvc.invoke(recipientNode, request);
},
// TODO: IGNITE-17666 Close cursor tx finish.
- Function.identity());
+ (unused, fut) -> fut);
}
@Override
@@ -1011,9 +1036,10 @@ public class InternalTableImpl implements InternalTable {
lowerBound,
upperBound,
flags,
- columnsToInclude
+ columnsToInclude,
+ implicit
),
- fut -> postEnlist(fut, implicit, tx0)
+ (commit, fut) -> postEnlist(fut, commit, tx0, implicit && !commit)
);
}
@@ -1060,12 +1086,13 @@ public class InternalTableImpl implements InternalTable
{
.columnsToInclude(columnsToInclude)
.batchSize(batchSize)
.term(recipient.term())
+ .full(false) // Set explicitly.
.build();
return replicaSvc.invoke(recipient.node(), request);
},
// TODO: IGNITE-17666 Close cursor tx finish.
- Function.identity());
+ (unused, fut) -> fut);
}
/**
@@ -1348,7 +1375,7 @@ public class InternalTableImpl implements InternalTable {
private final BiFunction<Long, Integer,
CompletableFuture<Collection<BinaryRow>>> retrieveBatch;
/** The closure will be invoked before the cursor closed. */
- Function<CompletableFuture<Void>, CompletableFuture<Void>> onClose;
+ BiFunction<Boolean, CompletableFuture<Void>, CompletableFuture<Void>>
onClose;
/** True when the publisher has a subscriber, false otherwise. */
private final AtomicBoolean subscribed;
@@ -1362,7 +1389,7 @@ public class InternalTableImpl implements InternalTable {
*/
PartitionScanPublisher(
BiFunction<Long, Integer,
CompletableFuture<Collection<BinaryRow>>> retrieveBatch,
- Function<CompletableFuture<Void>, CompletableFuture<Void>>
onClose
+ BiFunction<Boolean, CompletableFuture<Void>,
CompletableFuture<Void>> onClose
) {
this.retrieveBatch = retrieveBatch;
this.onClose = onClose;
@@ -1420,7 +1447,7 @@ public class InternalTableImpl implements InternalTable {
@Override
public void request(long n) {
if (n <= 0) {
- cancel();
+ cancel(null, true);
subscriber.onError(new
IllegalArgumentException(IgniteStringFormatter
.format("Invalid requested amount of items
[requested={}, minValue=1]", n))
@@ -1447,20 +1474,21 @@ public class InternalTableImpl implements InternalTable
{
/** {@inheritDoc} */
@Override
public void cancel() {
- cancel(null);
+ cancel(null, true); // Explicit cancel.
}
/**
* After the method is called, a subscriber won't be received
updates from the publisher.
*
* @param t An exception which was thrown when entries were
retrieving from the cursor.
+ * @param commit {@code True} to commit.
*/
- public void cancel(Throwable t) {
+ private void cancel(Throwable t, boolean commit) {
if (!canceled.compareAndSet(false, true)) {
return;
}
- onClose.apply(t == null ? completedFuture(null) : failedFuture(t)).handle((ignore, th) -> {
+ onClose.apply(commit, t == null ? completedFuture(null) : failedFuture(t)).handle((ignore, th) -> {
if (th != null) {
subscriber.onError(th);
} else {
@@ -1482,18 +1510,13 @@ public class InternalTableImpl implements InternalTable
{
}
retrieveBatch.apply(scanId, n).thenAccept(binaryRows -> {
- if (binaryRows == null) {
- cancel();
-
- return;
- } else {
- assert binaryRows.size() <= n : "Rows more then requested " + binaryRows.size() + " " + n;
+ assert binaryRows != null;
+ assert binaryRows.size() <= n : "Rows more then requested " + binaryRows.size() + " " + n;
- binaryRows.forEach(subscriber::onNext);
- }
+ binaryRows.forEach(subscriber::onNext);
if (binaryRows.size() < n) {
- cancel();
+ cancel(null, false);
} else {
long remaining =
requestedItemsCnt.addAndGet(Math.negateExact(binaryRows.size()));
@@ -1502,7 +1525,7 @@ public class InternalTableImpl implements InternalTable {
}
}
}).exceptionally(t -> {
- cancel(t);
+ cancel(t, false);
return null;
});
@@ -1620,19 +1643,23 @@ public class InternalTableImpl implements InternalTable
{
}
private ReplicaRequest upsertAllInternal(
- TablePartitionId commitPart,
Collection<? extends BinaryRow> keyRows0,
InternalTransaction txo,
ReplicationGroupId groupId,
- Long term) {
+ Long term,
+ boolean full
+ ) {
+ assert txo.commitPartition() != null;
+
return tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
- .commitPartitionId(commitPart)
+ .commitPartitionId(txo.commitPartition())
.binaryRowMessages(serializeBinaryRows(keyRows0))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_UPSERT_ALL)
.timestampLong(clock.nowLong())
+ .full(full)
.build();
}
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
index 6c08f536cf..64cdcb87ab 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
@@ -19,19 +19,23 @@ package org.apache.ignite.internal.table;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.TestHybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
@@ -42,7 +46,6 @@ import
org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
-import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.table.Table;
@@ -68,19 +71,44 @@ public class TxLocalTest extends TxAbstractTest {
lockManager = new HeapLockManager();
- ReplicaService replicaSvc = mock(ReplicaService.class,
RETURNS_DEEP_STUBS);
- PlacementDriver placementDriver = mock(PlacementDriver.class,
RETURNS_DEEP_STUBS);
+ ReplicaMessagesFactory replicaMessagesFactory = new
ReplicaMessagesFactory();
+
+ TestHybridClock localClock = new TestHybridClock(() -> 1);
+ MessagingService msgSvc = mock(MessagingService.class,
RETURNS_DEEP_STUBS);
+ ReplicaService replicaSvc = new ReplicaService(msgSvc, localClock);
Map<ReplicationGroupId, DummyInternalTableImpl> tables = new
HashMap<>();
+ doAnswer(invocationOnMock -> {
+ ReplicaRequest request = invocationOnMock.getArgument(1);
+ ReplicaListener replicaListener =
tables.get(request.groupId()).getReplicaListener();
+
+ if (request instanceof TimestampAware) {
+ TimestampAware aware = (TimestampAware) request;
+ HybridTimestamp updated =
DummyInternalTableImpl.CLOCK.update(aware.timestamp());
+
+ return replicaListener.invoke(request).handle((res, err) ->
err == null ? replicaMessagesFactory
+ .timestampAwareReplicaResponse()
+ .result(res)
+ .timestampLong(updated.longValue())
+ .build() :
+ replicaMessagesFactory
+ .errorTimestampAwareReplicaResponse()
+ .throwable(err)
+ .timestampLong(updated.longValue())
+ .build());
+ } else {
+ return replicaListener.invoke(request).handle((res, err) ->
err == null ? replicaMessagesFactory
+ .replicaResponse()
+ .result(res)
+ .build() : replicaMessagesFactory
+ .errorReplicaResponse()
+ .throwable(err)
+ .build());
+ }
- lenient().doAnswer(
- invocationOnMock -> {
- ReplicaRequest request = invocationOnMock.getArgument(1);
- ReplicaListener replicaListener =
tables.get(request.groupId()).getReplicaListener();
+ }).when(msgSvc).invoke((String) isNull(), any(), anyLong());
- return replicaListener.invoke(request);
- }
- ).when(replicaSvc).invoke(any(ClusterNode.class), any());
+ PlacementDriver placementDriver = mock(PlacementDriver.class,
RETURNS_DEEP_STUBS);
doAnswer(invocationOnMock -> {
TxStateReplicaRequest request = invocationOnMock.getArgument(1);
@@ -89,7 +117,7 @@ public class TxLocalTest extends TxAbstractTest {
tables.get(request.groupId()).txStateStorage().getTxStateStorage(0).get(request.txId()));
}).when(placementDriver).sendMetaRequest(any(), any());
- txManager = new TxManagerImpl(replicaSvc, lockManager, new
HybridClockImpl(), new TransactionIdGenerator(0xdeadbeef));
+ txManager = new TxManagerImpl(replicaSvc, lockManager, localClock, new
TransactionIdGenerator(0xdeadbeef));
igniteTransactions = new IgniteTransactionsImpl(txManager);
@@ -101,8 +129,6 @@ public class TxLocalTest extends TxAbstractTest {
customers = new TableImpl(table2, new
DummySchemaManagerImpl(CUSTOMERS_SCHEMA), lockManager);
-
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class,
RETURNS_DEEP_STUBS));
-
tables.put(table.groupId(), table);
tables.put(table2.groupId(), table2);
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
index 7307f934e4..f51b228ac1 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -163,7 +163,7 @@ public abstract class IndexBaseTest extends
BaseMvStoragesTest {
static void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable
BinaryRow row) {
TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
- handler.handleUpdate(TX_ID, rowUuid, partitionId, row, (unused) -> {});
+ handler.handleUpdate(TX_ID, rowUuid, partitionId, row, (unused) -> {},
null);
}
static BinaryRow defaultRow() {
@@ -223,7 +223,7 @@ public abstract class IndexBaseTest extends
BaseMvStoragesTest {
USE_UPDATE {
@Override
void addWrite(StorageUpdateHandler handler, TablePartitionId
partitionId, UUID rowUuid, @Nullable BinaryRow row) {
- handler.handleUpdate(TX_ID, rowUuid, partitionId, row,
(unused) -> {});
+ handler.handleUpdate(TX_ID, rowUuid, partitionId, row,
(unused) -> {}, null);
}
},
/** Uses updateAll api. */
@@ -241,7 +241,8 @@ public abstract class IndexBaseTest extends
BaseMvStoragesTest {
TX_ID,
singletonMap(rowUuid, rowMessage),
partitionId,
- (unused) -> {}
+ (unused) -> {},
+ null
);
}
};
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
index 0a0ce0ee33..76308459c7 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
@@ -111,6 +111,7 @@ public class StorageUpdateHandlerTest {
UUID.randomUUID(),
new TablePartitionId(1, PARTITION_ID),
null,
+ null,
null
);
@@ -131,6 +132,7 @@ public class StorageUpdateHandlerTest {
UUID.randomUUID(),
Map.of(),
new TablePartitionId(1, PARTITION_ID),
+ null,
null
);
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index 4e0b7f6a7f..6a18c727ec 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -98,6 +98,11 @@ public class TestPartitionDataStorage implements
PartitionDataStorage {
return partitionStorage.addWrite(rowId, row, txId, commitTableId,
commitPartitionId);
}
+ @Override
+ public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row,
HybridTimestamp commitTs) {
+ partitionStorage.addWriteCommitted(rowId, row, commitTs);
+ }
+
@Override
public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException
{
return partitionStorage.abortWrite(rowId);
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 5457661d54..c39638ebb0 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -1235,11 +1235,25 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
@Test
public void testScan() throws Exception {
- accounts.recordView().upsertAll(null, List.of(makeValue(1, 100.),
makeValue(2, 200.)));
+ doTestScan(null);
+ }
+
+ @Test
+ public void testScanExplicit() throws Exception {
+ igniteTransactions.runInTransaction(this::doTestScan);
+ }
+
+ /**
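+ * Upserts two accounts, scans them back, checks the balances and then overwrites the rows.
+ *
+ * @param tx The transaction, or {@code null} to run the operations without an explicit transaction.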
+ */
+ private void doTestScan(@Nullable Transaction tx) {
+ accounts.recordView().upsertAll(tx, List.of(makeValue(1, 100.),
makeValue(2, 200.)));
- CompletableFuture<List<Tuple>> scanFut =
scan(accounts.internalTable(), null);
+ CompletableFuture<List<Tuple>> scanFut =
scan(accounts.internalTable(), tx == null ? null : (InternalTransaction) tx);
- var rows = scanFut.get(10, TimeUnit.SECONDS);
+ var rows = scanFut.join();
Map<Long, Tuple> map = new HashMap<>();
@@ -1249,6 +1263,9 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
assertEquals(100., map.get(1L).doubleValue("balance"));
assertEquals(200., map.get(2L).doubleValue("balance"));
+
+ // Attempt to overwrite.
+ accounts.recordView().upsertAll(tx, List.of(makeValue(1, 300.),
makeValue(2, 400.)));
}
/**
@@ -1258,7 +1275,7 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
* @param internalTx Internal transaction of {@code null}.
* @return Future to scanning result.
*/
- private CompletableFuture<List<Tuple>> scan(InternalTable internalTable,
InternalTransaction internalTx) {
+ private CompletableFuture<List<Tuple>> scan(InternalTable internalTable,
@Nullable InternalTransaction internalTx) {
Flow.Publisher<BinaryRow> pub = internalTx != null &&
internalTx.isReadOnly()
? internalTable.scan(0, internalTx.readTimestamp(),
internalTable.leaderAssignment(0))
: internalTable.scan(0, internalTx);
@@ -1912,6 +1929,12 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
testTransactionAlreadyFinished(false);
}
+ @Test
+ public void testImplicit() {
+ accounts.recordView().upsert(null, makeValue(1, BALANCE_1));
+ assertEquals(BALANCE_1, accounts.recordView().get(null,
makeKey(1)).doubleValue("balance"));
+ }
+
/**
* Checks operations that act after a transaction is committed, are
finished with exception.
*
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 99d307c6c0..feb2acbc02 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -31,11 +31,12 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
+import java.util.function.LongSupplier;
import javax.naming.OperationNotSupportedException;
import org.apache.ignite.configuration.ConfigurationValue;
import org.apache.ignite.distributed.TestPartitionDataStorage;
+import org.apache.ignite.internal.TestHybridClock;
import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -95,10 +96,17 @@ import org.jetbrains.annotations.Nullable;
* Dummy table storage implementation.
*/
public class DummyInternalTableImpl extends InternalTableImpl {
- private static final IgniteLogger LOG =
Loggers.forClass(DummyInternalTableImpl.class);
+ public static final IgniteLogger LOG =
Loggers.forClass(DummyInternalTableImpl.class);
public static final NetworkAddress ADDR = new NetworkAddress("127.0.0.1",
2004);
+ public static final HybridClock CLOCK = new TestHybridClock(new
LongSupplier() {
+ @Override
+ public long getAsLong() {
+ return 0;
+ }
+ });
+
private static final int PART_ID = 0;
private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
@@ -107,8 +115,6 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
new Column[]{new Column("value", NativeTypes.INT64, false)}
);
- private static final HybridClock CLOCK = new HybridClockImpl();
-
private static final ReplicationGroupId crossTableGroupId = new
TablePartitionId(333, 0);
private PartitionListener partitionListener;
@@ -118,7 +124,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
private final ReplicationGroupId groupId;
/** The thread updates safe time on the dummy replica. */
- private Thread safeTimeUpdaterThread;
+ final private PendingComparableValuesTracker<HybridTimestamp, Void>
safeTime;
private static final AtomicInteger nextTableId = new AtomicInteger(10_001);
@@ -131,6 +137,12 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
this(replicaSvc, SCHEMA);
}
+ /**
+ * Creates a new local table.
+ *
+ * @param replicaSvc Replica service.
+ * @param schema Schema.
+ */
public DummyInternalTableImpl(ReplicaService replicaSvc, SchemaDescriptor
schema) {
this(replicaSvc, new TestMvPartitionStorage(0), schema);
}
@@ -273,8 +285,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
IndexLocker pkLocker = new HashIndexLocker(indexId, true,
this.txManager.lockManager(), row2Tuple);
- PendingComparableValuesTracker<HybridTimestamp, Void> safeTime =
- new PendingComparableValuesTracker<>(new HybridTimestamp(1,
0));
+ safeTime = mock(PendingComparableValuesTracker.class);
PartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(mvPartStorage);
TableIndexStoragesSupplier indexes =
createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get()));
@@ -320,6 +331,9 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
mock(TablesConfiguration.class)
);
+
lenient().when(safeTime.waitFor(any())).thenReturn(completedFuture(null));
+ lenient().when(safeTime.current()).thenReturn(new HybridTimestamp(1,
0));
+
partitionListener = new PartitionListener(
new TestPartitionDataStorage(mvPartStorage),
storageUpdateHandler,
@@ -327,34 +341,22 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
safeTime,
new PendingComparableValuesTracker<>(0L)
);
-
- safeTimeUpdaterThread = new Thread(new SafeTimeUpdater(safeTime),
"safe-time-updater");
-
- safeTimeUpdaterThread.start();
}
/**
- * A process to update safe time periodically.
+ * Set a safe timestamp.
+ *
+ * @param ts Timestamp.
*/
- private static class SafeTimeUpdater implements Runnable {
- PendingComparableValuesTracker<HybridTimestamp, Void> safeTime;
-
- public SafeTimeUpdater(PendingComparableValuesTracker<HybridTimestamp,
Void> safeTime) {
- this.safeTime = safeTime;
- }
+ public void updateSafeTime(HybridTimestamp ts) {
+ safeTime.update(ts, null);
+ }
- @Override
- public void run() {
- while (true) {
- safeTime.update(CLOCK.now(), null);
-
- try {
- Thread.sleep(1_000);
- } catch (InterruptedException e) {
- LOG.warn("The sfe time updater thread is interrupted");
- }
- }
- }
+ /**
+ * @return Current safe time,
+ */
+ public HybridTimestamp getSafeTime() {
+ return safeTime.current();
}
/**
@@ -384,12 +386,6 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
return txManager;
}
- /** {@inheritDoc} */
- @Override
- public @NotNull String name() {
- return null;
- }
-
/** {@inheritDoc} */
@Override
public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow,
InternalTransaction tx) {
@@ -414,17 +410,6 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
return completedFuture(mock(ClusterNode.class));
}
- @Override
- public void close() {
- super.close();
-
- if (safeTimeUpdaterThread != null) {
- safeTimeUpdaterThread.interrupt();
-
- safeTimeUpdaterThread = null;
- }
- }
-
/**
* Returns dummy table index storages supplier.
*
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
index bf1b4c00bc..3f3695cb5a 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
@@ -84,7 +84,6 @@ public interface InternalTransaction extends Transaction {
*
* @param resultFuture Operation result future.
*/
- @Deprecated
void enlistResultFuture(CompletableFuture<?> resultFuture);
/**
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index fdc4302e15..1c38556e15 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -30,7 +30,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -46,19 +46,29 @@ import org.jetbrains.annotations.NotNull;
* The read-write implementation of an internal transaction.
*/
public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl {
+ /** The logger. */
private static final IgniteLogger LOG =
Loggers.forClass(InternalTransaction.class);
+ /** Commit partition updater. */
+ private static final AtomicReferenceFieldUpdater<ReadWriteTransactionImpl,
TablePartitionId> COMMIT_PART_UPDATER =
+
AtomicReferenceFieldUpdater.newUpdater(ReadWriteTransactionImpl.class,
TablePartitionId.class, "commitPart");
+
+ /** Finish future updater. */
+ @SuppressWarnings("rawtypes")
+ private static final AtomicReferenceFieldUpdater<ReadWriteTransactionImpl,
CompletableFuture> FINISH_FUT_UPDATER =
+
AtomicReferenceFieldUpdater.newUpdater(ReadWriteTransactionImpl.class,
CompletableFuture.class, "finishFut");
+
/** Enlisted partitions: partition id -> (primary replica node, raft
term). */
private final Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>>
enlisted = new ConcurrentHashMap<>();
/** Enlisted operation futures in this transaction. */
private final List<CompletableFuture<?>> enlistedResults = new
CopyOnWriteArrayList<>();
- /** Reference to the partition that stores the transaction state. */
- private final AtomicReference<TablePartitionId> commitPartitionRef = new
AtomicReference<>();
+ /** A partition which stores the transaction state. */
+ private volatile TablePartitionId commitPart;
/** The future used on repeated commit/rollback. */
- private final AtomicReference<CompletableFuture<Void>> finishFut = new
AtomicReference<>();
+ private volatile CompletableFuture<Void> finishFut;
/**
* The constructor.
@@ -73,13 +83,13 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
/** {@inheritDoc} */
@Override
public boolean assignCommitPartition(TablePartitionId tablePartitionId) {
- return commitPartitionRef.compareAndSet(null, tablePartitionId);
+ return COMMIT_PART_UPDATER.compareAndSet(this, null, tablePartitionId);
}
/** {@inheritDoc} */
@Override
public TablePartitionId commitPartition() {
- return commitPartitionRef.get();
+ return commitPart;
}
/** {@inheritDoc} */
@@ -97,8 +107,8 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
/** {@inheritDoc} */
@Override
protected CompletableFuture<Void> finish(boolean commit) {
- if (!finishFut.compareAndSet(null, new CompletableFuture<>())) {
- return finishFut.get();
+ if (!FINISH_FUT_UPDATER.compareAndSet(this, null, new
CompletableFuture<>())) {
+ return finishFut;
}
// TODO: https://issues.apache.org/jira/browse/IGNITE-17688 Add proper
exception handling.
@@ -123,7 +133,6 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
}
});
- TablePartitionId commitPart =
commitPartitionRef.get();
ClusterNode recipientNode =
enlisted.get(commitPart).get1();
Long term = enlisted.get(commitPart).get2();
@@ -150,7 +159,7 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
}
);
- mainFinishFut.handle((res, e) -> finishFut.get().complete(null));
+ mainFinishFut.handle((res, e) -> finishFut.complete(null));
return mainFinishFut;
}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index d53bde4ba2..f0165c4d3b 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -51,6 +51,7 @@ import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
+import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -237,7 +238,7 @@ public class TxManagerTest extends IgniteAbstractTest {
@Test
public void testObservableTimestamp() {
- int compareThreshold = 50;
+ long compareThreshold = 50;
// Check that idle safe time propagation period is significantly
greater than compareThreshold.
assertTrue(IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + CLOCK_SKEW
> compareThreshold * 5);
@@ -260,7 +261,7 @@ public class TxManagerTest extends IgniteAbstractTest {
tx = txManager.begin(true, timestampInPast);
long readTime = now.getPhysical() -
IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS - CLOCK_SKEW;
- assertTrue(abs(readTime - tx.readTimestamp().getPhysical()) <
compareThreshold);
+ assertThat(abs(readTime - tx.readTimestamp().getPhysical()),
Matchers.lessThan(compareThreshold));
tx.commit();
assertThrows(AssertionError.class, () -> txManager.begin(false, now));