PHOENIX-3375 Upgrade from v4.8.1 to 4.9.0 fails
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/030fb768 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/030fb768 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/030fb768 Branch: refs/heads/encodecolumns2 Commit: 030fb7684e5eebc11d95973abf8e22606b9baa31 Parents: 16e4a18 Author: Samarth <samarth.j...@salesforce.com> Authored: Fri Oct 28 12:55:05 2016 -0700 Committer: Samarth <samarth.j...@salesforce.com> Committed: Fri Oct 28 12:55:05 2016 -0700 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/UpgradeIT.java | 62 ++++++++--- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 5 + .../query/ConnectionQueryServicesImpl.java | 104 ++++++++++++++----- .../apache/phoenix/query/QueryConstants.java | 1 - 4 files changed, 132 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/030fb768/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java index d37419b..d377bd2 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java @@ -37,7 +37,9 @@ import java.sql.SQLException; import java.util.Arrays; import java.util.Collections; import java.util.Properties; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.FutureTask; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -70,6 +72,8 @@ import org.junit.Test; public class UpgradeIT extends ParallelStatsDisabledIT { private String tenantId; + private static final byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE); @Before public void generateTenantId() { @@ -693,27 +697,64 @@ public class UpgradeIT extends ParallelStatsDisabledIT { } @Test + public void testAcquiringAndReleasingUpgradeMutex() throws Exception { + ConnectionQueryServices services = null; + try (Connection conn = getConnection(false, null)) { + services = conn.unwrap(PhoenixConnection.class).getQueryServices(); + assertTrue(((ConnectionQueryServicesImpl)services) + .acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey)); + try { + ((ConnectionQueryServicesImpl)services) + .acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey); + fail(); + } catch (UpgradeInProgressException expected) { + + } + assertTrue(((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey)); + assertFalse(((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey)); + } + } + + @Test public void testConcurrentUpgradeThrowsUprgadeInProgressException() throws Exception { final AtomicBoolean mutexStatus1 = new AtomicBoolean(false); final AtomicBoolean mutexStatus2 = new AtomicBoolean(false); final CountDownLatch latch = new CountDownLatch(2); final AtomicInteger numExceptions = new AtomicInteger(0); + ConnectionQueryServices services = null; try (Connection conn = getConnection(false, null)) { - final ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); - Thread t1 = new Thread(new AcquireMutexRunnable(mutexStatus1, services, latch, numExceptions)); + services = conn.unwrap(PhoenixConnection.class).getQueryServices(); + FutureTask<Void> task1 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus1, services, latch, numExceptions)); + FutureTask<Void> task2 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus2, services, latch, numExceptions)); + Thread t1 = new Thread(task1); t1.setDaemon(true); - Thread t2 = new Thread(new AcquireMutexRunnable(mutexStatus2, services, latch, numExceptions)); - t2.setDaemon(true);; + Thread t2 = new Thread(task2); + t2.setDaemon(true); t1.start(); t2.start(); latch.await(); + // make sure tasks didn't fail by calling get() + task1.get(); + task2.get(); assertTrue("One of the threads should have acquired the mutex", mutexStatus1.get() || mutexStatus2.get()); - assertNotEquals("One and only one thread should have acquired the mutex ", mutexStatus1.get(), mutexStatus2.get()); + assertNotEquals("One and only one thread should have acquired the mutex ", mutexStatus1.get(), + mutexStatus2.get()); assertEquals("One and only one thread should have caught UpgradeRequiredException ", 1, numExceptions.get()); + } finally { + if (services != null) { + releaseUpgradeMutex(services); + } } } - private static class AcquireMutexRunnable implements Runnable { + private void releaseUpgradeMutex(ConnectionQueryServices services) { + byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE); + ((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey); + + } + + private static class AcquireMutexRunnable implements Callable<Void> { private final AtomicBoolean acquireStatus; private final ConnectionQueryServices services; @@ -726,20 +767,17 @@ public class UpgradeIT extends ParallelStatsDisabledIT { this.numExceptions = numExceptions; } @Override - public void run() { + public Void call() throws Exception { try { ((ConnectionQueryServicesImpl)services).acquireUpgradeMutex( - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, - PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey); acquireStatus.set(true); } catch (UpgradeInProgressException e) { numExceptions.incrementAndGet(); - } - catch (IOException | SQLException ignore) { - } finally { latch.countDown(); } + return null; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/030fb768/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 97fe010..9c5d521 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -301,6 +301,11 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final String ASYNC_CREATED_DATE = "ASYNC_CREATED_DATE"; public static final String SEQUENCE_TABLE_TYPE = SYSTEM_SEQUENCE_TABLE; + public static final String SYSTEM_MUTEX_TABLE_NAME = "MUTEX"; + public static final String SYSTEM_MUTEX_NAME = SchemaUtil.getTableName(QueryConstants.SYSTEM_SCHEMA_NAME, SYSTEM_MUTEX_TABLE_NAME); + public static final byte[] SYSTEM_MUTEX_NAME_BYTES = Bytes.toBytes(SYSTEM_MUTEX_NAME); + public static final byte[] SYSTEM_MUTEX_FAMILY_NAME_BYTES = TABLE_FAMILY_BYTES; + private final PhoenixConnection connection; private final ResultSet emptyResultSet; public static final int MAX_LOCAL_SI_VERSION_DISALLOW = VersionUtil.encodeVersion("0", "98", "8"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/030fb768/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index f19cb34..1773175 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -20,11 +20,12 @@ package org.apache.phoenix.query; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.hadoop.hbase.HColumnDescriptor.TTL; import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP; -import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_1; +import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER; import static org.apache.phoenix.coprocessor.MetaDataProtocol.getVersion; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA; @@ -37,8 +38,10 @@ import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0; import java.io.IOException; import java.lang.ref.WeakReference; +import java.sql.PreparedStatement; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Types; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -78,6 +81,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTableInterface; @@ -85,8 +89,10 @@ import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; @@ -229,6 +235,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesImpl.class); private static final int INITIAL_CHILD_SERVICES_CAPACITY = 100; private static final int DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS = 1000; + private static final int TTL_FOR_MUTEX = 15 * 60; // 15min protected final Configuration config; private final ConnectionInfo connectionInfo; // Copy of config.getProps(), but read-only to prevent synchronization that we @@ -274,6 +281,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private final boolean renewLeaseEnabled; private final boolean isAutoUpgradeEnabled; private final AtomicBoolean upgradeRequired = new AtomicBoolean(false); + private static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes(); private static interface FeatureSupported { boolean isSupported(ConnectionQueryServices services); @@ -294,7 +302,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return hbaseVersion >= PhoenixDatabaseMetaData.MIN_RENEW_LEASE_VERSION; } }); - + private PMetaData newEmptyMetaData() { return new PSynchronizedMetaData(new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, getProps())); } @@ -2272,12 +2280,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement PhoenixConnection metaConnection = new PhoenixConnection(oldMetaConnection, this, props); SQLException sqlE = null; try { - metaConnection.createStatement().executeUpdate("UPSERT INTO " + SYSTEM_STATS_NAME + " (" + - PhoenixDatabaseMetaData.TENANT_ID + "," + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + - PhoenixDatabaseMetaData.TABLE_NAME + "," + PhoenixDatabaseMetaData.COLUMN_NAME + "," + - PhoenixDatabaseMetaData.COLUMN_FAMILY + "," + PhoenixDatabaseMetaData.NULLABLE + ") VALUES (" + - "null," + schemaName + "," + tableName + "," + columnName + "," + QueryConstants.DEFAULT_COLUMN_FAMILY + "," + - ResultSetMetaData.columnNullable + ")"); + String dml = "UPSERT INTO " + SYSTEM_CATALOG_NAME + " (" + PhoenixDatabaseMetaData.TENANT_ID + "," + + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + PhoenixDatabaseMetaData.TABLE_NAME + "," + + PhoenixDatabaseMetaData.COLUMN_NAME + "," + PhoenixDatabaseMetaData.COLUMN_FAMILY + "," + + PhoenixDatabaseMetaData.NULLABLE + ") VALUES (" + "?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = metaConnection.prepareStatement(dml); + stmt.setNull(1, Types.VARCHAR); + stmt.setString(2, schemaName); + stmt.setString(3, tableName); + stmt.setString(4, columnName); + stmt.setString(5, QueryConstants.DEFAULT_COLUMN_FAMILY); + stmt.setInt(6, ResultSetMetaData.columnNullable); + stmt.executeUpdate(); metaConnection.commit(); } catch (NewerTableAlreadyExistsException e) { logger.warn("Table already modified at this timestamp, so assuming column already nullable: " + columnName); @@ -2439,7 +2453,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement initializationException = ex; } } finally { - initialized = true; + try { + if (initializationException != null) { throw initializationException; } + } finally { + initialized = true; + } } } } @@ -2475,6 +2493,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement String snapshotName = null; String sysCatalogTableName = null; SQLException toThrow = null; + boolean acquiredMutexLock = false; + byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE); try { if (!ConnectionQueryServicesImpl.this.upgradeRequired.get()) { throw new UpgradeNotRequiredException(); @@ -2497,8 +2518,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement long currentServerSideTableTimeStamp = e.getTable().getTimeStamp(); sysCatalogTableName = e.getTable().getPhysicalName().getString(); if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP - && acquireUpgradeMutex(currentServerSideTableTimeStamp, e.getTable() - .getPhysicalName().getBytes())) { + && (acquiredMutexLock = acquireUpgradeMutex(currentServerSideTableTimeStamp, mutexRowKey))) { snapshotName = getUpgradeSnapshotName(sysCatalogTableName, currentServerSideTableTimeStamp); createSnapshot(snapshotName, sysCatalogTableName); @@ -2800,6 +2820,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } else { toThrow = e; } + } finally { + if (acquiredMutexLock) { + releaseUpgradeMutex(mutexRowKey); + } } if (toThrow != null) { throw toThrow; } } @@ -2948,9 +2972,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement * Acquire distributed mutex of sorts to make sure only one JVM is able to run the upgrade code by * making use of HBase's checkAndPut api. * <p> - * This method was added as part of 4.8.1 release. For clients upgrading to 4.8.1, the old value in the - * cell will be null i.e. the {@value QueryConstants#UPGRADE_MUTEX} column will be non-existent. For client's - * upgrading to a release newer than 4.8.1 the existing cell value will be non-null. The client which + * This method was added as part of 4.9.0 release. For clients upgrading to 4.9.0, the old value in the + * cell will be null i.e. the {@value #UPGRADE_MUTEX} column will be non-existent. For client's + * upgrading to a release newer than 4.9.0 the existing cell value will be non-null. The client which * wins the race will end up setting the cell value to the {@value MetaDataProtocol#MIN_SYSTEM_TABLE_TIMESTAMP} * for the release. * </p> @@ -2960,27 +2984,53 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement * @throws SQLException */ @VisibleForTesting - public boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, byte[] sysCatalogTableName) throws IOException, + public boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, byte[] rowToLock) throws IOException, SQLException { Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP); - try (HTableInterface sysCatalogTable = getTable(sysCatalogTableName)) { - byte[] row = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, - PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE); - byte[] family = PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES; - byte[] qualifier = QueryConstants.UPGRADE_MUTEX; - byte[] oldValue = currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP_4_8_1 ? null + try (HBaseAdmin admin = getAdmin()) { + try { + HTableDescriptor tableDesc = new HTableDescriptor( + TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)); + HColumnDescriptor columnDesc = new HColumnDescriptor( + PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES); + columnDesc.setTimeToLive(TTL_FOR_MUTEX); // Let mutex expire after some time + tableDesc.addFamily(columnDesc); + admin.createTable(tableDesc); + } catch (TableExistsException e) { + // Ignore + } + } + try (HTableInterface sysMutexTable = getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) { + byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES; + byte[] qualifier = UPGRADE_MUTEX; + byte[] oldValue = currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0 ? null : PLong.INSTANCE.toBytes(currentServerSideTableTimestamp); byte[] newValue = PLong.INSTANCE.toBytes(MIN_SYSTEM_TABLE_TIMESTAMP); - // Note that the timestamp for this put doesn't really matter since UPGRADE_MUTEX column isn't used - // to calculate SYSTEM.CATALOG's server side timestamp. - Put put = new Put(row); + Put put = new Put(rowToLock); put.add(family, qualifier, newValue); - boolean acquired = sysCatalogTable.checkAndPut(row, family, qualifier, oldValue, put); - if (!acquired) { throw new UpgradeInProgressException( - getVersion(currentServerSideTableTimestamp), getVersion(MIN_SYSTEM_TABLE_TIMESTAMP)); } + boolean acquired = sysMutexTable.checkAndPut(rowToLock, family, qualifier, oldValue, put); + if (!acquired) { throw new UpgradeInProgressException(getVersion(currentServerSideTableTimestamp), + getVersion(MIN_SYSTEM_TABLE_TIMESTAMP)); } return true; } } + + @VisibleForTesting + public boolean releaseUpgradeMutex(byte[] mutexRowKey) { + boolean released = false; + try (HTableInterface sysMutexTable = getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) { + byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES; + byte[] qualifier = UPGRADE_MUTEX; + byte[] expectedValue = PLong.INSTANCE.toBytes(MIN_SYSTEM_TABLE_TIMESTAMP); + Delete delete = new Delete(mutexRowKey); + RowMutations mutations = new RowMutations(mutexRowKey); + mutations.add(delete); + released = sysMutexTable.checkAndMutate(mutexRowKey, family, qualifier, CompareOp.EQUAL, expectedValue, mutations); + } catch (Exception e) { + logger.warn("Release of upgrade mutex failed", e); + } + return released; + } private List<String> getTableNames(List<HTableDescriptor> tables) { List<String> tableNames = new ArrayList<String>(4); http://git-wip-us.apache.org/repos/asf/phoenix/blob/030fb768/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index 89f7aba..8e2dc1a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -363,6 +363,5 @@ public interface QueryConstants { public static final byte[] OFFSET_FAMILY = "f_offset".getBytes(); public static final byte[] OFFSET_COLUMN = "c_offset".getBytes(); public static final String LAST_SCAN = "LAST_SCAN"; - public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes(); }