Cyrill commented on code in PR #3371:
URL: https://github.com/apache/ignite-3/pull/3371#discussion_r1524647450


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -560,7 +569,27 @@ private CompletableFuture<AsyncSqlCursor<InternalSqlRow>> querySingle0(
 
             QueryTransactionWrapper txWrapper = txCtx.getOrStartImplicit(result.queryType());
 
-            return executeParsedStatement(schemaName, result, txWrapper, queryCancel, timeZoneId, params, null);
+            InternalTransaction tx = txWrapper.unwrap();
+
+            // Adding inflights only for read-only transactions.
+            if (tx.isReadOnly() && !transactionInflights.addInflight(tx.id(), tx.isReadOnly())) {

Review Comment:
   Here you can reuse the code from InternalTable in the following way:
   ```
   InflightBatchRequestTracker transactionTracker = tx.isReadOnly()
           ? new ReadOnlyInflightBatchRequestTracker(transactionInflights, tx.id())
           : new ReadWriteInflightBatchRequestTracker();
   transactionTracker.onRequestBegin();
   ```
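
   To illustrate the idea, here is a minimal, self-contained sketch of such a tracker pair. Everything in it (`TrackerSketch`, `Inflights`, `RequestTracker`, `ReadOnlyTracker`, `NoOpTracker`, `trackerFor`, and the "returns false for a finished transaction" semantics) is a hypothetical stand-in written for this comment, not the actual Ignite classes, whose signatures and behavior may differ.

   ```java
   import java.util.Set;
   import java.util.UUID;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.AtomicInteger;

   /** Simplified stand-ins for illustration only; not the actual Ignite classes. */
   final class TrackerSketch {
       /** Hypothetical registry counting in-flight requests per read-only transaction. */
       static final class Inflights {
           private final ConcurrentHashMap<UUID, AtomicInteger> counters = new ConcurrentHashMap<>();
           private final Set<UUID> finished = ConcurrentHashMap.newKeySet();

           /** Assumed semantics: returns false if the transaction is already finished. */
           boolean addInflight(UUID txId) {
               // The check and the increment are not atomic together; good enough for a sketch.
               if (finished.contains(txId)) {
                   return false;
               }
               counters.computeIfAbsent(txId, id -> new AtomicInteger()).incrementAndGet();
               return true;
           }

           void removeInflight(UUID txId) {
               AtomicInteger counter = counters.get(txId);
               if (counter != null) {
                   counter.decrementAndGet();
               }
           }

           void markFinished(UUID txId) {
               finished.add(txId);
           }

           int inflightCount(UUID txId) {
               AtomicInteger counter = counters.get(txId);
               return counter == null ? 0 : counter.get();
           }
       }

       /** Common interface, so the caller does not branch on the transaction type. */
       interface RequestTracker {
           void onRequestBegin();

           void onRequestEnd();
       }

       /** Registers every request of a read-only transaction as an inflight. */
       static final class ReadOnlyTracker implements RequestTracker {
           private final Inflights inflights;
           private final UUID txId;

           ReadOnlyTracker(Inflights inflights, UUID txId) {
               this.inflights = inflights;
               this.txId = txId;
           }

           @Override
           public void onRequestBegin() {
               if (!inflights.addInflight(txId)) {
                   throw new IllegalStateException("Transaction is already finished: " + txId);
               }
           }

           @Override
           public void onRequestEnd() {
               inflights.removeInflight(txId);
           }
       }

       /** Read-write transactions are not tracked in this sketch, so both callbacks do nothing. */
       static final class NoOpTracker implements RequestTracker {
           @Override
           public void onRequestBegin() {
           }

           @Override
           public void onRequestEnd() {
           }
       }

       /** Picks the tracker once, mirroring the ternary in the snippet above. */
       static RequestTracker trackerFor(boolean readOnly, Inflights inflights, UUID txId) {
           return readOnly ? new ReadOnlyTracker(inflights, txId) : new NoOpTracker();
       }
   }
   ```

   With this shape the call site only calls `onRequestBegin()` and `onRequestEnd()`, and the read-only check lives in one place instead of being repeated next to every `addInflight` call.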



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java:
##########
@@ -41,54 +38,56 @@ public class FinishedReadOnlyTransactionTracker {
     /** Tx messages factory. */
     private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
 
-    /** A collection of finished read only transactions ordered by the time when when they were finished. */
-    private final Collection<UUID> finishedTransactions = new LinkedBlockingQueue<>();
-
     /** Topology service. */
     private final TopologyService topologyService;
 
     /** Messaging service. */
     private final MessagingService messagingService;
 
+    /** Transaction inflights. */
+    private final TransactionInflights transactionInflights;
+
     /**
      * Constructor.
      *
      * @param topologyService Topology service.
      * @param messagingService Messaging service.
+     * @param transactionInflights Transaction inflights.
      */
-    public FinishedReadOnlyTransactionTracker(TopologyService topologyService, MessagingService messagingService) {
+    public FinishedReadOnlyTransactionTracker(
+            TopologyService topologyService,
+            MessagingService messagingService,
+            TransactionInflights transactionInflights
+    ) {
         this.topologyService = topologyService;
         this.messagingService = messagingService;
+        this.transactionInflights = transactionInflights;
     }
 
     /**
      * Send close cursors batch message to all cluster nodes.
      */
     public void broadcastClosedTransactions() {
-        if (finishedTransactions.isEmpty()) {
-            return;
+        Collection<UUID> txToSend = transactionInflights.finishedReadOnlyTransactions(MAX_FINISHED_TRANSACTIONS_IN_BATCH);

Review Comment:
   If I remember correctly, we were going to make this unlimited (or limit it to `Integer.MAX_VALUE`).
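
   For reference, a minimal sketch of what I mean, assuming the finished IDs are kept in a queue and the method removes what it returns. `FinishedTxBatchSketch`, `markFinished` and `drainFinished` are hypothetical names used only for this comment, not the real `TransactionInflights` API.

   ```java
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.List;
   import java.util.Queue;
   import java.util.UUID;
   import java.util.concurrent.ConcurrentLinkedQueue;

   /** Hypothetical stand-in for the finished read-only transaction bookkeeping. */
   final class FinishedTxBatchSketch {
       private final Queue<UUID> finished = new ConcurrentLinkedQueue<>();

       void markFinished(UUID txId) {
           finished.add(txId);
       }

       /** Removes and returns up to maxCount finished transaction IDs, oldest first. */
       Collection<UUID> drainFinished(int maxCount) {
           List<UUID> batch = new ArrayList<>();
           UUID txId;

           // The size check comes first, so nothing is polled once the batch is full.
           while (batch.size() < maxCount && (txId = finished.poll()) != null) {
               batch.add(txId);
           }

           return batch;
       }
   }
   ```

   Passing `Integer.MAX_VALUE` as `maxCount` makes the batch effectively unlimited without needing a second code path.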


