PHOENIX-3535 Fix race condition in ConnectionQueryServicesImpl#acquireUpgradeMutex
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/18cb85e6 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/18cb85e6 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/18cb85e6 Branch: refs/heads/encodecolumns2 Commit: 18cb85e69580ede3bdb391c2134c6f12effefabe Parents: 0ea97ce Author: Samarth <samarth.j...@salesforce.com> Authored: Wed Dec 21 15:10:09 2016 -0800 Committer: Samarth <samarth.j...@salesforce.com> Committed: Wed Dec 21 15:10:09 2016 -0800 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/UpgradeIT.java | 43 ++++++++---------- .../query/ConnectionQueryServicesImpl.java | 46 +++++++++++--------- 2 files changed, 44 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/18cb85e6/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java index 733dab0..49fdba6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java @@ -19,6 +19,9 @@ package org.apache.phoenix.end2end; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.phoenix.query.ConnectionQueryServicesImpl.UPGRADE_MUTEX; +import static org.apache.phoenix.query.ConnectionQueryServicesImpl.UPGRADE_MUTEX_LOCKED; +import static org.apache.phoenix.query.ConnectionQueryServicesImpl.UPGRADE_MUTEX_UNLOCKED; import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT; import static org.apache.phoenix.query.QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT; import static org.apache.phoenix.util.UpgradeUtil.SELECT_BASE_COLUMN_COUNT_FROM_HEADER_ROW; @@ -694,17 +697,30 @@ public class UpgradeIT extends ParallelStatsDisabledIT { } } + private void putUnlockKVInSysMutex(byte[] row) throws Exception { + try (Connection conn = getConnection(false, null)) { + ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); + try (HTableInterface sysMutexTable = services.getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) { + byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES; + byte[] qualifier = UPGRADE_MUTEX; + Put put = new Put(row); + put.add(family, qualifier, UPGRADE_MUTEX_UNLOCKED); + sysMutexTable.put(put); + sysMutexTable.flushCommits(); + } + } + } + @Test public void testAcquiringAndReleasingUpgradeMutex() throws Exception { ConnectionQueryServices services = null; byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, generateUniqueName()); - boolean dropSysMutexTable = false; try (Connection conn = getConnection(false, null)) { services = conn.unwrap(PhoenixConnection.class).getQueryServices(); + putUnlockKVInSysMutex(mutexRowKey); assertTrue(((ConnectionQueryServicesImpl)services) .acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey)); - dropSysMutexTable = true; try { ((ConnectionQueryServicesImpl)services) .acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey); @@ -714,16 +730,6 @@ public class UpgradeIT extends ParallelStatsDisabledIT { } assertTrue(((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey)); assertFalse(((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey)); - } finally { - // We need to drop the SYSTEM.MUTEX table else other tests calling acquireUpgradeMutex will unexpectedly fail because they - // won't see the UNLOCKED cell present for their key. This cell is inserted into the table the first time we create the - // SYSTEM.MUTEX table. - if (services != null && dropSysMutexTable) { - try (HBaseAdmin admin = services.getAdmin()) { - admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES); - admin.deleteTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES); - } - } } } @@ -735,9 +741,9 @@ public class UpgradeIT extends ParallelStatsDisabledIT { final AtomicInteger numExceptions = new AtomicInteger(0); ConnectionQueryServices services = null; final byte[] mutexKey = Bytes.toBytes(generateUniqueName()); - boolean dropSysMutexTable = false; try (Connection conn = getConnection(false, null)) { services = conn.unwrap(PhoenixConnection.class).getQueryServices(); + putUnlockKVInSysMutex(mutexKey); FutureTask<Void> task1 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus1, services, latch, numExceptions, mutexKey)); FutureTask<Void> task2 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus2, services, latch, numExceptions, mutexKey)); Thread t1 = new Thread(task1); @@ -751,20 +757,9 @@ public class UpgradeIT extends ParallelStatsDisabledIT { task1.get(); task2.get(); assertTrue("One of the threads should have acquired the mutex", mutexStatus1.get() || mutexStatus2.get()); - dropSysMutexTable = true; assertNotEquals("One and only one thread should have acquired the mutex ", mutexStatus1.get(), mutexStatus2.get()); assertEquals("One and only one thread should have caught UpgradeRequiredException ", 1, numExceptions.get()); - } finally { - // We need to drop the SYSTEM.MUTEX table else other tests calling acquireUpgradeMutex will unexpectedly fail because they - // won't see the UNLOCKED cell present for their key. This cell is inserted into the table the first time we create the - // SYSTEM.MUTEX table. - if (services != null && dropSysMutexTable) { - try (HBaseAdmin admin = services.getAdmin()) { - admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES); - admin.deleteTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES); - } - } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/18cb85e6/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index da43af7..9d7a3d2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -276,9 +276,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private final boolean renewLeaseEnabled; private final boolean isAutoUpgradeEnabled; private final AtomicBoolean upgradeRequired = new AtomicBoolean(false); - private static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes(); - private static final byte[] UPGRADE_MUTEX_LOCKED = "UPGRADE_MUTEX_LOCKED".getBytes(); - private static final byte[] UPGRADE_MUTEX_UNLOCKED = "UPGRADE_MUTEX_UNLOCKED".getBytes(); + public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes(); + public static final byte[] UPGRADE_MUTEX_LOCKED = "UPGRADE_MUTEX_LOCKED".getBytes(); + public static final byte[] UPGRADE_MUTEX_UNLOCKED = "UPGRADE_MUTEX_UNLOCKED".getBytes(); private static interface FeatureSupported { boolean isSupported(ConnectionQueryServices services); @@ -2352,6 +2352,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement hConnectionEstablished = true; boolean isDoNotUpgradePropSet = UpgradeUtil.isNoUpgradeSet(props); try (HBaseAdmin admin = getAdmin()) { + createSysMutexTable(admin); boolean mappedSystemCatalogExists = admin .tableExists(SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES, true)); if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, @@ -2438,6 +2439,27 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } + private void createSysMutexTable(HBaseAdmin admin) throws IOException, SQLException { + try { + HTableDescriptor tableDesc = new HTableDescriptor( + TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)); + HColumnDescriptor columnDesc = new HColumnDescriptor( + PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES); + columnDesc.setTimeToLive(TTL_FOR_MUTEX); // Let mutex expire after some time + tableDesc.addFamily(columnDesc); + admin.createTable(tableDesc); + try (HTableInterface sysMutexTable = getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) { + byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE); + Put put = new Put(mutexRowKey); + put.add(PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES, UPGRADE_MUTEX, UPGRADE_MUTEX_UNLOCKED); + sysMutexTable.put(put); + } + } catch (TableExistsException e) { + // Ignore + } + } + private void createOtherSystemTables(PhoenixConnection metaConnection) throws SQLException { try { metaConnection.createStatement().execute(QueryConstants.CREATE_SEQUENCE_METADATA); @@ -2953,24 +2975,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, byte[] rowToLock) throws IOException, SQLException { Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP); - try (HBaseAdmin admin = getAdmin()) { - try { - HTableDescriptor tableDesc = new HTableDescriptor( - TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)); - HColumnDescriptor columnDesc = new HColumnDescriptor( - PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES); - columnDesc.setTimeToLive(TTL_FOR_MUTEX); // Let mutex expire after some time - tableDesc.addFamily(columnDesc); - admin.createTable(tableDesc); - try (HTableInterface sysMutexTable = getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) { - Put put = new Put(rowToLock); - put.add(PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES, UPGRADE_MUTEX, UPGRADE_MUTEX_UNLOCKED); - sysMutexTable.put(put); - } - } catch (TableExistsException e) { - // Ignore - } - } try (HTableInterface sysMutexTable = getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) { byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES; byte[] qualifier = UPGRADE_MUTEX;