PHOENIX-3253 Make changes to tests to support method level parallelization

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d4f72018
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d4f72018
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d4f72018

Branch: refs/heads/calcite
Commit: d4f7201854a8d27ba7e6dd1037a10a4a34842b92
Parents: 8174fc4
Author: James Taylor <jamestay...@apache.org>
Authored: Wed Sep 21 12:45:25 2016 -0700
Committer: James Taylor <jamestay...@apache.org>
Committed: Wed Sep 21 12:45:25 2016 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/StatsCollectorIT.java       | 33 ++++------
 .../apache/phoenix/end2end/StoreNullsIT.java    | 68 ++------------------
 .../query/ConnectionQueryServicesImpl.java      |  1 +
 .../java/org/apache/phoenix/util/TestUtil.java  | 55 ++++++++++++++++
 4 files changed, 74 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
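For context on what "method level parallelization" demands of these ITs: each @Test method has to own its tables and avoid shared global state (fixed table names, whole-cache clears), because JUnit may run the methods of one class concurrently. A minimal sketch of the resulting test shape, assuming the generateUniqueName() and getUrl() helpers provided by Phoenix's test base classes:

    import java.sql.Connection;
    import java.sql.DriverManager;

    import org.junit.Test;

    public class ExampleParallelSafeIT extends ParallelStatsDisabledIT {

        @Test
        public void testOwnsItsOwnTable() throws Exception {
            // A unique table per method means concurrently running methods
            // never race on DDL, data, or cached stats for a shared table.
            String tableName = generateUniqueName();
            try (Connection conn = DriverManager.getConnection(getUrl())) {
                conn.createStatement().execute(
                        "CREATE TABLE " + tableName + " (k VARCHAR PRIMARY KEY, v VARCHAR)");
                conn.createStatement().execute(
                        "UPSERT INTO " + tableName + " VALUES ('a', 'b')");
                conn.commit();
            }
        }
    }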


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d4f72018/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
index dd7741a..9a1ea26 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
@@ -40,8 +40,8 @@ import java.util.Random;
 
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.ConnectionQueryServices;
@@ -77,8 +77,8 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT {
         props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
         props.put(QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB, Boolean.TRUE.toString());
         props.put(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.TRUE.toString());
-        props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1024));
         props.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.toString(true));
+        props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
     
@@ -347,16 +347,8 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT {
         return stmt;
     }
 
-    private void compactTable(Connection conn, String tableName) throws IOException, InterruptedException, SQLException {
-        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
-        HBaseAdmin admin = services.getAdmin();
-        try {
-            admin.flush(tableName);
-            admin.majorCompact(tableName);
-            Thread.sleep(10000); // FIXME: how do we know when compaction is done?
-        } finally {
-            admin.close();
-        }
+    private void compactTable(Connection conn, String tableName) throws Exception {
+        TestUtil.doMajorCompaction(conn, tableName);
     }
     
     @Test
@@ -374,9 +366,6 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT {
         Connection conn;
         PreparedStatement stmt;
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        if (minStatsUpdateFreq != null) {
-            props.setProperty(QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB, minStatsUpdateFreq.toString());
-        }
         conn = DriverManager.getConnection(getUrl(), props);
         conn.createStatement().execute("CREATE TABLE " + tableName + "(k 
CHAR(1) PRIMARY KEY, v INTEGER, w INTEGER) "
                 + HColumnDescriptor.KEEP_DELETED_CELLS + "=" + Boolean.FALSE);
@@ -391,11 +380,11 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT {
         
         compactTable(conn, tableName);
         if (minStatsUpdateFreq == null) {
-            conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
-        }
-        // Confirm that when we have a non zero MIN_STATS_UPDATE_FREQ_MS_ATTRIB, after we run
-        // UPDATATE STATISTICS, the new statistics are faulted in as expected.
-        if (minStatsUpdateFreq != null) {
+            ImmutableBytesPtr ptr = new ImmutableBytesPtr(Bytes.toBytes(tableName));
+            conn.unwrap(PhoenixConnection.class).getQueryServices().invalidateStats(ptr);
+        } else {
+            // Confirm that when we have a non zero MIN_STATS_UPDATE_FREQ_MS_ATTRIB, after we run
+            // UPDATATE STATISTICS, the new statistics are faulted in as expected.
             List<KeyRange>keyRanges = getAllSplits(conn, tableName);
             assertNotEquals(nRows+1, keyRanges.size());
             // If we've set MIN_STATS_UPDATE_FREQ_MS_ATTRIB, an UPDATE STATISTICS will invalidate the cache
@@ -412,7 +401,8 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT {
         
         compactTable(conn, tableName);
         if (minStatsUpdateFreq == null) {
-            conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+            ImmutableBytesPtr ptr = new ImmutableBytesPtr(Bytes.toBytes(tableName));
+            conn.unwrap(PhoenixConnection.class).getQueryServices().invalidateStats(ptr);
         }
         
         keyRanges = getAllSplits(conn, tableName);
@@ -429,7 +419,6 @@ public class StatsCollectorIT extends ParallelStatsEnabledIT {
                 + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME + " WHERE PHYSICAL_NAME='" + tableName + "'");
         rs.next();
         assertEquals(nRows - nDeletedRows, rs.getLong(1));
-        
     }
 
     @Test
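The net effect of the StatsCollectorIT hunks above: rather than calling clearCache(), which wipes cached statistics for every table and so interferes with other test methods running in parallel, the test invalidates guideposts for its own table only. A small sketch of that pattern pulled out into a hypothetical helper (the helper name is illustrative; conn and physicalTableName are assumed to belong to the calling test):

    import java.sql.Connection;

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
    import org.apache.phoenix.jdbc.PhoenixConnection;

    public final class StatsTestHelper {
        // Drop cached guideposts for a single physical table instead of
        // clearing the whole client-side cache.
        public static void invalidateStats(Connection conn, String physicalTableName) throws Exception {
            ImmutableBytesPtr ptr = new ImmutableBytesPtr(Bytes.toBytes(physicalTableName));
            conn.unwrap(PhoenixConnection.class).getQueryServices().invalidateStats(ptr);
        }
    }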

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d4f72018/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
index 904743a..c14cf39 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java
@@ -28,16 +28,11 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.List;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -46,12 +41,11 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
-
 /**
  * Tests to demonstrate and verify the STORE_NULLS option on a table,
  * which allows explicitly storing null values (as opposed to using HBase Deletes) for nulls. This
@@ -132,7 +126,7 @@ public class StoreNullsIT extends ParallelStatsDisabledIT {
     }
 
     @Test
-    public void testQueryingHistory() throws SQLException, InterruptedException, IOException {
+    public void testQueryingHistory() throws Exception {
         stmt.executeUpdate("UPSERT INTO " + WITH_NULLS + " VALUES (1, 'v1')");
         stmt.executeUpdate("UPSERT INTO " + WITHOUT_NULLS + " VALUES (1, 
'v1')");
 
@@ -144,8 +138,8 @@ public class StoreNullsIT extends ParallelStatsDisabledIT {
         stmt.executeUpdate("UPSERT INTO " + WITHOUT_NULLS + " VALUES (1, 
null)");
         Thread.sleep(10L);
 
-        doMajorCompaction(WITH_NULLS);
-        doMajorCompaction(WITHOUT_NULLS);
+        TestUtil.doMajorCompaction(conn, WITH_NULLS);
+        TestUtil.doMajorCompaction(conn, WITHOUT_NULLS);
 
         Properties historicalProps = new Properties();
         historicalProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
@@ -171,7 +165,7 @@ public class StoreNullsIT extends ParallelStatsDisabledIT {
 
     // Row deletes should work in the same way regardless of what STORE_NULLS is set to
     @Test
-    public void testDeletes() throws SQLException, InterruptedException, IOException {
+    public void testDeletes() throws Exception {
         stmt.executeUpdate("UPSERT INTO " + WITH_NULLS + " VALUES (1, 'v1')");
         stmt.executeUpdate("UPSERT INTO " + WITHOUT_NULLS + " VALUES (1, 
'v1')");
 
@@ -183,8 +177,8 @@ public class StoreNullsIT extends ParallelStatsDisabledIT {
         stmt.executeUpdate("DELETE FROM " + WITHOUT_NULLS + " WHERE id = 1");
         Thread.sleep(10L);
 
-        doMajorCompaction(WITH_NULLS);
-        doMajorCompaction(WITHOUT_NULLS);
+        TestUtil.doMajorCompaction(conn, WITH_NULLS);
+        TestUtil.doMajorCompaction(conn, WITHOUT_NULLS);
 
         Properties historicalProps = new Properties();
         historicalProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
@@ -221,53 +215,5 @@ public class StoreNullsIT extends ParallelStatsDisabledIT {
         assertTrue(rs.getBoolean(1));
     }
 
-    /**
-     * Runs a major compaction, and then waits until the compaction is complete before returning.
-     *
-     * @param tableName name of the table to be compacted
-     */
-    private void doMajorCompaction(String tableName) throws IOException, InterruptedException {
-
-        tableName = SchemaUtil.normalizeIdentifier(tableName);
-
-        // We simply write a marker row, request a major compaction, and then wait until the marker
-        // row is gone
-        HTable htable = new HTable(getUtility().getConfiguration(), tableName);
-        byte[] markerRowKey = Bytes.toBytes("TO_DELETE");
-
-
-        Put put = new Put(markerRowKey);
-        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, HConstants.EMPTY_BYTE_ARRAY,
-                HConstants.EMPTY_BYTE_ARRAY);
-        htable.put(put);
-        htable.delete(new Delete(markerRowKey));
-        htable.close();
-
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(getUtility().getConfiguration());
-        hbaseAdmin.flush(tableName);
-        hbaseAdmin.majorCompact(tableName);
-        hbaseAdmin.close();
-
-        boolean compactionDone = false;
-        while (!compactionDone) {
-            Thread.sleep(2000L);
-            htable = new HTable(getUtility().getConfiguration(), tableName);
-            Scan scan = new Scan();
-            scan.setStartRow(markerRowKey);
-            scan.setStopRow(Bytes.add(markerRowKey, new byte[] { 0 }));
-            scan.setRaw(true);
-
-            ResultScanner scanner = htable.getScanner(scan);
-            List<Result> results = Lists.newArrayList(scanner);
-            LOG.info("Results: " + results);
-            compactionDone = results.isEmpty();
-            scanner.close();
-
-            LOG.info("Compaction done: " + compactionDone);
-        }
-
-        htable.close();
-    }
-
 
 }
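Both StoreNullsIT tests verify post-compaction behavior by re-reading the table at an earlier timestamp through a connection opened with the CurrentSCN property. A minimal sketch of that historical read, assuming a timestamp ts captured before the overwrite or delete, and a JDBC URL from the test base class (the class and method names here are illustrative):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.util.Properties;

    import org.apache.phoenix.util.PhoenixRuntime;

    public final class HistoricalReadExample {
        // Read a row as of an earlier timestamp ts (captured before the
        // overwrite or delete), through a CurrentSCN-pinned connection.
        public static String readNameAsOf(String url, String tableName, long ts) throws Exception {
            Properties historicalProps = new Properties();
            historicalProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
            try (Connection historical = DriverManager.getConnection(url, historicalProps);
                    ResultSet rs = historical.createStatement()
                            .executeQuery("SELECT name FROM " + tableName + " WHERE id = 1")) {
                // With STORE_NULLS=true the pre-null value is still visible after a
                // major compaction; without it, the cell may already be gone.
                return rs.next() ? rs.getString(1) : null;
            }
        }
    }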

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d4f72018/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index dfe7ee8..dc07220 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -3073,6 +3073,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             HTableInterface htable = this.getTable(SchemaUtil
                     .getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName());
             try {
+                tableStatsCache.invalidateAll();
                 final Map<byte[], Long> results =
                         htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW,
                                 HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService, Long>() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d4f72018/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 5500e7a..50180d1 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -49,9 +49,16 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -90,6 +97,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.LikeParseNode.LikeType;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
@@ -110,6 +118,8 @@ import com.google.common.collect.Lists;
 
 
 public class TestUtil {
+    private static final Log LOG = LogFactory.getLog(TestUtil.class);
+    
     public static final String DEFAULT_SCHEMA_NAME = "";
     public static final String DEFAULT_DATA_TABLE_NAME = "T";
     public static final String DEFAULT_INDEX_TABLE_NAME = "I";
@@ -713,5 +723,50 @@ public class TestUtil {
                 + (options!=null? options : "");
             conn.createStatement().execute(ddl);
     }
+
+    /**
+     * Runs a major compaction, and then waits until the compaction is complete before returning.
+     *
+     * @param tableName name of the table to be compacted
+     */
+    public static void doMajorCompaction(Connection conn, String tableName) throws Exception {
+    
+        tableName = SchemaUtil.normalizeIdentifier(tableName);
+    
+        // We simply write a marker row, request a major compaction, and then wait until the marker
+        // row is gone
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        try (HTableInterface htable = services.getTable(Bytes.toBytes(tableName))) {
+            byte[] markerRowKey = Bytes.toBytes("TO_DELETE");
+        
+            Put put = new Put(markerRowKey);
+            put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, HConstants.EMPTY_BYTE_ARRAY,
+                    HConstants.EMPTY_BYTE_ARRAY);
+            htable.put(put);
+            htable.delete(new Delete(markerRowKey));
+        
+            HBaseAdmin hbaseAdmin = services.getAdmin();
+            hbaseAdmin.flush(tableName);
+            hbaseAdmin.majorCompact(tableName);
+            hbaseAdmin.close();
+        
+            boolean compactionDone = false;
+            while (!compactionDone) {
+                Thread.sleep(2000L);
+                Scan scan = new Scan();
+                scan.setStartRow(markerRowKey);
+                scan.setStopRow(Bytes.add(markerRowKey, new byte[] { 0 }));
+                scan.setRaw(true);
+        
+                ResultScanner scanner = htable.getScanner(scan);
+                List<Result> results = Lists.newArrayList(scanner);
+                LOG.info("Results: " + results);
+                compactionDone = results.isEmpty();
+                scanner.close();
+        
+                LOG.info("Compaction done: " + compactionDone);
+            }
+        }
+    }
 }
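With the compaction helper now in TestUtil, any integration test can force a flush plus major compaction and block until it has really happened, using the same connection it runs its assertions on. A usage sketch under assumed placeholders (the class name, url, and tableName are illustrative, chosen by the calling test):

    import java.sql.Connection;
    import java.sql.DriverManager;

    import org.apache.phoenix.util.TestUtil;

    public final class CompactionUsageExample {
        public static void upsertDeleteAndCompact(String url, String tableName) throws Exception {
            try (Connection conn = DriverManager.getConnection(url)) {
                conn.createStatement().execute(
                        "CREATE TABLE " + tableName + " (id INTEGER PRIMARY KEY, name VARCHAR)");
                conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (1, 'v1')");
                conn.commit();
                conn.createStatement().execute("DELETE FROM " + tableName + " WHERE id = 1");
                conn.commit();
                // Flushes, requests a major compaction, and polls a raw scan for the
                // marker row until the compaction has physically removed it.
                TestUtil.doMajorCompaction(conn, tableName);
            }
        }
    }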
 
