This is an automated email from the ASF dual-hosted git repository.

sunlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/groovy.git


The following commit(s) were added to refs/heads/master by this push:
     new 53751b3  Tweak parallel querying
53751b3 is described below

commit 53751b337c91d5d9b1b47a4b4cdd8190686b6b00
Author: Daniel Sun <[email protected]>
AuthorDate: Tue Dec 15 08:05:15 2020 +0800

    Tweak parallel querying
---
 .../collection/runtime/ConcurrentObjectHolder.java | 77 ++++++++++++++++++++++
 .../collection/runtime/QueryableCollection.java    | 21 ++----
 .../test/org/apache/groovy/ginq/GinqTest.groovy    | 15 +++++
 3 files changed, 99 insertions(+), 14 deletions(-)

diff --git 
a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/ConcurrentObjectHolder.java
 
b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/ConcurrentObjectHolder.java
new file mode 100644
index 0000000..ad6b32d
--- /dev/null
+++ 
b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/ConcurrentObjectHolder.java
@@ -0,0 +1,77 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.groovy.ginq.provider.collection.runtime;
+
+import org.apache.groovy.internal.util.Supplier;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Hold an object thread-safely
+ *
+ * @param <T> the type of object
+ * @since 4.0.0
+ */
+class ConcurrentObjectHolder<T> {
+    private final ReadWriteLock rwl = new ReentrantReadWriteLock();
+    private final Lock readLock = rwl.readLock();
+    private final Lock writeLock = rwl.writeLock();
+
+    private volatile T object;
+
+    public ConcurrentObjectHolder() {}
+
+    public ConcurrentObjectHolder(T object) {
+        this.object = object;
+    }
+
+    public T getObject() {
+        readLock.lock();
+        try {
+            return object;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public T getObject(Supplier<? extends T> def) {
+        if (null != object) return object;
+
+        writeLock.lock();
+        try {
+            if (null == object) {
+                object = def.get();
+            }
+            return object;
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    public void setObject(T object) {
+        writeLock.lock();
+        try {
+            this.object = object;
+        } finally {
+            writeLock.unlock();
+        }
+    }
+}
diff --git 
a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
 
b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
index e00377f..3243187 100644
--- 
a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
+++ 
b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
@@ -21,7 +21,6 @@ package org.apache.groovy.ginq.provider.collection.runtime;
 import groovy.lang.Tuple2;
 import groovy.transform.Internal;
 import org.apache.groovy.internal.util.Supplier;
-import org.apache.groovy.util.ObjectHolder;
 import org.apache.groovy.util.SystemUtil;
 import org.codehaus.groovy.runtime.DefaultGroovyMethods;
 import org.codehaus.groovy.runtime.typehandling.NumberMath;
@@ -66,7 +65,7 @@ class QueryableCollection<T> implements Queryable<T>, 
Serializable {
     private final ReadWriteLock rwl = new ReentrantReadWriteLock();
     private final Lock readLock = rwl.readLock();
     private final Lock writeLock = rwl.writeLock();
-    private Iterable<T> sourceIterable;
+    private volatile Iterable<T> sourceIterable;
     private Stream<T> sourceStream;
 
     QueryableCollection(Iterable<T> sourceIterable) {
@@ -109,7 +108,7 @@ class QueryableCollection<T> implements Queryable<T>, 
Serializable {
 
     @Override
     public <U> Queryable<Tuple2<T, U>> innerHashJoin(Queryable<? extends U> 
queryable, Function<? super T, ?> fieldsExtractor1, Function<? super U, ?> 
fieldsExtractor2) {
-        final ObjectHolder<Map<Integer, List<U>>> hashTableHolder = new 
ObjectHolder<>();
+        final ConcurrentObjectHolder<Map<Integer, List<U>>> hashTableHolder = 
new ConcurrentObjectHolder<>();
         final Supplier<Map<Integer, List<U>>> hashTableSupplier = 
createHashTableSupplier(queryable, fieldsExtractor2);
         Stream<Tuple2<T, U>> stream = this.stream().flatMap(p -> {
             // build hash table
@@ -410,7 +409,7 @@ class QueryableCollection<T> implements Queryable<T>, 
Serializable {
     }
 
     private static <T, U> Queryable<Tuple2<T, U>> outerHashJoin(Queryable<? 
extends T> queryable1, Queryable<? extends U> queryable2, Function<? super T, 
?> fieldsExtractor1, Function<? super U, ?> fieldsExtractor2) {
-        final ObjectHolder<Map<Integer, List<U>>> hashTableHolder = new 
ObjectHolder<>();
+        final ConcurrentObjectHolder<Map<Integer, List<U>>> hashTableHolder = 
new ConcurrentObjectHolder<>();
         final Supplier<Map<Integer, List<U>>> hashTableSupplier = 
createHashTableSupplier(queryable2, fieldsExtractor2);
         Stream<Tuple2<T, U>> stream = queryable1.stream().flatMap(p -> {
             // build hash table
@@ -426,16 +425,8 @@ class QueryableCollection<T> implements Queryable<T>, 
Serializable {
         return from(stream);
     }
 
-    private static <U> Map<Integer, List<U>> buildHashTable(final 
ObjectHolder<Map<Integer, List<U>>> hashTableHolder, final 
Supplier<Map<Integer, List<U>>> hashTableSupplier) {
-        Map<Integer, List<U>> hashTable = hashTableHolder.getObject();
-        if (null == hashTable) {
-            synchronized (hashTableHolder) {
-                if (null == hashTable) {
-                    hashTable = hashTableHolder.getObject(hashTableSupplier);
-                }
-            }
-        }
-        return hashTable;
+    private static <U> Map<Integer, List<U>> buildHashTable(final 
ConcurrentObjectHolder<Map<Integer, List<U>>> hashTableHolder, final 
Supplier<Map<Integer, List<U>>> hashTableSupplier) {
+        return hashTableHolder.getObject(hashTableSupplier);
     }
 
     private static <T, U> Stream<Tuple2<T, U>> probeHashTable(Map<Integer, 
List<U>> hashTable, T p, Function<? super T, ?> fieldsExtractor1, Function<? 
super U, ?> fieldsExtractor2) {
@@ -504,6 +495,8 @@ class QueryableCollection<T> implements Queryable<T>, 
Serializable {
     }
 
     private void makeReusable() {
+        if (null != this.sourceIterable) return;
+
         writeLock.lock();
         try {
             if (null != this.sourceIterable) return;
diff --git 
a/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy 
b/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
index 5834ce9..5e2ef05 100644
--- 
a/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
+++ 
b/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
@@ -4445,6 +4445,21 @@ class GinqTest {
         '''
     }
 
+    @Test
+    void "testGinq - parallel - 3"() {
+        assertGinqScript '''
+            assert [[6, 9]] == GQ(optimize: false, parallel: true) {
+                from n in [1, 1, 3, 3, 6, 6, 6]
+                innerjoin m in [1, 1, 3, 3, 6, 6, 6] on n == m
+                where n != 3
+                groupby n
+                having count() > 4
+                orderby count(n) in asc
+                select n, count(n)
+            }.toList()
+        '''
+    }
+
     private static void assertGinqScript(String script) {
         String deoptimizedScript = script.replaceAll(/\bGQ\s*[{]/, 
'GQ(optimize:false) {')
         List<String> scriptList = [deoptimizedScript, script]

Reply via email to