This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.2 by this push:
     new 1671f2384c PHOENIX-7369 Avoid redundant recursive getTable() RPC calls (#1944) (#1949)
1671f2384c is described below

commit 1671f2384cc88ce8a255f4060304a420770df83d
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Sat Jul 27 07:43:59 2024 -0800

    PHOENIX-7369 Avoid redundant recursive getTable() RPC calls (#1944) (#1949)
---
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |  20 ++-
 .../apache/phoenix/schema/MetaDataSplitPolicy.java |   4 +
 .../phoenix/end2end/ConcurrentGetTablesIT.java     | 181 +++++++++++++++++++++
 3 files changed, 202 insertions(+), 3 deletions(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 6587534254..a2d57789cb 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -586,6 +586,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
     // before 4.15, so that we can rollback the upgrade to 4.15 if required
     private boolean allowSplittableSystemCatalogRollback;
 
+    private boolean isSystemCatalogSplittable;
+
     protected boolean getMetadataReadLockEnabled;
 
     private MetricsMetadataSource metricsSource;
@@ -628,6 +630,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         this.getMetadataReadLockEnabled
                 = config.getBoolean(QueryServices.PHOENIX_GET_METADATA_READ_LOCK_ENABLED,
                             QueryServicesOptions.DEFAULT_PHOENIX_GET_METADATA_READ_LOCK_ENABLED);
+        this.isSystemCatalogSplittable = MetaDataSplitPolicy.isSystemCatalogSplittable(config);
 
         LOGGER.info("Starting Tracing-Metrics Systems");
         // Start the phoenix trace collection
@@ -1492,11 +1495,22 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 } else if (linkType == LinkType.PHYSICAL_TABLE) {
                     // famName contains the logical name of the parent table. We need to get the actual physical name of the table
                     PTable parentTable = null;
-                    if (indexType != IndexType.LOCAL) {
+                    // Call getTable() on famName only if it does not start with _IDX_.
+                    // A table name starting with _IDX_ must always refer to the HBase table that is
+                    // shared by all view indexes on the given table/view hierarchy.
+                    // _IDX_ is an HBase table that has no corresponding PTable representation
+                    // in Phoenix, hence there is no point in calling getTable().
+                    if (!famName.getString().startsWith(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX)
+                        && indexType != IndexType.LOCAL) {
                         parentTable = getTable(null, SchemaUtil.getSchemaNameFromFullName(famName.getBytes()).getBytes(StandardCharsets.UTF_8),
                                 SchemaUtil.getTableNameFromFullName(famName.getBytes()).getBytes(StandardCharsets.UTF_8), clientTimeStamp, clientVersion);
-                        if (parentTable == null) {
-                            // parentTable is not in the cache. Since famName is only logical name, we need to find the physical table.
+                        if (isSystemCatalogSplittable
+                            && (parentTable == null || isTableDeleted(parentTable))) {
+                            // parentTable is neither in the cache nor in the local region. Since
+                            // famName is only a logical name, we need to find the physical table.
+                            // Hence, it is recommended to scan the SYSTEM.CATALOG table again using
+                            // a separate CQSI connection: as SYSTEM.CATALOG is splittable, the
+                            // PTable with famName might be available on a different region.
                             try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class)) {
                                 parentTable = connection.getTableNoCache(famName.getString());
                             } catch (TableNotFoundException e) {
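
In essence, the hunk above adds two guards around the parent-table lookup: skip the getTable() RPC entirely for _IDX_-prefixed physical names, and fall back to a fresh server-side scan of SYSTEM.CATALOG only when the catalog is splittable. A rough, self-contained model of that control flow follows; ParentTableResolver, resolveParent, scanCatalogOnServer, and the plain Object return type are illustrative stand-ins, not Phoenix API:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Rough model of the guard added above. Stand-in names, not Phoenix API.
    final class ParentTableResolver {
        static final String VIEW_INDEX_PREFIX = "_IDX_"; // MetaDataUtil.VIEW_INDEX_TABLE_PREFIX

        private final boolean systemCatalogSplittable; // computed once at coprocessor start
        private final Map<String, Object> localLookup = new ConcurrentHashMap<>();

        ParentTableResolver(boolean systemCatalogSplittable) {
            this.systemCatalogSplittable = systemCatalogSplittable;
        }

        Object resolveParent(String famName, boolean isLocalIndex) {
            // _IDX_ tables are shared HBase tables with no PTable of their own,
            // so issuing a getTable() RPC for them can never succeed: skip it.
            if (isLocalIndex || famName.startsWith(VIEW_INDEX_PREFIX)) {
                return null;
            }
            // First try the cache and the local SYSTEM.CATALOG region.
            Object parent = localLookup.get(famName);
            // Only when SYSTEM.CATALOG may be split across regions is a miss
            // worth a second scan through a fresh server-side connection.
            if (systemCatalogSplittable && parent == null) {
                parent = scanCatalogOnServer(famName);
            }
            return parent;
        }

        private Object scanCatalogOnServer(String famName) {
            // Stand-in for connection.getTableNoCache(famName.getString()).
            return null;
        }
    }

Note that the splittable flag is computed once at coprocessor startup via the new isSystemCatalogSplittable field, rather than re-reading the configuration on every call.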
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/schema/MetaDataSplitPolicy.java b/phoenix-core-server/src/main/java/org/apache/phoenix/schema/MetaDataSplitPolicy.java
index e5a0c33c93..0514597126 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/schema/MetaDataSplitPolicy.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/schema/MetaDataSplitPolicy.java
@@ -25,6 +25,10 @@ public class MetaDataSplitPolicy extends SplitOnLeadingVarCharColumnsPolicy {
 
     private boolean allowSystemCatalogToSplit() {
         Configuration conf = getConf();
+        return isSystemCatalogSplittable(conf);
+    }
+
+    public static boolean isSystemCatalogSplittable(Configuration conf) {
         boolean allowSplittableSystemCatalogRollback =
                 conf.getBoolean(QueryServices.ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK,
                     QueryServicesOptions.DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK);
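
Moving the configuration check into a static method is what lets MetaDataEndpointImpl evaluate it once at startup without holding a split-policy instance. A minimal sketch of that delegation shape, assuming an illustrative config key; the real QueryServices constants appear in the hunk above, and the method body is truncated there, so only the structure is shown:

    import org.apache.hadoop.conf.Configuration;

    final class SplitPolicySketch {
        private final Configuration conf = new Configuration();

        // Instance method (used by the split policy) delegates to the static
        // helper (reusable by the coprocessor at startup), so both components
        // evaluate exactly the same configuration logic.
        boolean allowSplit() {
            return isSplittable(conf);
        }

        static boolean isSplittable(Configuration conf) {
            // "example.splittable.rollback" is illustrative, not a Phoenix key.
            boolean rollbackAllowed = conf.getBoolean("example.splittable.rollback", false);
            return !rollbackAllowed;
        }
    }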
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentGetTablesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentGetTablesIT.java
new file mode 100644
index 0000000000..44b6508604
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentGetTablesIT.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests to ensure concurrent metadata RPC calls can be served with limited metadata handlers.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class ConcurrentGetTablesIT extends BaseTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentGetTablesIT.class);
+
+    private static void initCluster(int numMetaHandlers)
+            throws Exception {
+        Map<String, String> props = Maps.newConcurrentMap();
+        props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+                Integer.toString(60 * 60 * 1000));
+        props.put(QueryServices.METADATA_HANDLER_COUNT_ATTRIB, Integer.toString(numMetaHandlers));
+        // Make sure that not only are tables created with UPDATE_CACHE_FREQUENCY=ALWAYS,
+        // so that queries need to go to the regionserver, but also that we disable enough
+        // of the server-side metadata caching and invalidation, as well as last DDL
+        // timestamp validation at the client side.
+        // Combining this setup with UPDATE_CACHE_FREQUENCY=ALWAYS and enough RPC calls to
+        // MetaDataEndpointImpl#clearCache ensures that queries frequently need to execute
+        // getTable() at the server side by scanning SYSTEM.CATALOG.
+        props.put(QueryServices.DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB, "ALWAYS");
+//        props.put(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED, Boolean.toString(false));
+//        props.put(QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED, Boolean.toString(false));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        initCluster(2);
+    }
+
+    // This test is important to verify that concurrent getTable() calls made while executing
+    // the SELECT queries can be served by a limited number of metadata handlers.
+    // If this test times out, we have a problem. In order to debug and understand what is
+    // wrong with the system, take a thread dump while the test seems stuck.
+    // 5 min is a good timeout value; usually the test is expected to complete within
+    // 1 or 2 min.
+    @Test(timeout = 5 * 60 * 1000)
+    public void testConcurrentGetTablesWithQueries() throws Throwable {
+        final String tableName = generateUniqueName();
+        final String view01 = "v01_" + tableName;
+        final String view02 = "v02_" + tableName;
+        final String index_view01 = "idx_v01_" + tableName;
+        final String index_view02 = "idx_v02_" + tableName;
+        final String index_view03 = "idx_v03_" + tableName;
+        final String index_view04 = "idx_v04_" + tableName;
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            final Statement stmt = conn.createStatement();
+
+            stmt.execute("CREATE TABLE " + tableName
+                    + " (COL1 CHAR(10) NOT NULL, COL2 CHAR(5) NOT NULL, COL3 VARCHAR,"
+                    + " COL4 VARCHAR CONSTRAINT pk PRIMARY KEY(COL1, COL2))"
+                    + " UPDATE_CACHE_FREQUENCY=ALWAYS");
+            stmt.execute("CREATE VIEW " + view01
+                    + " (VCOL1 CHAR(8), COL5 VARCHAR) AS SELECT * FROM " + tableName
+                    + " WHERE COL1 = 'col1'");
+            stmt.execute("CREATE VIEW " + view02 + " (VCOL2 CHAR(10), COL6 VARCHAR)"
+                    + " AS SELECT * FROM " + view01 + " WHERE VCOL1 = 'vcol1'");
+            stmt.execute("CREATE INDEX " + index_view01 + " ON " + view01 + " (COL5) INCLUDE "
+                    + "(COL1, COL2, COL3)");
+            stmt.execute("CREATE INDEX " + index_view02 + " ON " + view02 + " (COL6) INCLUDE "
+                    + "(COL1, COL2, COL3)");
+            stmt.execute("CREATE INDEX " + index_view03 + " ON " + view01 + " (COL5) INCLUDE "
+                    + "(COL2, COL1)");
+            stmt.execute("CREATE INDEX " + index_view04 + " ON " + view02 + " (COL6) INCLUDE "
+                    + "(COL2, COL1)");
+
+            stmt.execute("UPSERT INTO " + view02
+                    + " (col2, vcol2, col5, col6) values ('0001', 'vcol2_01', 'col5_01', " +
+                    "'col6_01')");
+            stmt.execute("UPSERT INTO " + view02
+                    +
+                    " (col2, vcol2, col5, col6) values ('0002', 'vcol2_02', 'col5_02', 'col6_02')");
+            stmt.execute("UPSERT INTO " + view02
+                    +
+                    " (col2, vcol2, col5, col6) values ('0003', 'vcol2_03', 'col5_03', 'col6_03')");
+            stmt.execute("UPSERT INTO " + view01 + " (col2, vcol1, col3, col4, col5) values "
+                    + "('0004', 'vcol2', 'col3_04', 'col4_04', 'col5_04')");
+            stmt.execute("UPSERT INTO " + view01 + " (col2, vcol1, col3, col4, col5) values "
+                    + "('0005', 'vcol-2', 'col3_05', 'col4_05', 'col5_05')");
+            stmt.execute("UPSERT INTO " + view01 + " (col2, vcol1, col3, col4, col5) values "
+                    + "('0006', 'vcol-1', 'col3_06', 'col4_06', 'col5_06')");
+            stmt.execute("UPSERT INTO " + view01 + " (col2, vcol1, col3, col4, col5) values "
+                    + "('0007', 'vcol1', 'col3_07', 'col4_07', 'col5_07')");
+            stmt.execute("UPSERT INTO " + view02
+                    +
+                    " (col2, vcol2, col5, col6) values ('0008', 'vcol2_08', 'col5_08', 'col6_02')");
+            conn.commit();
+
+            TestUtil.clearMetaDataCache(conn);
+
+            List<Callable<Void>> callableList =
+                    getCallables(conn, view02);
+
+            ExecutorService executorService = Executors.newFixedThreadPool(10);
+            List<Future<Void>> futures = executorService.invokeAll(callableList);
+
+            for (Future<Void> future : futures) {
+                future.get(1, TimeUnit.MINUTES);
+            }
+        }
+    }
+
+    private static List<Callable<Void>> getCallables(Connection conn, String view02) {
+        List<Callable<Void>> callableList = new ArrayList<>();
+        for (int k = 0; k < 25; k++) {
+            callableList.add(() -> {
+                for (int i = 0; i < 50; i++) {
+                    if (i % 7 == 0) {
+                        try {
+                            TestUtil.clearMetaDataCache(conn);
+                        } catch (Throwable e) {
+                            LOGGER.error("Something went wrong....", e);
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    final Statement statement = conn.createStatement();
+                    ResultSet rs =
+                            statement.executeQuery(
+                                    "SELECT COL2, VCOL1, VCOL2, COL5, COL6 FROM " + view02);
+                    Assert.assertTrue(rs.next());
+                    Assert.assertTrue(rs.next());
+                    Assert.assertTrue(rs.next());
+                    Assert.assertTrue(rs.next());
+                    Assert.assertTrue(rs.next());
+                    Assert.assertFalse(rs.next());
+                }
+                return null;
+            });
+        }
+        return callableList;
+    }
+
+}
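
The test's comment recommends a thread dump as the first diagnostic when a run appears stuck. As a generic illustration (plain JDK API, not part of this commit), the future.get(1, TimeUnit.MINUTES) wait in testConcurrentGetTablesWithQueries could be wrapped so that a timeout prints every thread's stack before failing:

    import java.util.Map;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    final class HangDiagnostics {
        // Wait for a worker; on timeout, print every live thread's stack
        // before rethrowing, so a stuck run leaves a dump in the test log.
        static <T> T getOrDump(Future<T> future) throws Exception {
            try {
                return future.get(1, TimeUnit.MINUTES);
            } catch (TimeoutException e) {
                for (Map.Entry<Thread, StackTraceElement[]> entry
                        : Thread.getAllStackTraces().entrySet()) {
                    System.err.println(entry.getKey());
                    for (StackTraceElement frame : entry.getValue()) {
                        System.err.println("\tat " + frame);
                    }
                }
                throw e;
            }
        }
    }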
