This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 354716eed9 PHOENIX-7213 Add option for unlimited phoenix.query.QueueSize
354716eed9 is described below

commit 354716eed9ef6a172ad98882194db6928385f414
Author: tkhurana <khurana.ta...@gmail.com>
AuthorDate: Wed Feb 14 10:07:05 2024 -0800

    PHOENIX-7213 Add option for unlimited phoenix.query.QueueSize
---
 .../phoenix/job/AbstractRoundRobinQueue.java       | 16 +++++-
 .../apache/phoenix/query/QueryServicesOptions.java | 31 +++++-----
 .../apache/phoenix/end2end/QueryWithLimitIT.java   | 67 ++++++++++++++--------
 3 files changed, 72 insertions(+), 42 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/job/AbstractRoundRobinQueue.java b/phoenix-core-client/src/main/java/org/apache/phoenix/job/AbstractRoundRobinQueue.java
index fa68852524..001453923e 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/job/AbstractRoundRobinQueue.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/job/AbstractRoundRobinQueue.java
@@ -17,12 +17,22 @@
  */
 package org.apache.phoenix.job;
 
-import org.apache.phoenix.util.EnvironmentEdgeManager;
 
-import java.util.*;
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+
+import static org.apache.phoenix.query.QueryServicesOptions.UNLIMITED_QUEUE_SIZE;
+
 /**
  *
  * An bounded blocking queue implementation that keeps a virtual queue of 
elements on per-producer
@@ -88,7 +98,7 @@ public abstract class AbstractRoundRobinQueue<E> extends 
AbstractQueue<E>
 
         ProducerList<E> producerList = null;
         synchronized(lock) {
-            if (this.size == this.maxSize) {
+            if (this.maxSize != UNLIMITED_QUEUE_SIZE && this.size == this.maxSize) {
                 return false;
             }
             producerList = this.producerMap.get(producerKey);
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index f88ec01a5a..2d8d796a4f 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -141,20 +141,21 @@ import org.apache.phoenix.util.ReadOnlyProps;
  * @since 0.1
  */
 public class QueryServicesOptions {
-       public static final int DEFAULT_KEEP_ALIVE_MS = 60000;
-       public static final int DEFAULT_THREAD_POOL_SIZE = 128;
-       public static final int DEFAULT_QUEUE_SIZE = 5000;
-       public static final int DEFAULT_THREAD_TIMEOUT_MS = 600000; // 10min
-       public static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 
20; // 20m
-       public static final int DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES = 1024 * 
1024 * 20; // 20m
-       public static final int DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES = 1024 * 
1024 * 20; // 20m
-       public static final boolean DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED = 
true;
-       public static final boolean DEFAULT_CLIENT_JOIN_SPOOLING_ENABLED = true;
-       public static final boolean DEFAULT_SERVER_ORDERBY_SPOOLING_ENABLED = 
true;
+    public static final int DEFAULT_KEEP_ALIVE_MS = 60000;
+    public static final int DEFAULT_THREAD_POOL_SIZE = 128;
+    public static final int DEFAULT_QUEUE_SIZE = 5000;
+    public static final int UNLIMITED_QUEUE_SIZE = -1;
+    public static final int DEFAULT_THREAD_TIMEOUT_MS = 600000; // 10min
+    public static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; 
// 20m
+    public static final int DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES = 1024 * 1024 
* 20; // 20m
+    public static final int DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 
* 20; // 20m
+    public static final boolean DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED = true;
+    public static final boolean DEFAULT_CLIENT_JOIN_SPOOLING_ENABLED = true;
+    public static final boolean DEFAULT_SERVER_ORDERBY_SPOOLING_ENABLED = true;
     public static final String DEFAULT_SPOOL_DIRECTORY = 
System.getProperty("java.io.tmpdir");
-       public static final int DEFAULT_MAX_MEMORY_PERC = 15; // 15% of heap
-       public static final int DEFAULT_MAX_TENANT_MEMORY_PERC = 100;
-       public static final long DEFAULT_MAX_SERVER_CACHE_SIZE = 1024*1024*100; 
 // 100 Mb
+    public static final int DEFAULT_MAX_MEMORY_PERC = 15; // 15% of heap
+    public static final int DEFAULT_MAX_TENANT_MEMORY_PERC = 100;
+    public static final long DEFAULT_MAX_SERVER_CACHE_SIZE = 1024 * 1024 * 
100;  // 100 Mb
     public static final int DEFAULT_TARGET_QUERY_CONCURRENCY = 32;
     public static final int DEFAULT_MAX_QUERY_CONCURRENCY = 64;
     public static final String DEFAULT_DATE_FORMAT = 
DateUtil.DEFAULT_DATE_FORMAT;
@@ -178,7 +179,7 @@ public class QueryServicesOptions {
     public final static int DEFAULT_MUTATE_BATCH_SIZE = 100; // Batch size for 
UPSERT SELECT and DELETE
     //Batch size in bytes for UPSERT, SELECT and DELETE. By default, 2MB
     public final static long DEFAULT_MUTATE_BATCH_SIZE_BYTES = 2097152;
-       // The only downside of it being out-of-sync is that the 
parallelization of the scan won't be as balanced as it could be.
+    // The only downside of it being out-of-sync is that the parallelization 
of the scan won't be as balanced as it could be.
     public static final int DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS = 30000; 
// 30 sec (with no activity)
     public static final int 
DEFAULT_MAX_SERVER_CACHE_PERSISTENCE_TIME_TO_LIVE_MS = 30 * 60000; // 30 minutes
     public static final int DEFAULT_SCAN_CACHE_SIZE = 1000;
@@ -406,7 +407,7 @@ public class QueryServicesOptions {
     public static final int DEFAULT_LOG_SALT_BUCKETS = 32;
     public static final int DEFAULT_SALT_BUCKETS = 0;
 
-       public static final boolean DEFAULT_SYSTEM_CATALOG_SPLITTABLE = true;
+    public static final boolean DEFAULT_SYSTEM_CATALOG_SPLITTABLE = true;
 
     public static final String DEFAULT_GUIDE_POSTS_CACHE_FACTORY_CLASS = 
"org.apache.phoenix.query.DefaultGuidePostsCacheFactory";
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java
index 2f778c0fc6..377b6d7cfc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.end2end;
 
+import static 
org.apache.phoenix.query.QueryServicesOptions.UNLIMITED_QUEUE_SIZE;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -36,6 +38,7 @@ import org.apache.phoenix.compile.ExplainPlanAttributes;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.TestUtil;
@@ -49,25 +52,25 @@ import 
org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 public class QueryWithLimitIT extends BaseTest {
     
     private String tableName;
-    
-    @Before
-    public void generateTableName() {
-        tableName = generateUniqueName();
-    }
-    
+    private static Map<String,String> props = 
Maps.newHashMapWithExpectedSize(5);
 
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
         // Must update config before starting server
         props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, 
Long.toString(50));
         props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1));
         props.put(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB, 
Integer.toString(0)); // Prevents RejectedExecutionException when creatomg 
sequence table
         props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(4));
         props.put(QueryServices.LOG_SALT_BUCKETS_ATTRIB, Integer.toString(0)); 
// Prevents RejectedExecutionException when creating log table
+    }
+
+    @Before
+    public void setupDriver() throws Exception {
+        destroyDriver();
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+        tableName = generateUniqueName();
     }
-    
+
     @Test
     public void testQueryWithLimitAndStats() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
@@ -106,26 +109,42 @@ public class QueryWithLimitIT extends BaseTest {
     
     @Test
     public void testQueryWithoutLimitFails() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-
-        conn.createStatement().execute("create table " + tableName + "\n" +
-                "   (i1 integer not null, i2 integer not null\n" +
-                "    CONSTRAINT pk PRIMARY KEY (i1,i2))");
-        initTableValues(conn, 100);
-        conn.createStatement().execute("UPDATE STATISTICS " + tableName);
-        
+        Properties connProps = 
PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
         String query = "SELECT i1 FROM " + tableName;
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
connProps)) {
+
+            conn.createStatement().execute("create table " + tableName + "\n" +
+                    "   (i1 integer not null, i2 integer not null\n" +
+                    "    CONSTRAINT pk PRIMARY KEY (i1,i2))");
+            initTableValues(conn, 100);
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            try {
+                ResultSet rs = conn.createStatement().executeQuery(query);
+                rs.next();
+                fail();
+            } catch (SQLException e) {
+                assertTrue(e.getCause() instanceof RejectedExecutionException);
+            }
+        }
+
+        // now run the same test with queue size set to unlimited
         try {
-            ResultSet rs = conn.createStatement().executeQuery(query);
-            rs.next();
-            fail();
-        } catch (SQLException e) {
-            assertTrue(e.getCause() instanceof RejectedExecutionException);
+            destroyDriver();
+            // copy the existing properties
+            Map<String, String> newProps = Maps.newHashMap(props);
+            newProps.put(QueryServices.QUEUE_SIZE_ATTRIB, 
Integer.toString(UNLIMITED_QUEUE_SIZE));
+            setUpTestDriver(new ReadOnlyProps(newProps.entrySet().iterator()));
+            try (Connection conn = DriverManager.getConnection(getUrl(), 
connProps)) {
+                // now the query should succeed
+                ResultSet rs = conn.createStatement().executeQuery(query);
+                assertTrue(rs.next());
+            }
+        } finally {
+            destroyDriver();
         }
-        conn.close();
     }
-    
+
     protected void initTableValues(Connection conn, int nRows) throws 
Exception {
         PreparedStatement stmt = conn.prepareStatement(
             "upsert into " + tableName + 

Reply via email to