This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git

The following commit(s) were added to refs/heads/master by this push:
     new a96449825e PHOENIX-7363 Protect server side metadata cache updates for the given PTable (#1943)
a96449825e is described below

commit a96449825ec4d685de11f410933afaad35db7fe3
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Sun Jul 28 09:51:26 2024 -0800

    PHOENIX-7363 Protect server side metadata cache updates for the given PTable (#1943)
---
 .../org/apache/phoenix/query/QueryServices.java    |  9 ++++
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  | 52 +++++++++++++++++-----
 2 files changed, 51 insertions(+), 10 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index f4f0574a1d..1a42cf5d3b 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -510,6 +510,15 @@ public interface QueryServices extends SQLCloseable {
 
     String PHOENIX_GET_METADATA_READ_LOCK_ENABLED = "phoenix.get.metadata.read.lock.enabled";
 
+    /**
+     * If server side metadata cache is empty, take Phoenix writeLock for the given row
+     * and make sure we can acquire the writeLock within the configurable duration.
+     */
+    String PHOENIX_METADATA_CACHE_UPDATE_ROWLOCK_TIMEOUT =
+            "phoenix.metadata.update.rowlock.timeout";
+
+    long DEFAULT_PHOENIX_METADATA_CACHE_UPDATE_ROWLOCK_TIMEOUT = 60000;
+
     /**
      * Get executor service used for parallel scans
      */
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 4048086022..b54fd8c1e9 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -195,6 +195,7 @@ import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor;
+import org.apache.phoenix.hbase.index.LockManager;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -326,6 +327,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr
     private static final byte[] PHYSICAL_TABLE_BYTES =
             new byte[]{PTable.LinkType.PHYSICAL_TABLE.getSerializedValue()};
 
+    private LockManager lockManager;
+    private long metadataCacheRowLockTimeout;
+
     // KeyValues for Table
     private static final Cell TABLE_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
             TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
@@ -642,8 +646,12 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             throw new CoprocessorException("Must be loaded on a table region!");
         }
 
+        this.lockManager = new LockManager();
         phoenixAccessCoprocessorHost = new PhoenixMetaDataCoprocessorHost(this.env);
         Configuration config = env.getConfiguration();
+        this.metadataCacheRowLockTimeout =
+                config.getLong(QueryServices.PHOENIX_METADATA_CACHE_UPDATE_ROWLOCK_TIMEOUT,
+                        QueryServices.DEFAULT_PHOENIX_METADATA_CACHE_UPDATE_ROWLOCK_TIMEOUT);
         this.accessCheckEnabled = config.getBoolean(QueryServices.PHOENIX_ACLS_ENABLED,
                 QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED);
         this.blockWriteRebuildIndex = config.getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
@@ -765,6 +773,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
         Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
         PTable newTable;
+        region.startRegionOperation();
         try (RegionScanner scanner = region.getScanner(scan)) {
             PTable oldTable = (PTable) metaDataCache.getIfPresent(cacheKey);
             long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP - 1 : oldTable.getTimeStamp();
@@ -782,6 +791,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 }
                 metaDataCache.put(cacheKey, newTable);
             }
+        } finally {
+            region.closeRegionOperation();
         }
         return newTable;
     }
@@ -3920,8 +3931,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 rowLock = acquireLock(region, key, null, true);
             }
             PTable table =
-                    getTableFromCache(cacheKey, clientTimeStamp, clientVersion);
-            table = modifyIndexStateForOldClient(clientVersion, table);
+                    getTableFromCacheWithModifiedIndexState(clientTimeStamp, clientVersion, cacheKey);
             // We only cache the latest, so we'll end up building the table with every call if the
             // client connection has specified an SCN.
             // TODO: If we indicate to the client that we're returning an older version, but there's a
@@ -3934,17 +3944,32 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 }
                 return table;
             }
-            // Query for the latest table first, since it's not cached
-            table =
+            // take Phoenix row level write-lock as we need to protect metadata cache update
+            // after scanning SYSTEM.CATALOG to retrieve the PTable object
+            LockManager.RowLock phoenixRowLock =
+                    lockManager.lockRow(key, this.metadataCacheRowLockTimeout);
+            try {
+                table = getTableFromCacheWithModifiedIndexState(clientTimeStamp, clientVersion,
+                        cacheKey);
+                if (table != null && table.getTimeStamp() < clientTimeStamp) {
+                    if (isTableDeleted(table)) {
+                        return null;
+                    }
+                    return table;
+                }
+                // Query for the latest table first, since it's not cached
+                table =
                     buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion);
-            if ((table != null && table.getTimeStamp() <= clientTimeStamp) ||
-                    (blockWriteRebuildIndex && table.getIndexDisableTimestamp() > 0)) {
+                if ((table != null && table.getTimeStamp() <= clientTimeStamp) || (
+                        blockWriteRebuildIndex && table.getIndexDisableTimestamp() > 0)) {
+                    return table;
+                }
+                // Otherwise, query for an older version of the table - it won't be cached
+                table = buildTable(key, cacheKey, region, clientTimeStamp, clientVersion);
                 return table;
+            } finally {
+                phoenixRowLock.release();
             }
-            // Otherwise, query for an older version of the table - it won't be cached
-            table =
-                    buildTable(key, cacheKey, region, clientTimeStamp, clientVersion);
-            return table;
         } finally {
             if (!wasLocked && rowLock != null) {
                 rowLock.release();
@@ -3952,6 +3977,13 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             }
         }
 
+    private PTable getTableFromCacheWithModifiedIndexState(long clientTimeStamp, int clientVersion,
+            ImmutableBytesPtr cacheKey) throws SQLException {
+        PTable table = getTableFromCache(cacheKey, clientTimeStamp, clientVersion);
+        table = modifyIndexStateForOldClient(clientVersion, table);
+        return table;
+    }
+
     private List<PFunction> doGetFunctions(List<byte[]> keys, long clientTimeStamp)
             throws IOException, SQLException {
         Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
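
Note for readers following the change: the core of this patch is a double-checked locking pattern around the server side metadata cache in doGetTable(). The flow is: check the cache, take a per-row write lock with a bounded timeout, re-check the cache under the lock, and only then scan SYSTEM.CATALOG (via buildTable) and populate the cache. Below is a minimal standalone sketch of that pattern; it deliberately uses java.util.concurrent primitives instead of Phoenix's LockManager, and the GuardedMetadataCache/Loader types are simplified stand-ins for illustration, not the actual MetaDataEndpointImpl API. It also omits the HBase region-level lock and startRegionOperation()/closeRegionOperation() guards that the real code adds.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.locks.ReentrantLock;

    // Sketch of the row-lock-guarded cache fill used by the patch. "K" stands
    // in for the ImmutableBytesPtr cache key; "loadFromCatalog" stands in for
    // the SYSTEM.CATALOG scan performed by buildTable().
    public final class GuardedMetadataCache<K, V> {
        private final Map<K, V> cache = new ConcurrentHashMap<>();
        // One lock per row key, analogous to LockManager.lockRow(key, timeout).
        private final Map<K, ReentrantLock> rowLocks = new ConcurrentHashMap<>();
        private final long lockTimeoutMs;

        public GuardedMetadataCache(long lockTimeoutMs) {
            this.lockTimeoutMs = lockTimeoutMs;
        }

        public interface Loader<K, V> {
            V load(K key) throws Exception;
        }

        public V get(K key, Loader<K, V> loadFromCatalog) throws Exception {
            // First check without the lock: the common, uncontended path.
            V cached = cache.get(key);
            if (cached != null) {
                return cached;
            }
            ReentrantLock lock = rowLocks.computeIfAbsent(key, k -> new ReentrantLock());
            // Bounded wait, mirroring phoenix.metadata.update.rowlock.timeout.
            if (!lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS)) {
                throw new TimeoutException("Could not acquire row lock for " + key);
            }
            try {
                // Re-check under the lock: another thread may have filled the
                // cache while we were waiting, so we must not rebuild blindly.
                cached = cache.get(key);
                if (cached != null) {
                    return cached;
                }
                V built = loadFromCatalog.load(key);
                if (built != null) {
                    cache.put(key, built);
                }
                return built;
            } finally {
                lock.unlock();
            }
        }
    }

The re-check under the lock is the part the patch adds to doGetTable(): without it, concurrent callers that all missed the cache would each scan SYSTEM.CATALOG and race to overwrite the cached PTable.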
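
The lock wait is bounded by the new phoenix.metadata.update.rowlock.timeout property, which defaults to 60000 (presumably milliseconds, given the default). As a server-side setting read in the coprocessor's start path, it would typically be configured in hbase-site.xml on the region servers; the snippet below is a hypothetical illustration of overriding it programmatically via the standard HBase Configuration API, with the 30000 value chosen arbitrarily.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    // Hypothetical tuning example: lower the metadata cache row-lock timeout
    // from its default of 60000 to 30000. In a real deployment this key would
    // be set in hbase-site.xml rather than in code.
    public class RowLockTimeoutConfig {
        public static void main(String[] args) {
            Configuration conf = HBaseConfiguration.create();
            conf.setLong("phoenix.metadata.update.rowlock.timeout", 30000L);
            System.out.println(
                    conf.getLong("phoenix.metadata.update.rowlock.timeout", 60000L));
        }
    }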