Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 0ea97ce67 -> 18cb85e69


PHOENIX-3535 Fix race condition in 
ConnectionQueryServicesImpl#acquireUpgradeMutex


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/18cb85e6
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/18cb85e6
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/18cb85e6

Branch: refs/heads/4.x-HBase-0.98
Commit: 18cb85e69580ede3bdb391c2134c6f12effefabe
Parents: 0ea97ce
Author: Samarth <samarth.j...@salesforce.com>
Authored: Wed Dec 21 15:10:09 2016 -0800
Committer: Samarth <samarth.j...@salesforce.com>
Committed: Wed Dec 21 15:10:09 2016 -0800

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/UpgradeIT.java   | 43 ++++++++----------
 .../query/ConnectionQueryServicesImpl.java      | 46 +++++++++++---------
 2 files changed, 44 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/18cb85e6/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
index 733dab0..49fdba6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
@@ -19,6 +19,9 @@ package org.apache.phoenix.end2end;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.phoenix.query.ConnectionQueryServicesImpl.UPGRADE_MUTEX;
+import static 
org.apache.phoenix.query.ConnectionQueryServicesImpl.UPGRADE_MUTEX_LOCKED;
+import static 
org.apache.phoenix.query.ConnectionQueryServicesImpl.UPGRADE_MUTEX_UNLOCKED;
 import static 
org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
 import static 
org.apache.phoenix.query.QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT;
 import static 
org.apache.phoenix.util.UpgradeUtil.SELECT_BASE_COLUMN_COUNT_FROM_HEADER_ROW;
@@ -694,17 +697,30 @@ public class UpgradeIT extends ParallelStatsDisabledIT {
         }
     }
     
+    private void putUnlockKVInSysMutex(byte[] row) throws Exception {
+        try (Connection conn = getConnection(false, null)) {
+            ConnectionQueryServices services = 
conn.unwrap(PhoenixConnection.class).getQueryServices();
+            try (HTableInterface sysMutexTable = 
services.getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) {
+                byte[] family = 
PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
+                byte[] qualifier = UPGRADE_MUTEX;
+                Put put = new Put(row);
+                put.add(family, qualifier, UPGRADE_MUTEX_UNLOCKED);
+                sysMutexTable.put(put);
+                sysMutexTable.flushCommits();
+            }
+        }
+    }
+    
     @Test
     public void testAcquiringAndReleasingUpgradeMutex() throws Exception {
         ConnectionQueryServices services = null;
         byte[] mutexRowKey = SchemaUtil.getTableKey(null, 
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
                 generateUniqueName());
-        boolean dropSysMutexTable = false;
         try (Connection conn = getConnection(false, null)) {
             services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+            putUnlockKVInSysMutex(mutexRowKey);
             assertTrue(((ConnectionQueryServicesImpl)services)
                     
.acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, 
mutexRowKey));
-            dropSysMutexTable = true;
             try {
                 ((ConnectionQueryServicesImpl)services)
                         
.acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, 
mutexRowKey);
@@ -714,16 +730,6 @@ public class UpgradeIT extends ParallelStatsDisabledIT {
             }
             
assertTrue(((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey));
             
assertFalse(((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey));
-        } finally {
-            // We need to drop the SYSTEM.MUTEX table else other tests calling 
acquireUpgradeMutex will unexpectedly fail because they
-            // won't see the UNLOCKED cell present for their key. This cell is 
inserted into the table the first time we create the 
-            // SYSTEM.MUTEX table.
-            if (services != null && dropSysMutexTable) {
-                try (HBaseAdmin admin = services.getAdmin()) {
-                    
admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES);
-                    
admin.deleteTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES);
-                }
-            }
         }
     }
     
@@ -735,9 +741,9 @@ public class UpgradeIT extends ParallelStatsDisabledIT {
         final AtomicInteger numExceptions = new AtomicInteger(0);
         ConnectionQueryServices services = null;
         final byte[] mutexKey = Bytes.toBytes(generateUniqueName());
-        boolean dropSysMutexTable = false;
         try (Connection conn = getConnection(false, null)) {
             services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+            putUnlockKVInSysMutex(mutexKey);
             FutureTask<Void> task1 = new FutureTask<>(new 
AcquireMutexRunnable(mutexStatus1, services, latch, numExceptions, mutexKey));
             FutureTask<Void> task2 = new FutureTask<>(new 
AcquireMutexRunnable(mutexStatus2, services, latch, numExceptions, mutexKey));
             Thread t1 = new Thread(task1);
@@ -751,20 +757,9 @@ public class UpgradeIT extends ParallelStatsDisabledIT {
             task1.get();
             task2.get();
             assertTrue("One of the threads should have acquired the mutex", 
mutexStatus1.get() || mutexStatus2.get());
-            dropSysMutexTable = true;
             assertNotEquals("One and only one thread should have acquired the 
mutex ", mutexStatus1.get(),
                     mutexStatus2.get());
             assertEquals("One and only one thread should have caught 
UpgradeRequiredException ", 1, numExceptions.get());
-        } finally {
-            // We need to drop the SYSTEM.MUTEX table else other tests calling 
acquireUpgradeMutex will unexpectedly fail because they
-            // won't see the UNLOCKED cell present for their key. This cell is 
inserted into the table the first time we create the 
-            // SYSTEM.MUTEX table.
-            if (services != null && dropSysMutexTable) {
-                try (HBaseAdmin admin = services.getAdmin()) {
-                    
admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES);
-                    
admin.deleteTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES);
-                }
-            }
         }
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/18cb85e6/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index da43af7..9d7a3d2 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -276,9 +276,9 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     private final boolean renewLeaseEnabled;
     private final boolean isAutoUpgradeEnabled;
     private final AtomicBoolean upgradeRequired = new AtomicBoolean(false);
-    private static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes();
-    private static final byte[] UPGRADE_MUTEX_LOCKED = 
"UPGRADE_MUTEX_LOCKED".getBytes();
-    private static final byte[] UPGRADE_MUTEX_UNLOCKED = 
"UPGRADE_MUTEX_UNLOCKED".getBytes();
+    public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes();
+    public static final byte[] UPGRADE_MUTEX_LOCKED = 
"UPGRADE_MUTEX_LOCKED".getBytes();
+    public static final byte[] UPGRADE_MUTEX_UNLOCKED = 
"UPGRADE_MUTEX_UNLOCKED".getBytes();
 
     private static interface FeatureSupported {
         boolean isSupported(ConnectionQueryServices services);
@@ -2352,6 +2352,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                             hConnectionEstablished = true;
                             boolean isDoNotUpgradePropSet = 
UpgradeUtil.isNoUpgradeSet(props);
                             try (HBaseAdmin admin = getAdmin()) {
+                                createSysMutexTable(admin);
                                 boolean mappedSystemCatalogExists = admin
                                         
.tableExists(SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES, true));
                                 if 
(SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
@@ -2438,6 +2439,27 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         }
     }
     
+    private void createSysMutexTable(HBaseAdmin admin) throws IOException, 
SQLException {
+        try {
+            HTableDescriptor tableDesc = new HTableDescriptor(
+                    
TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES));
+            HColumnDescriptor columnDesc = new HColumnDescriptor(
+                    PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES);
+            columnDesc.setTimeToLive(TTL_FOR_MUTEX); // Let mutex expire after 
some time
+            tableDesc.addFamily(columnDesc);
+            admin.createTable(tableDesc);
+            try (HTableInterface sysMutexTable = 
getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) {
+                byte[] mutexRowKey = SchemaUtil.getTableKey(null, 
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+                        PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
+                Put put = new Put(mutexRowKey);
+                
put.add(PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES, UPGRADE_MUTEX, 
UPGRADE_MUTEX_UNLOCKED);
+                sysMutexTable.put(put);
+            }
+        } catch (TableExistsException e) {
+            // Ignore
+        }
+    }
+
     private void createOtherSystemTables(PhoenixConnection metaConnection) 
throws SQLException {
         try {
             
metaConnection.createStatement().execute(QueryConstants.CREATE_SEQUENCE_METADATA);
@@ -2953,24 +2975,6 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     public boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, 
byte[] rowToLock) throws IOException,
             SQLException {
         Preconditions.checkArgument(currentServerSideTableTimestamp < 
MIN_SYSTEM_TABLE_TIMESTAMP);
-        try (HBaseAdmin admin = getAdmin()) {
-            try {
-                HTableDescriptor tableDesc = new HTableDescriptor(
-                        
TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES));
-                HColumnDescriptor columnDesc = new HColumnDescriptor(
-                        
PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES);
-                columnDesc.setTimeToLive(TTL_FOR_MUTEX); // Let mutex expire 
after some time
-                tableDesc.addFamily(columnDesc);
-                admin.createTable(tableDesc);
-                try (HTableInterface sysMutexTable = 
getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) {
-                    Put put = new Put(rowToLock);
-                    
put.add(PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES, UPGRADE_MUTEX, 
UPGRADE_MUTEX_UNLOCKED);
-                    sysMutexTable.put(put);
-                }
-            } catch (TableExistsException e) {
-                // Ignore
-            }
-        }
         try (HTableInterface sysMutexTable = 
getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) {
             byte[] family = 
PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
             byte[] qualifier = UPGRADE_MUTEX;

Reply via email to