This is an automated email from the ASF dual-hosted git repository. vjasani pushed a commit to branch 5.2 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push: new c618921f3a PHOENIX-7306 Metadata lookup should be permitted only within query timeout (#1880) c618921f3a is described below commit c618921f3a67bb98b17169631eee564e3e6afc80 Author: Viraj Jasani <vjas...@apache.org> AuthorDate: Sat May 4 14:33:10 2024 -0800 PHOENIX-7306 Metadata lookup should be permitted only within query timeout (#1880) --- .../apache/phoenix/cache/ServerCacheClient.java | 16 ++++- .../iterate/DefaultParallelScanGrouper.java | 6 +- .../phoenix/query/ConnectionQueryServices.java | 45 +++++++++++- .../phoenix/query/ConnectionQueryServicesImpl.java | 82 +++++++++++++++++++--- .../query/ConnectionlessQueryServicesImpl.java | 29 +++++++- .../query/DelegateConnectionQueryServices.java | 24 ++++++- .../org/apache/phoenix/query/QueryServices.java | 2 +- .../iterate/MapReduceParallelScanGrouper.java | 6 +- .../org/apache/phoenix/end2end/MapReduceIT.java | 8 ++- .../end2end/SkipScanAfterManualSplitIT.java | 10 ++- .../phoenix/iterate/PhoenixQueryTimeoutIT.java | 21 +++++- .../iterate/RoundRobinResultIteratorIT.java | 10 ++- .../monitoring/PhoenixTableLevelMetricsIT.java | 46 +++++++++--- .../phoenix/schema/stats/BaseStatsCollectorIT.java | 9 +-- .../phoenix/cache/ServerCacheClientTest.java | 7 +- .../apache/phoenix/compile/QueryCompilerTest.java | 4 +- .../query/ConnectionQueryServicesImplTest.java | 15 ++-- .../phoenix/query/ParallelIteratorsSplitTest.java | 5 +- 18 files changed, 289 insertions(+), 56 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java index 6700ce7c50..9c6bb11695 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java @@ -225,7 +225,8 @@ public class ServerCacheClient { PTable cacheUsingTable = delegate.getTableRef().getTable(); ConnectionQueryServices services = delegate.getContext().getConnection().getQueryServices(); List<HRegionLocation> locations = services.getAllTableRegions( - cacheUsingTable.getPhysicalName().getBytes()); + cacheUsingTable.getPhysicalName().getBytes(), + delegate.getContext().getStatement().getQueryTimeoutInMillis()); int nRegions = locations.size(); Set<HRegionLocation> servers = new HashSet<>(nRegions); cacheUsingTableMap.put(Bytes.mapKey(cacheId), cacheUsingTable); @@ -267,7 +268,12 @@ public class ServerCacheClient { ExecutorService executor = services.getExecutor(); List<Future<Boolean>> futures = Collections.emptyList(); try { - List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTable.getPhysicalName().getBytes()); + int queryTimeout = connection.getQueryServices().getProps() + .getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, + QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS); + List<HRegionLocation> locations = + services.getAllTableRegions(cacheUsingTable.getPhysicalName().getBytes(), + queryTimeout); int nRegions = locations.size(); // Size these based on worst case futures = new ArrayList<Future<Boolean>>(nRegions); @@ -379,7 +385,11 @@ public class ServerCacheClient { byte[] tableName = cacheUsingTable.getPhysicalName().getBytes(); iterateOverTable = services.getTable(tableName); - List<HRegionLocation> locations = services.getAllTableRegions(tableName); + int queryTimeout = connection.getQueryServices().getProps() + .getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, + QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS); + List<HRegionLocation> locations = services.getAllTableRegions(tableName, queryTimeout); + /** * Allow for the possibility that the region we based where to send our cache has split and been relocated * to another region server *after* we sent it, but before we removed it. To accommodate this, we iterate diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java index 5252be410e..23ea797486 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/DefaultParallelScanGrouper.java @@ -71,7 +71,8 @@ public class DefaultParallelScanGrouper implements ParallelScanGrouper { @Override public List<HRegionLocation> getRegionBoundaries(StatementContext context, byte[] tableName) throws SQLException { - return context.getConnection().getQueryServices().getAllTableRegions(tableName); + return context.getConnection().getQueryServices().getAllTableRegions(tableName, + context.getStatement().getQueryTimeoutInMillis()); } /** @@ -82,6 +83,7 @@ public class DefaultParallelScanGrouper implements ParallelScanGrouper { byte[] tableName, byte[] startRegionBoundaryKey, byte[] stopRegionBoundaryKey) throws SQLException { return context.getConnection().getQueryServices() - .getTableRegions(tableName, startRegionBoundaryKey, stopRegionBoundaryKey); + .getTableRegions(tableName, startRegionBoundaryKey, stopRegionBoundaryKey, + context.getStatement().getQueryTimeoutInMillis()); } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index ffb5859431..95ea0bab42 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -99,13 +99,37 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated public TableDescriptor getTableDescriptor(byte[] tableName) throws SQLException; public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row) throws SQLException; + + /** + * Retrieve the region metadata locations for all regions of the given table. + * This method is Deprecated. Use {@link #getAllTableRegions(byte[], int)} instead. + * + * @param tableName The table name. + * @return The list of table region locations. + * @throws SQLException If fails to retrieve region locations. + */ + @Deprecated public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException; + /** + * Retrieve the region metadata locations for all regions of the given table. + * The operation to retrieve the table region locations must be completed within + * the query timeout. + * + * @param tableName Table name. + * @param queryTimeout Phoenix query timeout. + * @return The list of region locations. + * @throws SQLException If fails to retrieve region locations. + */ + public List<HRegionLocation> getAllTableRegions(byte[] tableName, int queryTimeout) + throws SQLException; + /** * Retrieve table region locations that cover the startRowKey and endRowKey. The start key * of the first region of the returned list must be less than or equal to startRowKey. * The end key of the last region of the returned list must be greater than or equal to * endRowKey. + * This method is Deprecated. Use {@link #getTableRegions(byte[], byte[], byte[], int)} instead. * * @param tableName Table name. * @param startRowKey Start RowKey. @@ -113,8 +137,27 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated * @return The list of region locations that cover the startRowKey and endRowKey key boundary. * @throws SQLException If fails to retrieve region locations. */ + @Deprecated + public List<HRegionLocation> getTableRegions(byte[] tableName, byte[] startRowKey, + byte[] endRowKey) throws SQLException; + + /** + * Retrieve table region locations that cover the startRowKey and endRowKey. The start key + * of the first region of the returned list must be less than or equal to startRowKey. + * The end key of the last region of the returned list must be greater than or equal to + * endRowKey. The operation to retrieve the table region locations must be completed within + * the query timeout. + * + * @param tableName Table name. + * @param startRowKey Start RowKey. + * @param endRowKey End RowKey. + * @param queryTimeout Phoenix query timeout. + * @return The list of region locations that cover the startRowKey and endRowKey key boundary. + * @throws SQLException If fails to retrieve region locations. + */ public List<HRegionLocation> getTableRegions(byte[] tableName, byte[] startRowKey, - byte[] endRowKey) throws SQLException; + byte[] endRowKey, + int queryTimeout) throws SQLException; public PhoenixConnection connect(String url, Properties info) throws SQLException; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index df4aaf5d05..b05d352bc8 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -93,6 +93,7 @@ import java.nio.charset.StandardCharsets; import java.sql.PreparedStatement; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.SQLTimeoutException; import java.sql.Statement; import java.sql.Types; import java.util.ArrayList; @@ -708,7 +709,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } public byte[] getNextRegionStartKey(HRegionLocation regionLocation, byte[] currentKey, - HRegionLocation prevRegionLocation) throws IOException { + HRegionLocation prevRegionLocation) { // in order to check the overlap/inconsistencies bad region info, we have to make sure // the current endKey always increasing(compare the previous endKey) @@ -733,9 +734,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement && !Bytes.equals(regionLocation.getRegion().getEndKey(), HConstants.EMPTY_END_ROW); if (conditionOne || conditionTwo) { GLOBAL_HBASE_COUNTER_METADATA_INCONSISTENCY.increment(); - String regionNameString = - new String(regionLocation.getRegion().getRegionName(), StandardCharsets.UTF_8); - LOGGER.error( + LOGGER.warn( "HBase region overlap/inconsistencies on {} , current key: {} , region startKey:" + " {} , region endKey: {} , prev region startKey: {} , prev region endKey: {}", regionLocation, @@ -746,17 +745,29 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement "null" : Bytes.toStringBinary(prevRegionLocation.getRegion().getStartKey()), prevRegionLocation == null ? "null" : Bytes.toStringBinary(prevRegionLocation.getRegion().getEndKey())); - throw new IOException( - String.format("HBase region information overlap/inconsistencies on region %s", - regionNameString)); } return regionLocation.getRegion().getEndKey(); } + /** + * {@inheritDoc}. + */ @Override public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException { + int queryTimeout = this.getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, + QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS); + return getTableRegions(tableName, HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW, queryTimeout); + } + + /** + * {@inheritDoc}. + */ + @Override + public List<HRegionLocation> getAllTableRegions(byte[] tableName, int queryTimeout) + throws SQLException { return getTableRegions(tableName, HConstants.EMPTY_START_ROW, - HConstants.EMPTY_END_ROW); + HConstants.EMPTY_END_ROW, queryTimeout); } /** @@ -764,7 +775,19 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement */ @Override public List<HRegionLocation> getTableRegions(byte[] tableName, byte[] startRowKey, - byte[] endRowKey) throws SQLException { + byte[] endRowKey) throws SQLException{ + int queryTimeout = this.getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, + QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS); + return getTableRegions(tableName, startRowKey, endRowKey, queryTimeout); + } + + /** + * {@inheritDoc}. + */ + @Override + public List<HRegionLocation> getTableRegions(final byte[] tableName, final byte[] startRowKey, + final byte[] endRowKey, final int queryTimeout) + throws SQLException { /* * Use HConnection.getRegionLocation as it uses the cache in HConnection, while getting * all region locations from the HTable doesn't. @@ -774,6 +797,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement config.getInt(PHOENIX_GET_REGIONS_RETRIES, DEFAULT_PHOENIX_GET_REGIONS_RETRIES); TableName table = TableName.valueOf(tableName); byte[] currentKey = null; + final long startTime = EnvironmentEdgeManager.currentTimeMillis(); + final long maxQueryEndTime = startTime + queryTimeout; while (true) { try { // We could surface the package projected HConnectionImplementation.getNumberOfCachedRegionLocations @@ -794,6 +819,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement && Bytes.compareTo(currentKey, endRowKey) >= 0) { break; } + throwErrorIfQueryTimedOut(startRowKey, endRowKey, maxQueryEndTime, + queryTimeout, table, retryCount, currentKey); } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)); return locations; } catch (org.apache.hadoop.hbase.TableNotFoundException e) { @@ -818,6 +845,43 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } + /** + * Throw Error if the metadata lookup takes longer than query timeout configured. + * + * @param startRowKey Start RowKey to begin the region metadata lookup from. + * @param endRowKey End RowKey to end the region metadata lookup at. + * @param maxQueryEndTime Max time to execute the metadata lookup. + * @param queryTimeout Query timeout. + * @param table Table Name. + * @param retryCount Retry Count. + * @param currentKey Current Key. + * @throws SQLException Throw Error if the metadata lookup takes longer than query timeout. + */ + private static void throwErrorIfQueryTimedOut(byte[] startRowKey, byte[] endRowKey, + long maxQueryEndTime, + int queryTimeout, TableName table, int retryCount, + byte[] currentKey) throws SQLException { + long currentTime = EnvironmentEdgeManager.currentTimeMillis(); + if (currentTime >= maxQueryEndTime) { + LOGGER.error("getTableRegions has exceeded query timeout {} ms." + + "Table: {}, retryCount: {} , currentKey: {} , " + + "startRowKey: {} , endRowKey: {}", + queryTimeout, + table.getNameAsString(), + retryCount, + Bytes.toStringBinary(currentKey), + Bytes.toStringBinary(startRowKey), + Bytes.toStringBinary(endRowKey) + ); + final String message = "getTableRegions has exceeded query timeout " + queryTimeout + + "ms"; + IOException e = new IOException(message); + throw new SQLTimeoutException(message, + SQLExceptionCode.OPERATION_TIMED_OUT.getSQLState(), + SQLExceptionCode.OPERATION_TIMED_OUT.getErrorCode(), e); + } + } + public PMetaData getMetaDataCache() { return latestMetaData; } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 470a44fb76..f0e5277090 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -223,9 +223,23 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple throw new UnsupportedOperationException(); } + /** + * {@inheritDoc}. + */ @Override public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException { - return getTableRegions(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + return getTableRegions(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, + QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS); + } + + /** + * {@inheritDoc}. + */ + @Override + public List<HRegionLocation> getAllTableRegions(byte[] tableName, int queryTimeout) + throws SQLException { + return getTableRegions(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, + queryTimeout); } /** @@ -233,7 +247,18 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple */ @Override public List<HRegionLocation> getTableRegions(byte[] tableName, byte[] startRowKey, - byte[] endRowKey) throws SQLException { + byte[] endRowKey) throws SQLException { + return getTableRegions(tableName, startRowKey, endRowKey, + QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS); + } + + /** + * {@inheritDoc}. + */ + @Override + public List<HRegionLocation> getTableRegions(byte[] tableName, byte[] startRowKey, + byte[] endRowKey, int queryTimeout) + throws SQLException { List<HRegionLocation> regions = tableSplits.get(Bytes.toString(tableName)); if (regions != null) { return regions; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index 3e3f9f1ab3..9945896dd7 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -82,20 +82,42 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple return getDelegate().getTableIfExists(tableName); } + /** + * {@inheritDoc}. + */ @Override public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException { return getDelegate().getAllTableRegions(tableName); } + /** + * {@inheritDoc}. + */ + @Override + public List<HRegionLocation> getAllTableRegions(byte[] tableName, int queryTimeout) + throws SQLException { + return getDelegate().getAllTableRegions(tableName, queryTimeout); + } + /** * {@inheritDoc}. */ @Override public List<HRegionLocation> getTableRegions(byte[] tableName, byte[] startRowKey, - byte[] endRowKey) throws SQLException { + byte[] endRowKey) throws SQLException { return getDelegate().getTableRegions(tableName, startRowKey, endRowKey); } + /** + * {@inheritDoc}. + */ + @Override + public List<HRegionLocation> getTableRegions(byte[] tableName, byte[] startRowKey, + byte[] endRowKey, int queryTimeout) + throws SQLException { + return getDelegate().getTableRegions(tableName, startRowKey, endRowKey, queryTimeout); + } + @Override public void addTable(PTable table, long resolvedTime) throws SQLException { getDelegate().addTable(table, resolvedTime); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index 67187ec09b..0279377d5a 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -484,7 +484,7 @@ public interface QueryServices extends SQLCloseable { */ String PHOENIX_GET_REGIONS_RETRIES = "phoenix.get.table.regions.retries"; - int DEFAULT_PHOENIX_GET_REGIONS_RETRIES = 3; + int DEFAULT_PHOENIX_GET_REGIONS_RETRIES = 10; /** * Get executor service used for parallel scans diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java index 511467e693..bed1ead96d 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java @@ -68,7 +68,8 @@ public class MapReduceParallelScanGrouper implements ParallelScanGrouper { if ((snapshotName = getSnapshotName(conf)) != null) { return getRegionLocationsFromSnapshot(conf, snapshotName); } else { - return context.getConnection().getQueryServices().getAllTableRegions(tableName); + return context.getConnection().getQueryServices().getAllTableRegions(tableName, + context.getStatement().getQueryTimeoutInMillis()); } } @@ -84,7 +85,8 @@ public class MapReduceParallelScanGrouper implements ParallelScanGrouper { return getRegionLocationsFromSnapshot(conf, snapshotName); } else { return context.getConnection().getQueryServices() - .getTableRegions(tableName, startRegionBoundaryKey, stopRegionBoundaryKey); + .getTableRegions(tableName, startRegionBoundaryKey, stopRegionBoundaryKey, + context.getStatement().getQueryTimeoutInMillis()); } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java index 21b25bf92d..0631ddf6e6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java @@ -46,6 +46,7 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.SQLTimeoutException; import java.util.Properties; import static org.junit.Assert.assertEquals; @@ -210,7 +211,12 @@ public class MapReduceIT extends ParallelStatsDisabledIT { if (testVerySmallTimeOut) { // run job and it should fail due to Timeout - assertFalse("Job should fail with QueryTimeout.", job.waitForCompletion(true)); + try { + assertFalse("Job should fail with QueryTimeout.", job.waitForCompletion(true)); + } catch (RuntimeException e) { + assertTrue("Job execution failed with unexpected error.", + e.getCause() instanceof SQLTimeoutException); + } } else { //run assertTrue("Job didn't complete successfully! Check logs for reason.", job.waitForCompletion(true)); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java index 8dc558fe84..22361c7eb3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.TestUtil; import org.junit.Test; @@ -105,7 +106,10 @@ public class SkipScanAfterManualSplitIT extends ParallelStatsDisabledIT { initTable(tableName); Connection conn = getConnection(); ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); - int nRegions = services.getAllTableRegions(tableNameBytes).size(); + int queryTimeout = services.getProps() + .getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, + QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS); + int nRegions = services.getAllTableRegions(tableNameBytes, queryTimeout).size(); int nInitialRegions = nRegions; Admin admin = services.getAdmin(); try { @@ -113,7 +117,7 @@ public class SkipScanAfterManualSplitIT extends ParallelStatsDisabledIT { int nTries = 0; while (nRegions == nInitialRegions && nTries < 10) { Thread.sleep(1000); - nRegions = services.getAllTableRegions(tableNameBytes).size(); + nRegions = services.getAllTableRegions(tableNameBytes, queryTimeout).size(); nTries++; } // Split finished by this time, but cache isn't updated until @@ -124,7 +128,7 @@ public class SkipScanAfterManualSplitIT extends ParallelStatsDisabledIT { String query = "SELECT count(*) FROM " + tableName + " WHERE a IN ('tl','jt',' a',' b',' c',' d')"; ResultSet rs1 = conn.createStatement().executeQuery(query); assertTrue(rs1.next()); - nRegions = services.getAllTableRegions(tableNameBytes).size(); + nRegions = services.getAllTableRegions(tableNameBytes, queryTimeout).size(); // Region cache has been updated, as there are more regions now assertNotEquals(nRegions, nInitialRegions); /* diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java index 8157135e40..cdc1ca6900 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; @@ -253,8 +252,28 @@ public class PhoenixQueryTimeoutIT extends ParallelStatsDisabledIT { ((SQLTimeoutException)t).getErrorCode()); } finally { BaseResultIterators.setForTestingSetTimeoutToMaxToLetQueryPassHere(false); + EnvironmentEdgeManager.reset(); } + } + @Test + public void testQueryTimeoutWithMetadataLookup() throws Exception { + PreparedStatement ps = loadDataAndPreparePagedQuery(0, 0); + try { + ResultSet rs = ps.executeQuery(); + rs.next(); + fail("Query timeout is 0ms"); + } catch (SQLException e) { + Throwable t = e; + while (t != null && !(t instanceof SQLTimeoutException)) { + t = t.getCause(); + } + if (t == null) { + fail("Expected query to fail with SQLTimeoutException"); + } + assertEquals(OPERATION_TIMED_OUT.getErrorCode(), + ((SQLTimeoutException)t).getErrorCode()); + } } @Test diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java index caeb45badd..68c86186e3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java @@ -49,6 +49,7 @@ import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.PropertiesUtil; import org.junit.Test; @@ -78,7 +79,10 @@ public class RoundRobinResultIteratorIT extends ParallelStatsDisabledIT { int numRows = setupTableForSplit(tableName); Connection conn = getConnection(); ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); - int nRegions = services.getAllTableRegions(tableNameBytes).size(); + int queryTimeout = services.getProps() + .getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, + QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS); + int nRegions = services.getAllTableRegions(tableNameBytes, queryTimeout).size(); int nRegionsBeforeSplit = nRegions; Admin admin = services.getAdmin(); try { @@ -90,7 +94,7 @@ public class RoundRobinResultIteratorIT extends ParallelStatsDisabledIT { long waitTimeMillis = 2000; while (nRegions == nRegionsBeforeSplit && nTries < 10) { latch.await(waitTimeMillis, TimeUnit.MILLISECONDS); - nRegions = services.getAllTableRegions(tableNameBytes).size(); + nRegions = services.getAllTableRegions(tableNameBytes, queryTimeout).size(); nTries++; } @@ -102,7 +106,7 @@ public class RoundRobinResultIteratorIT extends ParallelStatsDisabledIT { while (rs.next()) { numRowsRead++; } - nRegions = services.getAllTableRegions(tableNameBytes).size(); + nRegions = services.getAllTableRegions(tableNameBytes, queryTimeout).size(); // Region cache has been updated, as there are more regions now assertNotEquals(nRegions, nRegionsBeforeSplit); assertEquals(numRows, numRowsRead); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java index 4f15622d1d..ab00c2ace4 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java @@ -550,7 +550,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest { /** * After PHOENIX-6767 point lookup queries don't require to get table regions using - * {@link ConnectionQueryServices#getAllTableRegions(byte[])} to prepare scans + * {@link ConnectionQueryServices#getAllTableRegions(byte[], int)} to prepare scans * so custom driver defined here inject failures or delays don't have effect. * Hence skipping the test. */ @@ -574,7 +574,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest { /** * After PHOENIX-6767 point lookup queries don't require to get table regions using - * {@link ConnectionQueryServices#getAllTableRegions(byte[])} to prepare scans + * {@link ConnectionQueryServices#getAllTableRegions(byte[], int)} to prepare scans * so custom driver {@link PhoenixMetricsTestingDriver} defined here inject failures or delays * don't have effect. Hence skipping the test. */ @@ -1089,7 +1089,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest { /** * After PHOENIX-6767 point lookup queries don't require to get table regions using - * {@link ConnectionQueryServices#getAllTableRegions(byte[])} to prepare scans + * {@link ConnectionQueryServices#getAllTableRegions(byte[], int)} to prepare scans * so custom driver defined here inject failures or delays don't have effect. * Hence skipping the test. */ @@ -1133,7 +1133,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest { /** * After PHOENIX-6767 point lookup queries don't require to get table regions using - * {@link ConnectionQueryServices#getAllTableRegions(byte[])} to prepare scans + * {@link ConnectionQueryServices#getAllTableRegions(byte[], int)} to prepare scans * so custom driver defined here inject failures or delays don't have effect. * Hence skipping the test. */ @@ -1549,8 +1549,8 @@ public class PhoenixTableLevelMetricsIT extends BaseTest { super(services, connectionInfo, info); } - // Make plan.iterator() fail (ultimately calls CQSI.getAllTableRegions()) - @Override public List<HRegionLocation> getAllTableRegions(byte[] tableName) + @Override + public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException { if (failExecuteQueryAndClientSideDeletes) { throw new SQLExceptionInfo.Builder(GET_TABLE_REGIONS_FAIL).build().buildException(); @@ -1563,12 +1563,27 @@ public class PhoenixTableLevelMetricsIT extends BaseTest { return super.getAllTableRegions(tableName); } + // Make plan.iterator() fail (ultimately calls CQSI.getAllTableRegions()) + @Override public List<HRegionLocation> getAllTableRegions(byte[] tableName, + int queryTimeout) + throws SQLException { + if (failExecuteQueryAndClientSideDeletes) { + throw new SQLExceptionInfo.Builder(GET_TABLE_REGIONS_FAIL).build().buildException(); + } + try { + Thread.sleep(injectDelay); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return super.getAllTableRegions(tableName, queryTimeout); + } + @Override public List<HRegionLocation> getTableRegions(byte[] tableName, byte[] startRowKey, - byte[] endRowKey) throws SQLException { + byte[] endRowKey) throws SQLException { if (failExecuteQueryAndClientSideDeletes) { throw new SQLExceptionInfo.Builder(GET_TABLE_REGIONS_FAIL) - .build().buildException(); + .build().buildException(); } try { Thread.sleep(injectDelay); @@ -1577,6 +1592,21 @@ public class PhoenixTableLevelMetricsIT extends BaseTest { } return super.getTableRegions(tableName, startRowKey, endRowKey); } + + @Override + public List<HRegionLocation> getTableRegions(byte[] tableName, byte[] startRowKey, + byte[] endRowKey, int queryTimeout) throws SQLException { + if (failExecuteQueryAndClientSideDeletes) { + throw new SQLExceptionInfo.Builder(GET_TABLE_REGIONS_FAIL) + .build().buildException(); + } + try { + Thread.sleep(injectDelay); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return super.getTableRegions(tableName, startRowKey, endRowKey, queryTimeout); + } } /** diff --git a/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/BaseStatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/BaseStatsCollectorIT.java index ca07528979..7e64ef69d5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/BaseStatsCollectorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/BaseStatsCollectorIT.java @@ -60,7 +60,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; -import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.BaseTest; @@ -76,7 +75,6 @@ import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; @@ -84,7 +82,6 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.slf4j.Logger; @@ -581,6 +578,9 @@ public abstract class BaseStatsCollectorIT extends BaseTest { + "(k VARCHAR PRIMARY KEY, a.v INTEGER, b.v INTEGER, c.v INTEGER NULL, d.v INTEGER NULL) " + tableDDLOptions ); stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?, ?, ?, ?)"); + int queryTimeout = conn.unwrap(PhoenixConnection.class).getQueryServices().getProps() + .getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, + QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS); byte[] val = new byte[250]; for (int i = 0; i < nRows; i++) { stmt.setString(1, Character.toString((char)('a' + i)) + Bytes.toString(val)); @@ -623,7 +623,8 @@ public abstract class BaseStatsCollectorIT extends BaseTest { assertEquals(physicalTableName, planAttributes.getTableName()); ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); - List<HRegionLocation> regions = services.getAllTableRegions(Bytes.toBytes(physicalTableName)); + List<HRegionLocation> regions = + services.getAllTableRegions(Bytes.toBytes(physicalTableName), queryTimeout); assertEquals(1, regions.size()); collectStatistics(conn, fullTableName, Long.toString(1000)); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerCacheClientTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerCacheClientTest.java index 496fb1cc42..875819540b 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerCacheClientTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerCacheClientTest.java @@ -18,11 +18,14 @@ package org.apache.phoenix.cache; import static org.junit.Assert.assertEquals; import java.sql.SQLException; +import java.util.HashMap; + import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.util.ReadOnlyProps; import org.junit.Test; import org.mockito.Mockito; @@ -32,11 +35,13 @@ public class ServerCacheClientTest { PhoenixConnection connection = Mockito.mock(PhoenixConnection.class); ConnectionQueryServices services = Mockito.mock(ConnectionQueryServices.class); Mockito.when(services.getExecutor()).thenReturn(null); + Mockito.when(services.getProps()).thenReturn(new ReadOnlyProps(new HashMap<>())); Mockito.when(connection.getQueryServices()).thenReturn(services); byte[] tableName = Bytes.toBytes("TableName"); PTableImpl pTable = Mockito.mock(PTableImpl.class); Mockito.when(pTable.getPhysicalName()).thenReturn(PNameFactory.newName("TableName")); - Mockito.when(services.getAllTableRegions(tableName)).thenThrow(new SQLException("Test Exception")); + Mockito.when(services.getAllTableRegions(tableName, 600000)).thenThrow(new SQLException( + "Test Exception")); ServerCacheClient client = new ServerCacheClient(connection); try { client.addServerCache(null, null, null, null, pTable, false); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java index 17391345d4..9d3f962521 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java @@ -78,7 +78,6 @@ import org.apache.phoenix.expression.aggregator.Aggregator; import org.apache.phoenix.expression.aggregator.CountAggregator; import org.apache.phoenix.expression.aggregator.ServerAggregators; import org.apache.phoenix.expression.function.TimeUnit; -import org.apache.phoenix.filter.EmptyColumnOnlyFilter; import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixPreparedStatement; @@ -2563,7 +2562,8 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest { ScanRanges ranges=plan.getContext().getScanRanges(); List<HRegionLocation> regionLocations= - conn.getQueryServices().getAllTableRegions(Bytes.toBytes("SALT_TEST2900")); + conn.getQueryServices().getAllTableRegions(Bytes.toBytes("SALT_TEST2900"), + 60000); for (HRegionLocation regionLocation : regionLocations) { assertTrue(ranges.intersectRegion(regionLocation.getRegion().getStartKey(), regionLocation.getRegion().getEndKey(), false)); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java index 9052e79497..d0d19c23eb 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java @@ -61,6 +61,7 @@ import org.apache.phoenix.monitoring.GlobalClientMetrics; import org.apache.phoenix.util.ReadOnlyProps; import org.junit.Before; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Test; import org.mockito.Mock; import org.mockito.Mockito; @@ -252,19 +253,11 @@ public class ConnectionQueryServicesImplTest { private void testGetNextRegionStartKey(ConnectionQueryServicesImpl mockCqsi, HRegionLocation mockRegionLocation, byte[] key, boolean isCorrupted, HRegionLocation mockPrevRegionLocation) { - try { - mockCqsi.getNextRegionStartKey(mockRegionLocation, key, mockPrevRegionLocation); - if (isCorrupted) { - fail(); - } - } catch (IOException e) { - if (!isCorrupted) { - fail(); - } - } + mockCqsi.getNextRegionStartKey(mockRegionLocation, key, mockPrevRegionLocation); assertEquals(isCorrupted ? 1 : 0, - GlobalClientMetrics.GLOBAL_HBASE_COUNTER_METADATA_INCONSISTENCY.getMetric().getValue()); + GlobalClientMetrics.GLOBAL_HBASE_COUNTER_METADATA_INCONSISTENCY.getMetric() + .getValue()); } @Test diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java index 4454ea5761..8433906f37 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java @@ -112,7 +112,10 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest { PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), TABLE_NAME)); TableRef tableRef = new TableRef(table); - List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes()); + List<HRegionLocation> regions = + pconn.getQueryServices() + .getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes(), + 60000); List<KeyRange> ranges = getSplits(tableRef, scan, regions, scanRanges); assertEquals("Unexpected number of splits: " + ranges.size(), expectedSplits.size(), ranges.size()); for (int i=0; i<expectedSplits.size(); i++) {