Vladsz83 commented on code in PR #12589:
URL: https://github.com/apache/ignite/pull/12589#discussion_r2630878566


##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java:
##########
@@ -121,6 +139,11 @@ public QueryAwareTask pollTaskAndBlockQuery(long timeout, TimeUnit unit) throws
 
             return res;
         }
+        catch (Exception e) {

Review Comment:
   Minor. This could catch `Throwable` instead of `Exception`.
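
To illustrate the difference, here is a minimal sketch. It assumes the `catch` block exists to restore some shared state before rethrowing; the diff excerpt above does not show the body, so that is a guess. The class `CatchScopeDemo`, its `counter` field, and both helper methods are hypothetical and not taken from the PR. The sketch relies only on the fact that `Error` subclasses such as `AssertionError` are not matched by `catch (Exception e)`.

```java
/** Hypothetical demo class; names are illustrative and not taken from the PR. */
class CatchScopeDemo {
    /** Stand-in for shared state that must be restored on any failure. */
    static int counter;

    /** Restores the counter only when an Exception is thrown. */
    static void withCatchException(Runnable action) {
        counter--;

        try {
            action.run();

            counter++;
        }
        catch (Exception e) {
            counter++; // Not reached for Error subclasses such as AssertionError.

            throw e;
        }
    }

    /** Restores the counter for any Throwable, including Error subclasses. */
    static void withCatchThrowable(Runnable action) {
        counter--;

        try {
            action.run();

            counter++;
        }
        catch (Throwable t) {
            counter++; // Reached for both Exception and Error.

            throw t;
        }
    }

    /** Runs both variants with a failing action and reports the counter after each. */
    static String demo() {
        Runnable failing = () -> { throw new AssertionError("simulated failure"); };

        counter = 0;

        try {
            withCatchException(failing);
        }
        catch (Throwable ignored) {
            // Swallowed only for the purpose of the demo.
        }

        int afterCatchException = counter;

        counter = 0;

        try {
            withCatchThrowable(failing);
        }
        catch (Throwable ignored) {
            // Swallowed only for the purpose of the demo.
        }

        int afterCatchThrowable = counter;

        return afterCatchException + "," + afterCatchThrowable;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With `catch (Exception e)` the counter stays decremented after the simulated `AssertionError`, while `catch (Throwable t)` restores it.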



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java:
##########
@@ -69,11 +69,18 @@ private static class Node {
     /** Set of blocked (currently running) queries. */
     private final Set<QueryKey> blockedQrys = new HashSet<>();
 
+    /** All tasks blocked by currently running queries. Can be false-negative. */
+    private boolean allTasksBlocked;
+
+    /** Count of not parked task processing threads. */

Review Comment:
   `task processing threads` is a bit confusing. It sounds like active, working, 
busy threads, i.e. not `free`. If it means not parked, let's use the same term, 
'not parked', in both the comment and the field name.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java:
##########
@@ -160,8 +182,9 @@ public void unblockQuery(QueryKey qryKey) {
 
             assert removed;
 
-            if (cnt.get() > 0)
-                notEmpty.signal();

Review Comment:
   We removed almost all signals. With the condition `tasksCnt > freeThreadsCnt` 
in the task adding, it seems the rest of the tasks might stay unprocessed until 
a new one comes. If no new task comes, some tasks will stay in the queue 
unprocessed. WDYT?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java:
##########
@@ -101,16 +114,21 @@ public void addTask(QueryAwareTask task) {
 
     /** Poll task and block query. */
     public QueryAwareTask pollTaskAndBlockQuery(long timeout, TimeUnit unit) throws InterruptedException {
+        long nanos = unit.toNanos(timeout);
+
         lock.lockInterruptibly();
 
         try {
+            freeThreadsCnt--;
+
             QueryAwareTask res;
 
-            long nanos = unit.toNanos(timeout);
+            while (cnt.get() == 0 || allTasksBlocked || (allTasksBlocked = (res = dequeue()) == null)) {
+                if (nanos <= 0L) {
+                    freeThreadsCnt++;
 
-            while (cnt.get() == 0 || (res = dequeue()) == null) {
-                if (nanos <= 0L)
                     return null;
+                }

Review Comment:
   Minor. Since the method can return `null`, its return type could be marked as 
`@Nullable`.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java:
##########
@@ -101,16 +114,21 @@ public void addTask(QueryAwareTask task) {
 
     /** Poll task and block query. */
     public QueryAwareTask pollTaskAndBlockQuery(long timeout, TimeUnit unit) throws InterruptedException {
+        long nanos = unit.toNanos(timeout);

Review Comment:
   Suggestion: we might optimize here a bit. In the actual code, outside of 
tests, the unit is always nanoseconds. Let's remove `TimeUnit` and rename 
`long timeout` -> `long timeoutNanos`.
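
A minimal sketch of what a nanoseconds-only signature could look like, assuming a plain `ReentrantLock`/`Condition` pair. `NanosPollSketch` is a hypothetical, heavily simplified queue of strings and not the PR's `QueryTasksQueue`; it omits query blocking and thread counting entirely and only shows the timeout handling with `Condition.awaitNanos`.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical simplified queue; not the PR's QueryTasksQueue. */
class NanosPollSketch {
    private final ReentrantLock lock = new ReentrantLock();

    private final Condition notEmpty = lock.newCondition();

    private final Queue<String> tasks = new ArrayDeque<>();

    /** Adds a task and wakes up one waiting thread, if any. */
    void add(String task) {
        lock.lock();

        try {
            tasks.add(task);

            notEmpty.signal();
        }
        finally {
            lock.unlock();
        }
    }

    /** Returns the next task, or null if the timeout elapsed first. */
    String poll(long timeoutNanos) throws InterruptedException {
        long nanos = timeoutNanos;

        lock.lockInterruptibly();

        try {
            while (tasks.isEmpty()) {
                if (nanos <= 0L)
                    return null;

                // awaitNanos returns an estimate of the remaining wait time.
                nanos = notEmpty.awaitNanos(nanos);
            }

            return tasks.poll();
        }
        finally {
            lock.unlock();
        }
    }

    /** Polls an empty queue with a 1 ms timeout, then polls again after adding a task. */
    static String demo() {
        NanosPollSketch q = new NanosPollSketch();

        try {
            String first = q.poll(1_000_000L);

            q.add("task-1");

            String second = q.poll(1_000_000L);

            return first + "," + second;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Callers that still think in other units, such as tests, could convert at the call site with `TimeUnit.MILLISECONDS.toNanos(...)`.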



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueueTest.java:
##########
@@ -67,15 +67,13 @@ public void testQueryBlockingUnblocking() throws Exception {
             }
         };
 
-        // Unparking on unblock query.
+        // Unblock query.

Review Comment:
   I think this comment is not needed, or it could be something like 'Query 
unblock check'. The same applies below.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryTasksQueue.java:
##########
@@ -83,16 +90,22 @@ public int size() {
 
     /** Add a task to the queue. */
     public void addTask(QueryAwareTask task) {
+        Node node = new Node(task);
+
         lock.lock();
 
         try {
             assert last.next == null : "Unexpected last.next: " + last.next;
 
-            last = last.next = new Node(task);
+            last = last.next = node;
 
-            cnt.getAndIncrement();
+            int tasksCnt = cnt.incrementAndGet();
 
-            notEmpty.signal();
+            // Do not wake up new threads if it's enough free treads to 
process the new task.

Review Comment:
   What if `tasksCnt > freeThreadsCnt` but a couple of threads can process 
tasks? For instance, 10 tasks and 1-2 free threads. Why not allow them to pick 
up a task?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
