This is an automated email from the ASF dual-hosted git repository. stoty pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push: new 67ffa6a PHOENIX-6298 : Use timestamp of PENDING_DISABLE_COUNT to calculate elapse time for PENDING_DISABLE state 67ffa6a is described below commit 67ffa6a347f222cd4f297b3ef36abd691cd2e76f Author: Viraj Jasani <vjas...@apache.org> AuthorDate: Tue Jan 5 16:28:04 2021 +0530 PHOENIX-6298 : Use timestamp of PENDING_DISABLE_COUNT to calculate elapse time for PENDING_DISABLE state --- .../end2end/index/PartialIndexRebuilderIT.java | 84 ++++++++++++++++++++-- .../coprocessor/MetaDataRegionObserver.java | 16 +++-- .../java/org/apache/phoenix/util/IndexUtil.java | 21 ++++++ 3 files changed, 109 insertions(+), 12 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java index d0811ed..bc9bcfe 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.end2end.index; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -30,6 +31,7 @@ import java.sql.SQLException; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -63,6 +65,7 @@ import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.IndexScrutiny; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.Repeat; import org.apache.phoenix.util.SchemaUtil; @@ -83,9 +86,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { private static final long REBUILD_PERIOD = 50000; private static final long REBUILD_INTERVAL = 2000; private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment; - private static Boolean runRebuildOnce = true; - @BeforeClass public static synchronized void doSetup() throws Exception { Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10); @@ -128,8 +129,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { private static void runIndexRebuilderAsync(final int interval, final boolean[] cancel, String table) { runIndexRebuilderAsync(interval, cancel, Collections.<String>singletonList(table)); } + private static void runIndexRebuilderAsync(final int interval, final boolean[] cancel, final List<String> tables) { - runRebuildOnce = true; Thread thread = new Thread(new Runnable() { @Override public void run() { @@ -142,8 +143,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { throw new RuntimeException(e); } catch (SQLException e) { LOGGER.error(e.getMessage(),e); - } finally { - runRebuildOnce = false; } } } @@ -561,7 +560,13 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { @Override public long currentTime() { - return time++; + synchronized (this) { + return time++; + } + } + + private synchronized void addTime(long diff) { + time += diff; } } @@ -1088,4 +1093,71 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { throw new DoNotRetryIOException("Simulating write failure on " + c.getEnvironment().getRegionInfo().getTable().getNameAsString()); } } + + @Test + public void testPendingDisableWithDisableCountTs() throws Throwable { + final String schemaName = generateUniqueName(); + final String tableName = generateUniqueName(); + final String indexName = generateUniqueName(); + final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + final MyClock clock = + new MyClock(EnvironmentEdgeManager.currentTimeMillis()); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute(String.format( + "CREATE TABLE %s (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, " + + "v3 VARCHAR, v4 VARCHAR) COLUMN_ENCODED_BYTES = 0, " + + "DISABLE_INDEX_ON_WRITE_FAILURE = TRUE", fullTableName)); + EnvironmentEdgeManager.injectEdge(clock); + clock.addTime(100); + conn.createStatement().execute( + String.format("CREATE INDEX %s ON %s (v1, v2)", indexName, + fullTableName)); + clock.addTime(100); + conn.createStatement().execute( + String.format("UPSERT INTO %s VALUES('k01', 'v01', 'v02', 'v03', 'v04')", + fullTableName)); + conn.commit(); + clock.addTime(100); + + try (HTableInterface systemCatalog = conn.unwrap(PhoenixConnection.class) + .getQueryServices() + .getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) { + IndexUtil.updateIndexState(fullIndexName, clock.currentTime(), + systemCatalog, PIndexState.PENDING_DISABLE); + } + + Configuration conf = + conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration(); + + PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class); + ResultSet rs = stmt.executeQuery( + String.format("SELECT V2 FROM %s WHERE V1 = 'v01'", fullTableName)); + assertTrue(rs.next()); + assertEquals("v02", rs.getString(1)); + + long pendingDisableThreshold = conf.getLong( + QueryServices.INDEX_PENDING_DISABLE_THRESHOLD, + QueryServicesOptions.DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD); + long pendingDisableCountLastUpdatedTs = + IndexUtil.getIndexPendingDisableCountLastUpdatedTimestamp( + conn.unwrap(PhoenixConnection.class), fullIndexName); + clock.addTime(pendingDisableThreshold + pendingDisableCountLastUpdatedTs); + + stmt = conn.createStatement().unwrap(PhoenixStatement.class); + rs = stmt.executeQuery( + String.format("SELECT V2 FROM %s WHERE V1 = 'v01'", fullTableName)); + assertTrue(rs.next()); + assertEquals("v02", rs.getString(1)); + + Thread.sleep(1000); + waitForIndexState(conn, fullTableName, fullIndexName, + PIndexState.DISABLE); + } finally { + EnvironmentEdgeManager.reset(); + } + } + } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index b4e2b84..4058dde 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -314,9 +314,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver { continue; } - long indexDisableTimestamp = - PLong.INSTANCE.getCodec().decodeLong(disabledTimeStamp, 0, - SortOrder.ASC); byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES); if ((dataTable == null || dataTable.length == 0) || indexStateCell == null) { @@ -361,14 +358,21 @@ public class MetaDataRegionObserver extends BaseRegionObserver { } PIndexState indexState = PIndexState.fromSerializedValue(indexStateBytes[0]); - long elapsedSinceDisable = EnvironmentEdgeManager.currentTimeMillis() - Math.abs(indexDisableTimestamp); + long pendingDisableCountLastUpdatedTs = + IndexUtil.getIndexPendingDisableCountLastUpdatedTimestamp(conn, indexTableFullName); + long elapsedSinceDisable = + EnvironmentEdgeManager.currentTimeMillis() - pendingDisableCountLastUpdatedTs; // on an index write failure, the server side transitions to PENDING_DISABLE, then the client // retries, and after retries are exhausted, disables the index if (indexState == PIndexState.PENDING_DISABLE) { if (elapsedSinceDisable > pendingDisableThreshold) { - // too long in PENDING_DISABLE - client didn't disable the index, so we do it here - IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.DISABLE, indexDisableTimestamp); + // too long in PENDING_DISABLE - + // client didn't disable the index because last time when + // PENDING_DISABLE_COUNT was updated is greater than pendingDisableThreshold, + // so we do it here + IndexUtil.updateIndexState(conn, indexTableFullName, + PIndexState.DISABLE, pendingDisableCountLastUpdatedTs); } continue; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 6e9e8f0..ec1a4f2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -913,4 +913,25 @@ public class IndexUtil { throw new IOException(e); } } + + public static long getIndexPendingDisableCountLastUpdatedTimestamp( + PhoenixConnection conn, String failedIndexTable) + throws IOException { + byte[] indexTableKey = + SchemaUtil.getTableKeyFromFullName(failedIndexTable); + Get get = new Get(indexTableKey); + get.addColumn(TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES); + byte[] systemCatalog = SchemaUtil.getPhysicalTableName( + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, + conn.getQueryServices().getProps()).getName(); + try (Table table = conn.getQueryServices().getTable(systemCatalog)) { + Result result = table.get(get); + Cell cell = result.listCells().get(0); + return cell.getTimestamp(); + } catch (SQLException e) { + throw new IOException(e); + } + } + }