This is an automated email from the ASF dual-hosted git repository. vjasani pushed a commit to branch 5.2 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push: new 1671f2384c PHOENIX-7369 Avoid redundant recursive getTable() RPC calls (#1944) (#1949) 1671f2384c is described below commit 1671f2384cc88ce8a255f4060304a420770df83d Author: Viraj Jasani <vjas...@apache.org> AuthorDate: Sat Jul 27 07:43:59 2024 -0800 PHOENIX-7369 Avoid redundant recursive getTable() RPC calls (#1944) (#1949) --- .../phoenix/coprocessor/MetaDataEndpointImpl.java | 20 ++- .../apache/phoenix/schema/MetaDataSplitPolicy.java | 4 + .../phoenix/end2end/ConcurrentGetTablesIT.java | 181 +++++++++++++++++++++ 3 files changed, 202 insertions(+), 3 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 6587534254..a2d57789cb 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -586,6 +586,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); // before 4.15, so that we can rollback the upgrade to 4.15 if required private boolean allowSplittableSystemCatalogRollback; + private boolean isSystemCatalogSplittable; + protected boolean getMetadataReadLockEnabled; private MetricsMetadataSource metricsSource; @@ -628,6 +630,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); this.getMetadataReadLockEnabled = config.getBoolean(QueryServices.PHOENIX_GET_METADATA_READ_LOCK_ENABLED, QueryServicesOptions.DEFAULT_PHOENIX_GET_METADATA_READ_LOCK_ENABLED); + this.isSystemCatalogSplittable = MetaDataSplitPolicy.isSystemCatalogSplittable(config); LOGGER.info("Starting Tracing-Metrics Systems"); // Start the phoenix trace collection @@ -1492,11 +1495,22 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); } else if (linkType == LinkType.PHYSICAL_TABLE) { // famName contains the logical name of the parent table. We need to get the actual physical name of the table PTable parentTable = null; - if (indexType != IndexType.LOCAL) { + // call getTable() on famName only if it does not start with _IDX_. + // Table name starting with _IDX_ always must refer to HBase table that is + // shared by all view indexes on the given table/view hierarchy. + // _IDX_ is HBase table that does not have corresponding PTable representation + // in Phoenix, hence there is no point of calling getTable(). + if (!famName.getString().startsWith(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX) + && indexType != IndexType.LOCAL) { parentTable = getTable(null, SchemaUtil.getSchemaNameFromFullName(famName.getBytes()).getBytes(StandardCharsets.UTF_8), SchemaUtil.getTableNameFromFullName(famName.getBytes()).getBytes(StandardCharsets.UTF_8), clientTimeStamp, clientVersion); - if (parentTable == null) { - // parentTable is not in the cache. Since famName is only logical name, we need to find the physical table. + if (isSystemCatalogSplittable + && (parentTable == null || isTableDeleted(parentTable))) { + // parentTable is neither in the cache nor in the local region. Since + // famName is only logical name, we need to find the physical table. + // Hence, it is recommended to scan SYSTEM.CATALOG table again using + // separate CQSI connection as SYSTEM.CATALOG is splittable so the + // PTable with famName might be available on different region. try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class)) { parentTable = connection.getTableNoCache(famName.getString()); } catch (TableNotFoundException e) { diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/schema/MetaDataSplitPolicy.java b/phoenix-core-server/src/main/java/org/apache/phoenix/schema/MetaDataSplitPolicy.java index e5a0c33c93..0514597126 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/schema/MetaDataSplitPolicy.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/schema/MetaDataSplitPolicy.java @@ -25,6 +25,10 @@ public class MetaDataSplitPolicy extends SplitOnLeadingVarCharColumnsPolicy { private boolean allowSystemCatalogToSplit() { Configuration conf = getConf(); + return isSystemCatalogSplittable(conf); + } + + public static boolean isSystemCatalogSplittable(Configuration conf) { boolean allowSplittableSystemCatalogRollback = conf.getBoolean(QueryServices.ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK, QueryServicesOptions.DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentGetTablesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentGetTablesIT.java new file mode 100644 index 0000000000..44b6508604 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentGetTablesIT.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.end2end; + +import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Tests to ensure concurrent metadata RPC calls can be served with limited metadata handlers. + */ +@Category(NeedsOwnMiniClusterTest.class) +public class ConcurrentGetTablesIT extends BaseTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentGetTablesIT.class); + + private static void initCluster(int numMetaHandlers) + throws Exception { + Map<String, String> props = Maps.newConcurrentMap(); + props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, + Integer.toString(60 * 60 * 1000)); + props.put(QueryServices.METADATA_HANDLER_COUNT_ATTRIB, Integer.toString(numMetaHandlers)); + // Make sure that not only tables are created with UPDATE_CACHE_FREQUENCY=ALWAYS and + // hence queries need to go to regionserver, but also we disable enough of + // metadata caching at server side, invalidation as well as last DDL timestamp + // validation at client side. + // Combine this setup with UPDATE_CACHE_FREQUENCY=ALWAYS and enough RPC calls to + // MetaDataEndpointImpl#clearCache to ensure frequently queries need to execute + // getTable() at server side by scanning SYSTEM.CATALOG. + props.put(QueryServices.DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB, "ALWAYS"); +// props.put(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED, Boolean.toString(false)); +// props.put(QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED, Boolean.toString(false)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @BeforeClass + public static synchronized void doSetup() throws Exception { + initCluster(2); + } + + // This test is important to verify that concurrent getTable() calls as part of executing + // the SELECT queries would be sufficient to be served by limited metadata handlers. + // If this test times out, we have a problem. In order to debug and understand what is + // wrong with the system, thread dump will be required when this test seems stuck. + // 5 min is good enough as timeout value, usually test is expected to be completed within + // 1 or 2 min. + @Test(timeout = 5 * 60 * 1000) + public void testConcurrentGetTablesWithQueries() throws Throwable { + final String tableName = generateUniqueName(); + final String view01 = "v01_" + tableName; + final String view02 = "v02_" + tableName; + final String index_view01 = "idx_v01_" + tableName; + final String index_view02 = "idx_v02_" + tableName; + final String index_view03 = "idx_v03_" + tableName; + final String index_view04 = "idx_v04_" + tableName; + + try (Connection conn = DriverManager.getConnection(getUrl())) { + final Statement stmt = conn.createStatement(); + + stmt.execute("CREATE TABLE " + tableName + + " (COL1 CHAR(10) NOT NULL, COL2 CHAR(5) NOT NULL, COL3 VARCHAR," + + " COL4 VARCHAR CONSTRAINT pk PRIMARY KEY(COL1, COL2))" + + " UPDATE_CACHE_FREQUENCY=ALWAYS"); + stmt.execute("CREATE VIEW " + view01 + + " (VCOL1 CHAR(8), COL5 VARCHAR) AS SELECT * FROM " + tableName + + " WHERE COL1 = 'col1'"); + stmt.execute("CREATE VIEW " + view02 + " (VCOL2 CHAR(10), COL6 VARCHAR)" + + " AS SELECT * FROM " + view01 + " WHERE VCOL1 = 'vcol1'"); + stmt.execute("CREATE INDEX " + index_view01 + " ON " + view01 + " (COL5) INCLUDE " + + "(COL1, COL2, COL3)"); + stmt.execute("CREATE INDEX " + index_view02 + " ON " + view02 + " (COL6) INCLUDE " + + "(COL1, COL2, COL3)"); + stmt.execute("CREATE INDEX " + index_view03 + " ON " + view01 + " (COL5) INCLUDE " + + "(COL2, COL1)"); + stmt.execute("CREATE INDEX " + index_view04 + " ON " + view02 + " (COL6) INCLUDE " + + "(COL2, COL1)"); + + stmt.execute("UPSERT INTO " + view02 + + " (col2, vcol2, col5, col6) values ('0001', 'vcol2_01', 'col5_01', " + + "'col6_01')"); + stmt.execute("UPSERT INTO " + view02 + + + " (col2, vcol2, col5, col6) values ('0002', 'vcol2_02', 'col5_02', 'col6_02')"); + stmt.execute("UPSERT INTO " + view02 + + + " (col2, vcol2, col5, col6) values ('0003', 'vcol2_03', 'col5_03', 'col6_03')"); + stmt.execute("UPSERT INTO " + view01 + " (col2, vcol1, col3, col4, col5) values " + + "('0004', 'vcol2', 'col3_04', 'col4_04', 'col5_04')"); + stmt.execute("UPSERT INTO " + view01 + " (col2, vcol1, col3, col4, col5) values " + + "('0005', 'vcol-2', 'col3_05', 'col4_05', 'col5_05')"); + stmt.execute("UPSERT INTO " + view01 + " (col2, vcol1, col3, col4, col5) values " + + "('0006', 'vcol-1', 'col3_06', 'col4_06', 'col5_06')"); + stmt.execute("UPSERT INTO " + view01 + " (col2, vcol1, col3, col4, col5) values " + + "('0007', 'vcol1', 'col3_07', 'col4_07', 'col5_07')"); + stmt.execute("UPSERT INTO " + view02 + + + " (col2, vcol2, col5, col6) values ('0008', 'vcol2_08', 'col5_08', 'col6_02')"); + conn.commit(); + + TestUtil.clearMetaDataCache(conn); + + List<Callable<Void>> callableList = + getCallables(conn, view02); + + ExecutorService executorService = Executors.newFixedThreadPool(10); + List<Future<Void>> futures = executorService.invokeAll(callableList); + + for (Future<Void> future : futures) { + future.get(1, TimeUnit.MINUTES); + } + } + } + + private static List<Callable<Void>> getCallables(Connection conn, String view02) { + List<Callable<Void>> callableList = new ArrayList<>(); + for (int k = 0; k < 25; k++) { + callableList.add(() -> { + for (int i = 0; i < 50; i++) { + if (i % 7 == 0) { + try { + TestUtil.clearMetaDataCache(conn); + } catch (Throwable e) { + LOGGER.error("Something went wrong....", e); + throw new RuntimeException(e); + } + } + final Statement statement = conn.createStatement(); + ResultSet rs = + statement.executeQuery( + "SELECT COL2, VCOL1, VCOL2, COL5, COL6 FROM " + view02); + Assert.assertTrue(rs.next()); + Assert.assertTrue(rs.next()); + Assert.assertTrue(rs.next()); + Assert.assertTrue(rs.next()); + Assert.assertTrue(rs.next()); + Assert.assertFalse(rs.next()); + } + return null; + }); + } + return callableList; + } + +}