This is an automated email from the ASF dual-hosted git repository.

sunlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/groovy.git


The following commit(s) were added to refs/heads/master by this push:
     new 427ff45  Tweak parallel querying of GINQ
427ff45 is described below

commit 427ff45f4d9eff13af3369add5b6b3d6b2eb598b
Author: Daniel Sun <[email protected]>
AuthorDate: Sun Jul 18 17:03:02 2021 +0800

    Tweak parallel querying of GINQ
---
 .../groovy/ginq/provider/collection/GinqAstWalker.groovy     |  9 ++++++++-
 .../ginq/provider/collection/runtime/QueryableHelper.groovy  | 12 ++++++++++--
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/GinqAstWalker.groovy b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/GinqAstWalker.groovy
index 53ad277..bb0739f 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/GinqAstWalker.groovy
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/GinqAstWalker.groovy
@@ -98,6 +98,7 @@ import static org.codehaus.groovy.ast.tools.GeneralUtils.propX
 import static org.codehaus.groovy.ast.tools.GeneralUtils.returnS
 import static org.codehaus.groovy.ast.tools.GeneralUtils.stmt
 import static org.codehaus.groovy.ast.tools.GeneralUtils.varX
+
 /**
  * Visit AST of GINQ to generate target method calls for GINQ
  *
@@ -211,7 +212,13 @@ class GinqAstWalker implements GinqAstVisitor<Expression>, SyntaxErrorReportable
         }
         statementList << returnS(varX(resultName))
 
-        def result = callX(lambdaX(block(statementList as Statement[])), "call")
+        def resultLambda = lambdaX(block(statementList as Statement[]))
+        def result = parallelEnabled
+                        ? callX(
+                                callX(QUERYABLE_HELPER_TYPE, 'submit', args(resultLambda)),
+                    "get"
+                            )
+                        : callX(resultLambda, "call")
 
         ginqExpressionStack.pop()
         return result
diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy
index 16940c6..dcebcc3 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableHelper.groovy
@@ -20,9 +20,12 @@ package org.apache.groovy.ginq.provider.collection.runtime
 
 import groovy.transform.CompileStatic
 
+import java.util.concurrent.Callable
 import java.util.concurrent.CompletableFuture
 import java.util.concurrent.ExecutorService
 import java.util.concurrent.Executors
+import java.util.concurrent.ForkJoinPool
+import java.util.concurrent.ForkJoinTask
 import java.util.concurrent.TimeUnit
 import java.util.function.Function
 import java.util.function.Supplier
@@ -83,6 +86,10 @@ class QueryableHelper {
         return CompletableFuture.supplyAsync(() -> { function.apply(param) }, ThreadPoolHolder.THREAD_POOL)
     }
 
+    static <T> ForkJoinTask<T> submit(Callable<T> callable) {
+        return ThreadPoolHolder.FORKJOIN_POOL.submit(callable)
+    }
+
     static boolean isParallel() {
         return TRUE_STR == getVar(PARALLEL)
     }
@@ -95,7 +102,7 @@ class QueryableHelper {
         (T) VAR_HOLDER.get().get(name)
     }
 
-    static <T> T  removeVar(String name) {
+    static <T> T removeVar(String name) {
         (T) VAR_HOLDER.get().remove(name)
     }
 
@@ -127,12 +134,13 @@ class QueryableHelper {
 
     private static class ThreadPoolHolder {
         static int seq
-        static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), (Runnable r) -> {
+        static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1, (Runnable r) -> {
             Thread t = new Thread(r)
             t.setName("ginq-thread-" + seq++)
             t.setDaemon(true)
             return t
         })
+        static final ForkJoinPool FORKJOIN_POOL = new ForkJoinPool(Runtime.getRuntime().availableProcessors() + 1)
         private ThreadPoolHolder() {}
     }
 }

Reply via email to