PHOENIX-3253 Make changes to tests to support method level parallelization
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d4f72018 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d4f72018 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d4f72018 Branch: refs/heads/calcite Commit: d4f7201854a8d27ba7e6dd1037a10a4a34842b92 Parents: 8174fc4 Author: James Taylor <jamestay...@apache.org> Authored: Wed Sep 21 12:45:25 2016 -0700 Committer: James Taylor <jamestay...@apache.org> Committed: Wed Sep 21 12:45:25 2016 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/StatsCollectorIT.java | 33 ++++------ .../apache/phoenix/end2end/StoreNullsIT.java | 68 ++------------------ .../query/ConnectionQueryServicesImpl.java | 1 + .../java/org/apache/phoenix/util/TestUtil.java | 55 ++++++++++++++++ 4 files changed, 74 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d4f72018/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java index dd7741a..9a1ea26 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java @@ -40,8 +40,8 @@ import java.util.Random; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.ConnectionQueryServices; @@ -77,8 +77,8 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT { props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20)); props.put(QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB, Boolean.TRUE.toString()); props.put(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.TRUE.toString()); - props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1024)); props.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.toString(true)); + props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(Long.MAX_VALUE)); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } @@ -347,16 +347,8 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT { return stmt; } - private void compactTable(Connection conn, String tableName) throws IOException, InterruptedException, SQLException { - ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); - HBaseAdmin admin = services.getAdmin(); - try { - admin.flush(tableName); - admin.majorCompact(tableName); - Thread.sleep(10000); // FIXME: how do we know when compaction is done? - } finally { - admin.close(); - } + private void compactTable(Connection conn, String tableName) throws Exception { + TestUtil.doMajorCompaction(conn, tableName); } @Test @@ -374,9 +366,6 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT { Connection conn; PreparedStatement stmt; Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - if (minStatsUpdateFreq != null) { - props.setProperty(QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB, minStatsUpdateFreq.toString()); - } conn = DriverManager.getConnection(getUrl(), props); conn.createStatement().execute("CREATE TABLE " + tableName + "(k CHAR(1) PRIMARY KEY, v INTEGER, w INTEGER) " + HColumnDescriptor.KEEP_DELETED_CELLS + "=" + Boolean.FALSE); @@ -391,11 +380,11 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT { compactTable(conn, tableName); if (minStatsUpdateFreq == null) { - conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache(); - } - // Confirm that when we have a non zero MIN_STATS_UPDATE_FREQ_MS_ATTRIB, after we run - // UPDATATE STATISTICS, the new statistics are faulted in as expected. - if (minStatsUpdateFreq != null) { + ImmutableBytesPtr ptr = new ImmutableBytesPtr(Bytes.toBytes(tableName)); + conn.unwrap(PhoenixConnection.class).getQueryServices().invalidateStats(ptr); + } else { + // Confirm that when we have a non zero MIN_STATS_UPDATE_FREQ_MS_ATTRIB, after we run + // UPDATATE STATISTICS, the new statistics are faulted in as expected. List<KeyRange>keyRanges = getAllSplits(conn, tableName); assertNotEquals(nRows+1, keyRanges.size()); // If we've set MIN_STATS_UPDATE_FREQ_MS_ATTRIB, an UPDATE STATISTICS will invalidate the cache @@ -412,7 +401,8 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT { compactTable(conn, tableName); if (minStatsUpdateFreq == null) { - conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache(); + ImmutableBytesPtr ptr = new ImmutableBytesPtr(Bytes.toBytes(tableName)); + conn.unwrap(PhoenixConnection.class).getQueryServices().invalidateStats(ptr); } keyRanges = getAllSplits(conn, tableName); @@ -429,7 +419,6 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT { + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME + " WHERE PHYSICAL_NAME='" + tableName + "'"); rs.next(); assertEquals(nRows - nDeletedRows, rs.getLong(1)); - } @Test http://git-wip-us.apache.org/repos/asf/phoenix/blob/d4f72018/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java index 904743a..c14cf39 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java @@ -28,16 +28,11 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.List; import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -46,12 +41,11 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TestUtil; import org.junit.After; import org.junit.Before; import org.junit.Test; -import com.google.common.collect.Lists; - /** * Tests to demonstrate and verify the STORE_NULLS option on a table, * which allows explicitly storing null values (as opposed to using HBase Deletes) for nulls. This @@ -132,7 +126,7 @@ public class StoreNullsIT extends ParallelStatsDisabledIT { } @Test - public void testQueryingHistory() throws SQLException, InterruptedException, IOException { + public void testQueryingHistory() throws Exception { stmt.executeUpdate("UPSERT INTO " + WITH_NULLS + " VALUES (1, 'v1')"); stmt.executeUpdate("UPSERT INTO " + WITHOUT_NULLS + " VALUES (1, 'v1')"); @@ -144,8 +138,8 @@ public class StoreNullsIT extends ParallelStatsDisabledIT { stmt.executeUpdate("UPSERT INTO " + WITHOUT_NULLS + " VALUES (1, null)"); Thread.sleep(10L); - doMajorCompaction(WITH_NULLS); - doMajorCompaction(WITHOUT_NULLS); + TestUtil.doMajorCompaction(conn, WITH_NULLS); + TestUtil.doMajorCompaction(conn, WITHOUT_NULLS); Properties historicalProps = new Properties(); historicalProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, @@ -171,7 +165,7 @@ public class StoreNullsIT extends ParallelStatsDisabledIT { // Row deletes should work in the same way regardless of what STORE_NULLS is set to @Test - public void testDeletes() throws SQLException, InterruptedException, IOException { + public void testDeletes() throws Exception { stmt.executeUpdate("UPSERT INTO " + WITH_NULLS + " VALUES (1, 'v1')"); stmt.executeUpdate("UPSERT INTO " + WITHOUT_NULLS + " VALUES (1, 'v1')"); @@ -183,8 +177,8 @@ public class StoreNullsIT extends ParallelStatsDisabledIT { stmt.executeUpdate("DELETE FROM " + WITHOUT_NULLS + " WHERE id = 1"); Thread.sleep(10L); - doMajorCompaction(WITH_NULLS); - doMajorCompaction(WITHOUT_NULLS); + TestUtil.doMajorCompaction(conn, WITH_NULLS); + TestUtil.doMajorCompaction(conn, WITHOUT_NULLS); Properties historicalProps = new Properties(); historicalProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, @@ -221,53 +215,5 @@ public class StoreNullsIT extends ParallelStatsDisabledIT { assertTrue(rs.getBoolean(1)); } - /** - * Runs a major compaction, and then waits until the compaction is complete before returning. - * - * @param tableName name of the table to be compacted - */ - private void doMajorCompaction(String tableName) throws IOException, InterruptedException { - - tableName = SchemaUtil.normalizeIdentifier(tableName); - - // We simply write a marker row, request a major compaction, and then wait until the marker - // row is gone - HTable htable = new HTable(getUtility().getConfiguration(), tableName); - byte[] markerRowKey = Bytes.toBytes("TO_DELETE"); - - - Put put = new Put(markerRowKey); - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, HConstants.EMPTY_BYTE_ARRAY, - HConstants.EMPTY_BYTE_ARRAY); - htable.put(put); - htable.delete(new Delete(markerRowKey)); - htable.close(); - - HBaseAdmin hbaseAdmin = new HBaseAdmin(getUtility().getConfiguration()); - hbaseAdmin.flush(tableName); - hbaseAdmin.majorCompact(tableName); - hbaseAdmin.close(); - - boolean compactionDone = false; - while (!compactionDone) { - Thread.sleep(2000L); - htable = new HTable(getUtility().getConfiguration(), tableName); - Scan scan = new Scan(); - scan.setStartRow(markerRowKey); - scan.setStopRow(Bytes.add(markerRowKey, new byte[] { 0 })); - scan.setRaw(true); - - ResultScanner scanner = htable.getScanner(scan); - List<Result> results = Lists.newArrayList(scanner); - LOG.info("Results: " + results); - compactionDone = results.isEmpty(); - scanner.close(); - - LOG.info("Compaction done: " + compactionDone); - } - - htable.close(); - } - } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d4f72018/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index dfe7ee8..dc07220 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -3073,6 +3073,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement HTableInterface htable = this.getTable(SchemaUtil .getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName()); try { + tableStatsCache.invalidateAll(); final Map<byte[], Long> results = htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService, Long>() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d4f72018/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index 5500e7a..50180d1 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -49,9 +49,16 @@ import java.util.Collections; import java.util.List; import java.util.Properties; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; @@ -90,6 +97,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixPreparedStatement; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.LikeParseNode.LikeType; +import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; @@ -110,6 +118,8 @@ import com.google.common.collect.Lists; public class TestUtil { + private static final Log LOG = LogFactory.getLog(TestUtil.class); + public static final String DEFAULT_SCHEMA_NAME = ""; public static final String DEFAULT_DATA_TABLE_NAME = "T"; public static final String DEFAULT_INDEX_TABLE_NAME = "I"; @@ -713,5 +723,50 @@ public class TestUtil { + (options!=null? options : ""); conn.createStatement().execute(ddl); } + + /** + * Runs a major compaction, and then waits until the compaction is complete before returning. + * + * @param tableName name of the table to be compacted + */ + public static void doMajorCompaction(Connection conn, String tableName) throws Exception { + + tableName = SchemaUtil.normalizeIdentifier(tableName); + + // We simply write a marker row, request a major compaction, and then wait until the marker + // row is gone + ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); + try (HTableInterface htable = services.getTable(Bytes.toBytes(tableName))) { + byte[] markerRowKey = Bytes.toBytes("TO_DELETE"); + + Put put = new Put(markerRowKey); + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, HConstants.EMPTY_BYTE_ARRAY, + HConstants.EMPTY_BYTE_ARRAY); + htable.put(put); + htable.delete(new Delete(markerRowKey)); + + HBaseAdmin hbaseAdmin = services.getAdmin(); + hbaseAdmin.flush(tableName); + hbaseAdmin.majorCompact(tableName); + hbaseAdmin.close(); + + boolean compactionDone = false; + while (!compactionDone) { + Thread.sleep(2000L); + Scan scan = new Scan(); + scan.setStartRow(markerRowKey); + scan.setStopRow(Bytes.add(markerRowKey, new byte[] { 0 })); + scan.setRaw(true); + + ResultScanner scanner = htable.getScanner(scan); + List<Result> results = Lists.newArrayList(scanner); + LOG.info("Results: " + results); + compactionDone = results.isEmpty(); + scanner.close(); + + LOG.info("Compaction done: " + compactionDone); + } + } + } }