Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 0ae4765ec -> bf4262a99
PHOENIX-3953 Clear INDEX_DISABLED_TIMESTAMP and disable index on compaction (addendum) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/bf4262a9 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/bf4262a9 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/bf4262a9 Branch: refs/heads/4.x-HBase-0.98 Commit: bf4262a991cfa8d369b3669d1086c13d5276dfbc Parents: 0ae4765 Author: James Taylor <jtay...@salesforce.com> Authored: Tue Sep 5 13:05:17 2017 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Tue Sep 5 14:21:59 2017 -0700 ---------------------------------------------------------------------- .../end2end/index/PartialIndexRebuilderIT.java | 21 +++-- .../UngroupedAggregateRegionObserver.java | 2 +- .../stats/DefaultStatisticsCollector.java | 83 ++++++++++++++------ .../schema/stats/NoOpStatisticsCollector.java | 2 +- .../schema/stats/StatisticsCollector.java | 2 +- 5 files changed, 75 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/bf4262a9/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java index 9483e87..139725f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java @@ -248,29 +248,34 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { public void testCompactionDuringRebuild() throws Throwable { String schemaName = generateUniqueName(); String tableName = generateUniqueName(); - String indexName = 
generateUniqueName(); + String indexName1 = generateUniqueName(); + String indexName2 = generateUniqueName(); final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); - String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + String fullIndexName1 = SchemaUtil.getTableName(schemaName, indexName1); + String fullIndexName2 = SchemaUtil.getTableName(schemaName, indexName2); final MyClock clock = new MyClock(1000); // Use our own clock to prevent race between partial rebuilder and compaction EnvironmentEdgeManager.injectEdge(clock); try (Connection conn = DriverManager.getConnection(getUrl())) { conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k INTEGER PRIMARY KEY, v1 INTEGER, v2 INTEGER) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true, GUIDE_POSTS_WIDTH=1000"); clock.time += 1000; - conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); + conn.createStatement().execute("CREATE INDEX " + indexName1 + " ON " + fullTableName + " (v1) INCLUDE (v2)"); + clock.time += 1000; + conn.createStatement().execute("CREATE INDEX " + indexName2 + " ON " + fullTableName + " (v2) INCLUDE (v1)"); clock.time += 1000; conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES(1, 2, 3)"); conn.commit(); clock.time += 1000; long disableTS = EnvironmentEdgeManager.currentTimeMillis(); HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); - IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE); - TestUtil.doMajorCompaction(conn, fullIndexName); - assertFalse(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.DISABLE, 0L)); + IndexUtil.updateIndexState(fullIndexName1, disableTS, metaTable, PIndexState.DISABLE); + IndexUtil.updateIndexState(fullIndexName2, disableTS, metaTable, PIndexState.DISABLE); + TestUtil.doMajorCompaction(conn, fullIndexName1); + 
assertTrue(TestUtil.checkIndexState(conn, fullIndexName1, PIndexState.DISABLE, 0L)); TestUtil.analyzeTable(conn, fullTableName); - assertFalse(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.DISABLE, 0L)); + assertFalse(TestUtil.checkIndexState(conn, fullIndexName2, PIndexState.DISABLE, 0L)); TestUtil.doMajorCompaction(conn, fullTableName); - assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.DISABLE, 0L)); + assertTrue(TestUtil.checkIndexState(conn, fullIndexName2, PIndexState.DISABLE, 0L)); } finally { EnvironmentEdgeManager.injectEdge(null); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bf4262a9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index acf1e17..eef023e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -1119,7 +1119,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver long rowCount = 0; try { if (!compactionRunning) { - stats.init(); + stats.init(false); synchronized (innerScanner) { do { List<Cell> results = new ArrayList<Cell>(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/bf4262a9/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java index dc6dea3..2320fc0 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; @@ -41,13 +42,16 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TimeKeeper; @@ -109,9 +113,10 @@ public class DefaultStatisticsCollector implements StatisticsCollector { } } - private void initGuidepostDepth() throws IOException { + private void initGuidepostDepth(boolean isMajorCompaction) throws IOException { // First check is if guidepost info set on statement itself - if (guidePostPerRegionBytes != null || guidePostWidthBytes != null) { + boolean guidepostOnStatement = guidePostPerRegionBytes != null || guidePostWidthBytes != null; + if (guidepostOnStatement) { int guidepostPerRegion = 0; long guidepostWidth = 
QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES; if (guidePostPerRegionBytes != null) { @@ -122,20 +127,48 @@ public class DefaultStatisticsCollector implements StatisticsCollector { } this.guidePostDepth = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, env.getRegion().getTableDesc()); - } else { + } + + if (!guidepostOnStatement || isMajorCompaction) { long guidepostWidth = -1; HTableInterface htable = null; try { - // Next check for GUIDE_POST_WIDTH on table - htable = env.getTable( - SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration())); + // Next check for GUIDE_POST_WIDTH and INDEX_DISABLE_TIMESTAMP on table + TableName htableName = SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()); + htable = env.getTable(htableName); Get get = new Get(ptableKey); get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES); + get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES); Result result = htable.get(get); if (!result.isEmpty()) { - Cell cell = result.listCells().get(0); - guidepostWidth = PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault()); + Cell gpwCell = result.getColumnLatestCell(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES); + if (gpwCell != null) { + guidepostWidth = PLong.INSTANCE.getCodec().decodeLong(gpwCell.getValueArray(), gpwCell.getValueOffset(), SortOrder.getDefault()); + } + if (isMajorCompaction) { + Cell idtsCell = result.getColumnLatestCell(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES); + if (idtsCell != null) { + long indexDisableTimestamp = PLong.INSTANCE.getCodec().decodeLong(idtsCell.getValueArray(), idtsCell.getValueOffset(), SortOrder.getDefault()); + 
// If we have a non zero value for INDEX_DISABLE_TIMESTAMP, that means that our global mutable + // secondary index needs to be partially rebuilt. If we're compacting, though, we may clean up + // the delete markers of an index *before* the puts for the same row occur during replay. At + // this point the partial index rebuild would leave the index out of sync with the data + // table. In that case, it's better to just permanently disable the index and force it to be + // manually rebuilt + if (indexDisableTimestamp != 0) { + MutationCode mutationCode = IndexUtil.updateIndexState(ptableKey, 0L, htable, PIndexState.DISABLE).getMutationCode(); + if (mutationCode != MutationCode.TABLE_ALREADY_EXISTS && mutationCode != MutationCode.TABLE_NOT_FOUND) { + LOG.warn("Attempt to permanently disable index " + env.getRegionInfo().getTable().getNameAsString() + + " during compaction failed with code = " + mutationCode); + } + } + } + } } + } catch (IOException e) { + throw e; + } catch (Throwable t) { + throw new IOException(t); } finally { if (htable != null) { try { @@ -145,19 +178,21 @@ public class DefaultStatisticsCollector implements StatisticsCollector { } } } - if (guidepostWidth >= 0) { - this.guidePostDepth = guidepostWidth; - } else { - // Last use global config value - Configuration config = env.getConfiguration(); - this.guidePostDepth = StatisticsUtil.getGuidePostDepth( - config.getInt( - 
QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION), + config.getLong( + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES), + env.getRegion().getTableDesc()); + } } } } @@ -315,13 +350,13 @@ public class DefaultStatisticsCollector implements StatisticsCollector { StatisticsScanner scanner = new StatisticsScanner(this, statsWriter, env, internalScan, family); // We need to initialize the scanner synchronously and potentially perform a cross region Get // in order to use the correct guide posts width for the table being compacted. - init(); + init(true); return scanner; } @Override - public void init() throws IOException { - initGuidepostDepth(); + public void init(boolean isMajorCompaction) throws IOException { + initGuidepostDepth(isMajorCompaction); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/bf4262a9/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java index 02f1061..e4c3785 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java @@ -61,7 +61,7 @@ public class NoOpStatisticsCollector implements StatisticsCollector { } @Override - public void init() { + public void init(boolean isMajorCompaction) { // No-op } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bf4262a9/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java index c4a3059..7e6c1cb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java @@ -64,7 +64,7 @@ public interface StatisticsCollector extends Closeable { * Called before beginning the collection of statistics through {@link #collectStatistics(List)} * @throws IOException */ - void init() throws IOException; + void init(boolean isMajorCompaction) throws IOException; /** * Retrieve the calculated guide post info for the given column family.