Repository: phoenix Updated Branches: refs/heads/master 0a3ef6c1b -> c8cbb5e5e
PHOENIX-3496 Figure out why LocalIndexIT#testLocalIndexRoundTrip is flapping(Rajeshbabu) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c8cbb5e5 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c8cbb5e5 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c8cbb5e5 Branch: refs/heads/master Commit: c8cbb5e5e196299d5cc50385bd5ebb3791170d2f Parents: 0a3ef6c Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Authored: Tue Sep 5 23:34:57 2017 +0530 Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Committed: Tue Sep 5 23:34:57 2017 +0530 ---------------------------------------------------------------------- .../phoenix/end2end/FlappingLocalIndexIT.java | 79 +++++++++++++++++++- .../phoenix/end2end/index/BaseLocalIndexIT.java | 6 +- .../phoenix/end2end/index/LocalIndexIT.java | 3 +- .../UngroupedAggregateRegionObserver.java | 42 ++++++++++- .../phoenix/iterate/BaseResultIterators.java | 50 +++++++++---- 5 files changed, 159 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8cbb5e5/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java index 7509997..e2f3970 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java @@ -21,22 +21,31 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.end2end.index.BaseLocalIndexIT; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; @@ -297,4 +306,72 @@ public class FlappingLocalIndexIT extends BaseLocalIndexIT { indexTable.close(); } -} + @Test + public void testBuildingLocalIndexShouldHandleNoSuchColumnFamilyException() throws Exception { + testBuildingLocalIndexShouldHandleNoSuchColumnFamilyException(false); + } + + @Test + public void testBuildingLocalCoveredIndexShouldHandleNoSuchColumnFamilyException() throws Exception { + testBuildingLocalIndexShouldHandleNoSuchColumnFamilyException(true); + } + + private void testBuildingLocalIndexShouldHandleNoSuchColumnFamilyException(boolean coveredIndex) throws Exception { + String tableName = schemaName + "." + generateUniqueName(); + String indexName = "IDX_" + generateUniqueName(); + String indexTableName = schemaName + "." + indexName; + TableName physicalTableName = SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped); + + createBaseTable(tableName, null, null, coveredIndex ? "cf" : null); + Connection conn1 = DriverManager.getConnection(getUrl()); + conn1.createStatement().execute("UPSERT INTO "+tableName+" values('b',1,2,4,'z')"); + conn1.createStatement().execute("UPSERT INTO "+tableName+" values('f',1,2,3,'z')"); + conn1.createStatement().execute("UPSERT INTO "+tableName+" values('j',2,4,2,'a')"); + conn1.createStatement().execute("UPSERT INTO "+tableName+" values('q',3,1,1,'c')"); + conn1.commit(); + HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin(); + HTableDescriptor tableDescriptor = admin.getTableDescriptor(physicalTableName); + tableDescriptor.addCoprocessor(DeleyOpenRegionObserver.class.getName(), null, + QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY - 1, null); + admin.disableTable(physicalTableName); + admin.modifyTable(physicalTableName, tableDescriptor); + admin.enableTable(physicalTableName); + DeleyOpenRegionObserver.DELAY_OPEN = true; + conn1.createStatement().execute( + "CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(k3)" + + (coveredIndex ? " include(cf.v1)" : "")); + DeleyOpenRegionObserver.DELAY_OPEN = false; + ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + indexTableName); + assertTrue(rs.next()); + assertEquals(4, rs.getInt(1)); + } + + public static class DeleyOpenRegionObserver extends BaseRegionObserver { + public static volatile boolean DELAY_OPEN = false; + private int retryCount = 0; + private CountDownLatch latch = new CountDownLatch(1); + @Override + public void + preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested) + throws IOException { + if(DELAY_OPEN) { + try { + latch.await(); + } catch (InterruptedException e1) { + throw new DoNotRetryIOException(e1); + } + } + super.preClose(c, abortRequested); + } + + @Override + public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, + Scan scan, RegionScanner s) throws IOException { + if(DELAY_OPEN && retryCount == 1) { + latch.countDown(); + } + retryCount++; + return super.preScannerOpen(e, scan, s); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8cbb5e5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java index 547878c..30baec4 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java @@ -69,6 +69,10 @@ public abstract class BaseLocalIndexIT extends BaseUniqueNamesOwnClusterIT { } protected void createBaseTable(String tableName, Integer saltBuckets, String splits) throws SQLException { + createBaseTable(tableName, saltBuckets, splits, null); + } + + protected void createBaseTable(String tableName, Integer saltBuckets, String splits, String cf) throws SQLException { Connection conn = getConnection(); if (isNamespaceMapped) { conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName); @@ -77,7 +81,7 @@ public abstract class BaseLocalIndexIT extends BaseUniqueNamesOwnClusterIT { "k1 INTEGER NOT NULL,\n" + "k2 INTEGER NOT NULL,\n" + "k3 INTEGER,\n" + - "v1 VARCHAR,\n" + + (cf != null ? (cf+'.') : "") + "v1 VARCHAR,\n" + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n" + (saltBuckets != null && splits == null ? (" salt_buckets=" + saltBuckets) : "" + (saltBuckets == null && splits != null ? (" split on " + splits) : "")); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8cbb5e5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java index 27edfb7..48221ab 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java @@ -75,8 +75,7 @@ public class LocalIndexIT extends BaseLocalIndexIT { super(isNamespaceMapped); } - @Ignore - //FIXME: PHOENIX-3496 + @Test public void testLocalIndexRoundTrip() throws Exception { String tableName = schemaName + "." + generateUniqueName(); String indexName = "IDX_" + generateUniqueName(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8cbb5e5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index afe0ccf..31b8e36 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -46,10 +46,12 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; @@ -90,6 +92,7 @@ import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; @@ -121,6 +124,7 @@ import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.StringUtil; import org.apache.phoenix.util.TimeKeeper; @@ -276,10 +280,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver throws IOException { s = super.preScannerOpen(e, scan, s); if (ScanUtil.isAnalyzeTable(scan)) { -// if (!ScanUtil.isLocalIndex(scan)) { -// scan.getFamilyMap().clear(); -// } -// scan.getFamilyMap().clear(); // We are setting the start row and stop row such that it covers the entire region. As part // of Phonenix-1263 we are storing the guideposts against the physical table rather than // individual tenant specific tables. @@ -448,6 +448,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1) ; boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan; + if(buildLocalIndex) { + checkForLocalIndexColumnFamilies(region, indexMaintainers); + } if (isDescRowKeyOrderUpgrade || isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) { needToWrite = true; maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); @@ -791,6 +794,37 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } + private void checkForLocalIndexColumnFamilies(Region region, + List<IndexMaintainer> indexMaintainers) throws IOException { + HTableDescriptor tableDesc = region.getTableDesc(); + String schemaName = + tableDesc.getTableName().getNamespaceAsString() + .equals(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR) ? SchemaUtil + .getSchemaNameFromFullName(tableDesc.getTableName().getNameAsString()) + : tableDesc.getTableName().getNamespaceAsString(); + String tableName = SchemaUtil.getTableNameFromFullName(tableDesc.getTableName().getNameAsString()); + for (IndexMaintainer indexMaintainer : indexMaintainers) { + Set<ColumnReference> coveredColumns = indexMaintainer.getCoveredColumns(); + if(coveredColumns.isEmpty()) { + byte[] localIndexCf = indexMaintainer.getEmptyKeyValueFamily().get(); + // When covered columns empty we store index data in default column family so check for it. + if (tableDesc.getFamily(localIndexCf) == null) { + ServerUtil.throwIOException("Column Family Not Found", + new ColumnFamilyNotFoundException(schemaName, tableName, Bytes + .toString(localIndexCf))); + } + } + for (ColumnReference reference : coveredColumns) { + byte[] cf = IndexUtil.getLocalIndexColumnFamily(reference.getFamily()); + HColumnDescriptor family = region.getTableDesc().getFamily(cf); + if (family == null) { + ServerUtil.throwIOException("Column Family Not Found", + new ColumnFamilyNotFoundException(schemaName, tableName, Bytes.toString(cf))); + } + } + } + } + private void commit(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemStoreSize, byte[] indexMaintainersPtr, byte[] txState, HTable targetHTable, boolean useIndexProto, boolean isPKChanging) http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8cbb5e5/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 6ab5dc3..98f5d46 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.iterate; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX; @@ -86,6 +87,7 @@ import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; @@ -899,24 +901,23 @@ public abstract class BaseResultIterators extends ExplainTable implements Result try { // Rethrow as SQLException throw ServerUtil.parseServerException(e); } catch (StaleRegionBoundaryCacheException e2) { - scanPairItr.remove(); // Catch only to try to recover from region boundary cache being out of date if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries services.clearTableRegionCache(physicalTableName); context.getOverallQueryMetrics().cacheRefreshedDueToSplits(); } - // Resubmit just this portion of work again - Scan oldScan = scanPair.getFirst(); - byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW); - byte[] endKey = oldScan.getStopRow(); - - List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey); - // Add any concatIterators that were successful so far - // as we need these to be in order - addIterator(iterators, concatIterators); - concatIterators = Lists.newArrayList(); - getIterators(newNestedScans, services, isLocalIndex, allIterators, iterators, isReverse, - maxQueryEndTime, newNestedScans.size(), previousScan); + concatIterators = + recreateIterators(services, isLocalIndex, allIterators, + iterators, isReverse, maxQueryEndTime, previousScan, + clearedCache, concatIterators, scanPairItr, scanPair); + } catch(ColumnFamilyNotFoundException cfnfe) { + if (scanPair.getFirst().getAttribute(LOCAL_INDEX_BUILD) != null) { + Thread.sleep(1000); + concatIterators = + recreateIterators(services, isLocalIndex, allIterators, + iterators, isReverse, maxQueryEndTime, previousScan, + clearedCache, concatIterators, scanPairItr, scanPair); + } } } } @@ -968,6 +969,29 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } return null; // Not reachable } + + private List<PeekingResultIterator> recreateIterators(ConnectionQueryServices services, + boolean isLocalIndex, Queue<PeekingResultIterator> allIterators, + List<PeekingResultIterator> iterators, boolean isReverse, long maxQueryEndTime, + ScanWrapper previousScan, boolean clearedCache, + List<PeekingResultIterator> concatIterators, + Iterator<Pair<Scan, Future<PeekingResultIterator>>> scanPairItr, + Pair<Scan, Future<PeekingResultIterator>> scanPair) throws SQLException { + scanPairItr.remove(); + // Resubmit just this portion of work again + Scan oldScan = scanPair.getFirst(); + byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW); + byte[] endKey = oldScan.getStopRow(); + + List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey); + // Add any concatIterators that were successful so far + // as we need these to be in order + addIterator(iterators, concatIterators); + concatIterators = Lists.newArrayList(); + getIterators(newNestedScans, services, isLocalIndex, allIterators, iterators, isReverse, + maxQueryEndTime, newNestedScans.size(), previousScan); + return concatIterators; + } @Override