This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new a96449825e PHOENIX-7363 Protect server side metadata cache updates for the given PTable (#1943)
a96449825e is described below

commit a96449825ec4d685de11f410933afaad35db7fe3
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Sun Jul 28 09:51:26 2024 -0800

    PHOENIX-7363 Protect server side metadata cache updates for the given PTable (#1943)
---
 .../org/apache/phoenix/query/QueryServices.java    |  9 ++++
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  | 52 +++++++++++++++++-----
 2 files changed, 51 insertions(+), 10 deletions(-)
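
In short: when the server-side metadata cache misses for a table row,
MetaDataEndpointImpl now takes a per-row Phoenix write lock, bounded by a new
configurable timeout, before scanning SYSTEM.CATALOG and repopulating the
cache. The timeout is an ordinary server-side Phoenix property; the sketch
below shows one way to override it (illustrative only: in practice it would be
set via hbase-site.xml on the region servers, and only the property name and
its 60000 ms default come from this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class RowLockTimeoutConfig {
        public static void main(String[] args) {
            Configuration conf = HBaseConfiguration.create();
            // Wait at most 30s for the row lock that guards metadata cache
            // updates; the shipped default is 60000 ms.
            conf.setLong("phoenix.metadata.update.rowlock.timeout", 30000L);
            System.out.println(
                conf.getLong("phoenix.metadata.update.rowlock.timeout", 60000L));
        }
    }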

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index f4f0574a1d..1a42cf5d3b 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -510,6 +510,15 @@ public interface QueryServices extends SQLCloseable {
 
     String PHOENIX_GET_METADATA_READ_LOCK_ENABLED = "phoenix.get.metadata.read.lock.enabled";
 
+    /**
+     * If server side metadata cache is empty, take Phoenix writeLock for the given row
+     * and make sure we can acquire the writeLock within the configurable duration.
+     */
+    String PHOENIX_METADATA_CACHE_UPDATE_ROWLOCK_TIMEOUT =
+        "phoenix.metadata.update.rowlock.timeout";
+
+    long DEFAULT_PHOENIX_METADATA_CACHE_UPDATE_ROWLOCK_TIMEOUT = 60000;
+
     /**
      * Get executor service used for parallel scans
      */
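
The MetaDataEndpointImpl changes below apply that timeout. Their core shape is
a time-bounded, per-row, double-checked cache fill: re-read the cache under
the row lock, and only scan and repopulate on a genuine miss. A JDK-only
analogue of the pattern (an illustrative sketch: Phoenix's LockManager in the
diff below hands out striped per-row locks rather than a single lock, and
scanAndBuild() here is a hypothetical stand-in for the SYSTEM.CATALOG scan):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;

    public class GuardedCacheFill {
        private final ReentrantLock rowLock = new ReentrantLock();
        private volatile Object cached;

        Object getOrBuild(long timeoutMs) throws InterruptedException {
            Object t = cached;              // lock-free fast path
            if (t != null) {
                return t;
            }
            // Bound the wait instead of blocking forever behind a slow writer.
            if (!rowLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("timed out acquiring row lock");
            }
            try {
                t = cached;                 // re-check under the lock
                if (t == null) {
                    t = scanAndBuild();     // e.g. scan SYSTEM.CATALOG
                    cached = t;             // safe: no concurrent writer now
                }
                return t;
            } finally {
                rowLock.unlock();
            }
        }

        private Object scanAndBuild() {     // hypothetical stand-in
            return new Object();
        }
    }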
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 4048086022..b54fd8c1e9 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -195,6 +195,7 @@ import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor;
+import org.apache.phoenix.hbase.index.LockManager;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -326,6 +327,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr
     private static final byte[] PHYSICAL_TABLE_BYTES =
             new byte[]{PTable.LinkType.PHYSICAL_TABLE.getSerializedValue()};
 
+    private LockManager lockManager;
+    private long metadataCacheRowLockTimeout;
+
     // KeyValues for Table
     private static final Cell TABLE_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
         TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
@@ -642,8 +646,12 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             throw new CoprocessorException("Must be loaded on a table region!");
         }
 
+        this.lockManager = new LockManager();
         phoenixAccessCoprocessorHost = new PhoenixMetaDataCoprocessorHost(this.env);
         Configuration config = env.getConfiguration();
+        this.metadataCacheRowLockTimeout =
+            config.getLong(QueryServices.PHOENIX_METADATA_CACHE_UPDATE_ROWLOCK_TIMEOUT,
+                QueryServices.DEFAULT_PHOENIX_METADATA_CACHE_UPDATE_ROWLOCK_TIMEOUT);
         this.accessCheckEnabled = config.getBoolean(QueryServices.PHOENIX_ACLS_ENABLED,
                 QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED);
         this.blockWriteRebuildIndex = config.getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
@@ -765,6 +773,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
         Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
         PTable newTable;
+        region.startRegionOperation();
         try (RegionScanner scanner = region.getScanner(scan)) {
             PTable oldTable = (PTable) metaDataCache.getIfPresent(cacheKey);
             long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP - 1 : oldTable.getTimeStamp();
@@ -782,6 +791,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 }
                 metaDataCache.put(cacheKey, newTable);
             }
+        } finally {
+            region.closeRegionOperation();
         }
         return newTable;
     }
@@ -3920,8 +3931,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 rowLock = acquireLock(region, key, null, true);
             }
             PTable table =
-                    getTableFromCache(cacheKey, clientTimeStamp, clientVersion);
-            table = modifyIndexStateForOldClient(clientVersion, table);
+                getTableFromCacheWithModifiedIndexState(clientTimeStamp, clientVersion, cacheKey);
             // We only cache the latest, so we'll end up building the table with every call if the
             // client connection has specified an SCN.
             // TODO: If we indicate to the client that we're returning an older version, but there's a
@@ -3934,17 +3944,32 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 }
                 return table;
             }
-            // Query for the latest table first, since it's not cached
-            table =
+            // take Phoenix row level write-lock as we need to protect metadata cache update
+            // after scanning SYSTEM.CATALOG to retrieve the PTable object
+            LockManager.RowLock phoenixRowLock =
+                lockManager.lockRow(key, this.metadataCacheRowLockTimeout);
+            try {
+                table = getTableFromCacheWithModifiedIndexState(clientTimeStamp, clientVersion,
+                    cacheKey);
+                if (table != null && table.getTimeStamp() < clientTimeStamp) {
+                    if (isTableDeleted(table)) {
+                        return null;
+                    }
+                    return table;
+                }
+                // Query for the latest table first, since it's not cached
+                table =
                     buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion);
-            if ((table != null && table.getTimeStamp() <= clientTimeStamp) ||
-                    (blockWriteRebuildIndex && table.getIndexDisableTimestamp() > 0)) {
+                if ((table != null && table.getTimeStamp() <= clientTimeStamp) || (
+                    blockWriteRebuildIndex && table.getIndexDisableTimestamp() > 0)) {
+                    return table;
+                }
+                // Otherwise, query for an older version of the table - it won't be cached
+                table = buildTable(key, cacheKey, region, clientTimeStamp, clientVersion);
                 return table;
+            } finally {
+                phoenixRowLock.release();
             }
-            // Otherwise, query for an older version of the table - it won't be cached
-            table =
-                    buildTable(key, cacheKey, region, clientTimeStamp, clientVersion);
-            return table;
         } finally {
             if (!wasLocked && rowLock != null) {
                 rowLock.release();
@@ -3952,6 +3977,13 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         }
     }
 
+    private PTable getTableFromCacheWithModifiedIndexState(long clientTimeStamp, int clientVersion,
+        ImmutableBytesPtr cacheKey) throws SQLException {
+        PTable table = getTableFromCache(cacheKey, clientTimeStamp, clientVersion);
+        table = modifyIndexStateForOldClient(clientVersion, table);
+        return table;
+    }
+
     private List<PFunction> doGetFunctions(List<byte[]> keys, long clientTimeStamp) throws IOException, SQLException {
         Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                 GlobalCache.getInstance(this.env).getMetaDataCache();
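
One more detail from the buildTable() hunk above: the SYSTEM.CATALOG scan is
now bracketed by startRegionOperation()/closeRegionOperation(), which pins the
region open for the duration of the scan. Sketched against the HBase Region
interface (the guard matches the diff; the scan body is elided):

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.regionserver.Region;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;

    final class RegionScanGuard {
        // startRegionOperation() holds the region's operation lock so the
        // region cannot be closed or split while the metadata scan runs.
        static void scanUnderRegionOperation(Region region, Scan scan) throws IOException {
            region.startRegionOperation();
            try (RegionScanner scanner = region.getScanner(scan)) {
                // ... read rows, rebuild the PTable, update the cache ...
            } finally {
                region.closeRegionOperation();
            }
        }
    }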
