ptupitsyn commented on code in PR #7588:
URL: https://github.com/apache/ignite-3/pull/7588#discussion_r2846754139
##########
modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java:
##########
@@ -206,6 +225,79 @@ public void commit() throws TransactionException {
sync(commitAsync());
}
+ /**
+ * Discards the directly mapped transaction fragments in case of
coordinator side transaction invalidation
+ * (either kill or implicit rollback due to mapping failure, see
postEnlist).
+ *
+ * @param killed Killed flag.
+ *
+ * @return The future.
+ */
+ public CompletableFuture<Void> discardDirectMappings(boolean killed) {
+ enlistPartitionLock.writeLock().lock();
+
+ try {
+ if (!finishFut.compareAndSet(null, new CompletableFuture<>())) {
+ return finishFut.get();
+ }
+ } finally {
+ enlistPartitionLock.writeLock().unlock();
+ }
+
+ return sendDiscardRequests().handle((r, e) -> {
+ setState(killed ? STATE_KILLED : STATE_ROLLED_BACK);
+ ch.inflights().erase(txId());
+ this.finishFut.get().complete(null);
+ return null;
+ });
+ }
+
+ private CompletableFuture<Void> sendDiscardRequests() {
+ assert finishFut != null;
+
+ if
(!ch.protocolContext().isFeatureSupported(TX_DIRECT_MAPPING_SEND_DISCARD)) {
+ return nullCompletedFuture();
+ }
+
+ Map<String, List<TablePartitionId>> enlistments = new HashMap<>();
+
+ for (Entry<TablePartitionId, CompletableFuture<IgniteBiTuple<String,
Long>>> entry : enlisted.entrySet()) {
+ IgniteBiTuple<String, Long> info = entry.getValue().getNow(null);
+
+ if (info == null) {
+ continue; // Ignore incomplete enlistments.
+ }
+
+ enlistments.computeIfAbsent(info.get1(), k -> new
ArrayList<>()).add(entry.getKey());
+ }
+
+ List<CompletableFuture<Void>> futures = new
ArrayList<>(enlistments.size());
+
+ for (Entry<String, List<TablePartitionId>> entry :
enlistments.entrySet()) {
+ CompletableFuture<Void> discardFut =
reliableChannel.getNodeChannelAsync(entry.getKey()).thenCompose(ch -> {
Review Comment:
In other words, we should simply use `ReliableChannel.serviceAsync` with
`preferredNodeName` set to the target node, and ensure that the server can
re-route the request, if necessary.
##########
modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java:
##########
@@ -206,6 +225,79 @@ public void commit() throws TransactionException {
sync(commitAsync());
}
+ /**
+ * Discards the directly mapped transaction fragments in case of
coordinator side transaction invalidation
+ * (either kill or implicit rollback due to mapping failure, see
postEnlist).
+ *
+ * @param killed Killed flag.
+ *
+ * @return The future.
+ */
+ public CompletableFuture<Void> discardDirectMappings(boolean killed) {
+ enlistPartitionLock.writeLock().lock();
+
+ try {
+ if (!finishFut.compareAndSet(null, new CompletableFuture<>())) {
+ return finishFut.get();
+ }
+ } finally {
+ enlistPartitionLock.writeLock().unlock();
+ }
+
+ return sendDiscardRequests().handle((r, e) -> {
+ setState(killed ? STATE_KILLED : STATE_ROLLED_BACK);
+ ch.inflights().erase(txId());
+ this.finishFut.get().complete(null);
+ return null;
+ });
+ }
+
+ private CompletableFuture<Void> sendDiscardRequests() {
+ assert finishFut != null;
+
+ if
(!ch.protocolContext().isFeatureSupported(TX_DIRECT_MAPPING_SEND_DISCARD)) {
+ return nullCompletedFuture();
+ }
+
+ Map<String, List<TablePartitionId>> enlistments = new HashMap<>();
+
+ for (Entry<TablePartitionId, CompletableFuture<IgniteBiTuple<String,
Long>>> entry : enlisted.entrySet()) {
+ IgniteBiTuple<String, Long> info = entry.getValue().getNow(null);
+
+ if (info == null) {
+ continue; // Ignore incomplete enlistments.
+ }
+
+ enlistments.computeIfAbsent(info.get1(), k -> new
ArrayList<>()).add(entry.getKey());
+ }
+
+ List<CompletableFuture<Void>> futures = new
ArrayList<>(enlistments.size());
+
+ for (Entry<String, List<TablePartitionId>> entry :
enlistments.entrySet()) {
+ CompletableFuture<Void> discardFut =
reliableChannel.getNodeChannelAsync(entry.getKey()).thenCompose(ch -> {
Review Comment:
In other words, we should simply use `ReliableChannel.serviceAsync` with
`preferredNodeName` set to the target node, and ensure that the server can
re-route the request, if necessary.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]