PHOENIX-3230 Upgrade code running concurrently on different JVMs could make 
clients unusuable


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/df0d6117
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/df0d6117
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/df0d6117

Branch: refs/heads/calcite
Commit: df0d61179a97881b8b72595af198dbb346adfb9f
Parents: 36d500c
Author: Samarth <samarth.j...@salesforce.com>
Authored: Thu Sep 8 20:20:59 2016 -0700
Committer: Samarth <samarth.j...@salesforce.com>
Committed: Thu Sep 8 20:20:59 2016 -0700

----------------------------------------------------------------------
 .../phoenix/coprocessor/MetaDataProtocol.java   |  20 +-
 .../phoenix/exception/SQLExceptionCode.java     |   1 +
 .../query/ConnectionQueryServicesImpl.java      | 244 +++++++++++--------
 .../apache/phoenix/query/QueryConstants.java    |   1 +
 .../org/apache/phoenix/util/UpgradeUtil.java    |   5 +-
 5 files changed, 164 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/df0d6117/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index dce89bd..20922e5 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -19,9 +19,9 @@ package org.apache.phoenix.coprocessor;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
@@ -29,7 +29,6 @@ import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
 import org.apache.phoenix.coprocessor.generated.PFunctionProtos;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
 import org.apache.phoenix.schema.PColumn;
@@ -64,7 +63,7 @@ import com.google.protobuf.ByteString;
 public abstract class MetaDataProtocol extends MetaDataService {
     public static final int PHOENIX_MAJOR_VERSION = 4;
     public static final int PHOENIX_MINOR_VERSION = 8;
-    public static final int PHOENIX_PATCH_NUMBER = 0;
+    public static final int PHOENIX_PATCH_NUMBER = 1;
     public static final int PHOENIX_VERSION =
             VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, 
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER);
 
@@ -89,7 +88,7 @@ public abstract class MetaDataProtocol extends 
MetaDataService {
     
     // ALWAYS update this map whenever rolling out a new release (major, minor 
or patch release). 
     // Key is the SYSTEM.CATALOG timestamp for the version and value is the 
version string.
-    public static final Map<Long, String> TIMESTAMP_VERSION_MAP = new 
HashMap<>(10);
+    private static final NavigableMap<Long, String> TIMESTAMP_VERSION_MAP = 
new TreeMap<>();
     static {
         TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0, "4.1.x");
         TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_2_0, "4.2.0");
@@ -100,6 +99,7 @@ public abstract class MetaDataProtocol extends 
MetaDataService {
         TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, "4.7.x");
         TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0, "4.8.x");
     }
+    
     public static final String CURRENT_CLIENT_VERSION = PHOENIX_MAJOR_VERSION 
+ "." + PHOENIX_MINOR_VERSION + "." + PHOENIX_PATCH_NUMBER; 
     
     // TODO: pare this down to minimum, as we don't need duplicates for both 
table and column errors, nor should we need
@@ -401,4 +401,14 @@ public abstract class MetaDataProtocol extends 
MetaDataService {
             return schema;
         }
     }
+  
+    public static String getVersion(long serverTimestamp) {
+        /*
+         * It is possible that when clients are trying to run upgrades 
concurrently, we could be at an intermediate
+         * server timestamp. Using floorKey provides us a range based lookup 
where the timestamp range for a release is
+         * [timeStampForRelease, timestampForNextRelease).
+         */
+        String version = 
TIMESTAMP_VERSION_MAP.get(TIMESTAMP_VERSION_MAP.floorKey(serverTimestamp));
+        return version;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/df0d6117/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 5a8fffa..0ccecae 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -370,6 +370,7 @@ public enum SQLExceptionCode {
     OUTDATED_JARS(2007, "INT09", "Outdated jars."),
     INDEX_METADATA_NOT_FOUND(2008, "INT10", "Unable to find cached index 
metadata. "),
     UNKNOWN_ERROR_CODE(2009, "INT11", "Unknown error code."),
+    CONCURRENT_UPGRADE_IN_PROGRESS(2010, "INT12", ""),
     OPERATION_TIMED_OUT(6000, "TIM01", "Operation timed out.", new Factory() {
         @Override
         public SQLException newException(SQLExceptionInfo info) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/df0d6117/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 86217e7..c5d53c3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -19,13 +19,12 @@ package org.apache.phoenix.query;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.hadoop.hbase.HColumnDescriptor.TTL;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
+import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
-import static org.apache.phoenix.util.UpgradeUtil.getUpgradeSnapshotName;
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
+import static org.apache.phoenix.coprocessor.MetaDataProtocol.getVersion;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_SEQUENCE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED;
@@ -80,6 +79,7 @@ import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
@@ -88,9 +88,6 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
-import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -210,6 +207,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
@@ -2331,140 +2329,141 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                                 if (upgradeSystemTables) {
                                     long currentServerSideTableTimeStamp = 
e.getTable().getTimeStamp();
                                     sysCatalogTableName = 
e.getTable().getPhysicalName().getString();
-                                    if (currentServerSideTableTimeStamp < 
MIN_SYSTEM_TABLE_TIMESTAMP) {
+                                    if (currentServerSideTableTimeStamp < 
MIN_SYSTEM_TABLE_TIMESTAMP && 
acquireUpgradeMutex(currentServerSideTableTimeStamp)) {
                                         snapshotName = 
getUpgradeSnapshotName(sysCatalogTableName, currentServerSideTableTimeStamp);
                                         createSnapshot(snapshotName, 
sysCatalogTableName);
                                     }
-                                        String columnsToAdd = "";
-                                        // This will occur if we have an older 
SYSTEM.CATALOG and we need to update it to include
-                                        // any new columns we've added.
-                                        if(currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
-                                            // We know that we always need to 
add the STORE_NULLS column for 4.3 release
-                                            columnsToAdd = 
addColumn(columnsToAdd, PhoenixDatabaseMetaData.STORE_NULLS + " " + 
PBoolean.INSTANCE.getSqlTypeName());
-                                            try (HBaseAdmin admin = 
getAdmin()) {
-                                                HTableDescriptor[] 
localIndexTables = admin.listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+".*");
-                                                for (HTableDescriptor table : 
localIndexTables) {
-                                                    if 
(table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null
-                                                            && 
table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) {
-                                                        
table.setValue(MetaDataUtil.PARENT_TABLE_KEY,
+                                    String columnsToAdd = "";
+                                    // This will occur if we have an older 
SYSTEM.CATALOG and we need to update it to include
+                                    // any new columns we've added.
+                                    if(currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
+                                        // We know that we always need to add 
the STORE_NULLS column for 4.3 release
+                                        columnsToAdd = addColumn(columnsToAdd, 
PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName());
+                                        try (HBaseAdmin admin = getAdmin()) {
+                                            HTableDescriptor[] 
localIndexTables = admin.listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+".*");
+                                            for (HTableDescriptor table : 
localIndexTables) {
+                                                if 
(table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null
+                                                        && 
table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) {
+                                                    
table.setValue(MetaDataUtil.PARENT_TABLE_KEY,
                                                             
MetaDataUtil.getUserTableName(table
-                                                                
.getNameAsString()));
-                                                        // Explicitly disable, 
modify and enable the table to ensure co-location of data
-                                                        // and index regions. 
If we just modify the table descriptor when online schema
-                                                        // change enabled may 
reopen the region in same region server instead of following data region.
-                                                        
admin.disableTable(table.getTableName());
-                                                        
admin.modifyTable(table.getTableName(), table);
-                                                        
admin.enableTable(table.getTableName());
-                                                    }
+                                                                    
.getNameAsString()));
+                                                    // Explicitly disable, 
modify and enable the table to ensure co-location of data
+                                                    // and index regions. If 
we just modify the table descriptor when online schema
+                                                    // change enabled may 
reopen the region in same region server instead of following data region.
+                                                    
admin.disableTable(table.getTableName());
+                                                    
admin.modifyTable(table.getTableName(), table);
+                                                    
admin.enableTable(table.getTableName());
                                                 }
                                             }
                                         }
+                                    }
 
-                                        // If the server side schema is before 
MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then
-                                        // we need to add INDEX_TYPE and 
INDEX_DISABLE_TIMESTAMP columns too. 
-                                        // TODO: Once 
https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed, 
-                                        // we should just have a ALTER TABLE 
ADD IF NOT EXISTS statement with all 
-                                        // the column names that have been 
added to SYSTEM.CATALOG since 4.0. 
-                                        if (currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) {
-                                            columnsToAdd = 
addColumn(columnsToAdd, PhoenixDatabaseMetaData.INDEX_TYPE + " " + 
PUnsignedTinyint.INSTANCE.getSqlTypeName()
+                                    // If the server side schema is before 
MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then
+                                    // we need to add INDEX_TYPE and 
INDEX_DISABLE_TIMESTAMP columns too. 
+                                    // TODO: Once 
https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed, 
+                                    // we should just have a ALTER TABLE ADD 
IF NOT EXISTS statement with all 
+                                    // the column names that have been added 
to SYSTEM.CATALOG since 4.0. 
+                                    if (currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) {
+                                        columnsToAdd = addColumn(columnsToAdd, 
PhoenixDatabaseMetaData.INDEX_TYPE + " " + 
PUnsignedTinyint.INSTANCE.getSqlTypeName()
                                                 + ", " + 
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + 
PLong.INSTANCE.getSqlTypeName());
-                                        }
+                                    }
 
-                                        // If we have some new columns from 
4.1-4.3 to add, add them now.
-                                        if (!columnsToAdd.isEmpty()) {
-                                            // Ugh..need to assign to another 
local variable to keep eclipse happy.
-                                            PhoenixConnection 
newMetaConnection = addColumnsIfNotExists(metaConnection,
+                                    // If we have some new columns from 
4.1-4.3 to add, add them now.
+                                    if (!columnsToAdd.isEmpty()) {
+                                        // Ugh..need to assign to another 
local variable to keep eclipse happy.
+                                        PhoenixConnection newMetaConnection = 
addColumnsIfNotExists(metaConnection,
                                                 
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
                                                 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0, columnsToAdd);
-                                            metaConnection = newMetaConnection;
-                                        }
+                                        metaConnection = newMetaConnection;
+                                    }
 
-                                        if (currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) {
-                                            columnsToAdd = 
PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " "
-                                                    + 
PInteger.INSTANCE.getSqlTypeName();
-                                            try {
-                                                metaConnection = 
addColumn(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                                    if (currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) {
+                                        columnsToAdd = 
PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " "
+                                                + 
PInteger.INSTANCE.getSqlTypeName();
+                                        try {
+                                            metaConnection = 
addColumn(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
                                                     
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, columnsToAdd, false);
-                                                upgradeTo4_5_0(metaConnection);
-                                            } catch 
(ColumnAlreadyExistsException ignored) {
-                                                /* 
-                                                 * Upgrade to 4.5 is a 
slightly special case. We use the fact that the column
-                                                 * BASE_COLUMN_COUNT is 
already part of the meta-data schema as the signal that
-                                                 * the server side upgrade has 
finished or is in progress.
-                                                 */
-                                                logger.debug("No need to run 
4.5 upgrade");
+                                            upgradeTo4_5_0(metaConnection);
+                                        } catch (ColumnAlreadyExistsException 
ignored) {
+                                            /* 
+                                             * Upgrade to 4.5 is a slightly 
special case. We use the fact that the column
+                                             * BASE_COLUMN_COUNT is already 
part of the meta-data schema as the signal that
+                                             * the server side upgrade has 
finished or is in progress.
+                                             */
+                                            logger.debug("No need to run 4.5 
upgrade");
+                                        }
+                                        Properties props = 
PropertiesUtil.deepCopy(metaConnection.getClientInfo());
+                                        
props.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB);
+                                        
props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+                                        PhoenixConnection conn = new 
PhoenixConnection(ConnectionQueryServicesImpl.this, metaConnection.getURL(), 
props, metaConnection.getMetaDataCache());
+                                        try {
+                                            List<String> tablesNeedingUpgrade 
= UpgradeUtil.getPhysicalTablesWithDescRowKey(conn);
+                                            if 
(!tablesNeedingUpgrade.isEmpty()) {
+                                                logger.warn("The following 
tables require upgrade due to a bug causing the row key to be incorrect for 
descending columns and ascending BINARY columns (PHOENIX-2067 and 
PHOENIX-2120):\n" + Joiner.on(' ').join(tablesNeedingUpgrade) + "\nTo upgrade 
issue the \"bin/psql.py -u\" command.");
                                             }
-                                            Properties props = 
PropertiesUtil.deepCopy(metaConnection.getClientInfo());
-                                            
props.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB);
-                                            
props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
-                                            PhoenixConnection conn = new 
PhoenixConnection(ConnectionQueryServicesImpl.this, metaConnection.getURL(), 
props, metaConnection.getMetaDataCache());
-                                            try {
-                                                List<String> 
tablesNeedingUpgrade = UpgradeUtil.getPhysicalTablesWithDescRowKey(conn);
-                                                if 
(!tablesNeedingUpgrade.isEmpty()) {
-                                                    logger.warn("The following 
tables require upgrade due to a bug causing the row key to be incorrect for 
descending columns and ascending BINARY columns (PHOENIX-2067 and 
PHOENIX-2120):\n" + Joiner.on(' ').join(tablesNeedingUpgrade) + "\nTo upgrade 
issue the \"bin/psql.py -u\" command.");
-                                                }
-                                                List<String> unsupportedTables 
= UpgradeUtil.getPhysicalTablesWithDescVarbinaryRowKey(conn);
-                                                if 
(!unsupportedTables.isEmpty()) {
-                                                    logger.warn("The following 
tables use an unsupported VARBINARY DESC construct and need to be changed:\n" + 
Joiner.on(' ').join(unsupportedTables));
-                                                }
-                                            } catch (Exception ex) {
-                                                logger.error("Unable to 
determine tables requiring upgrade due to PHOENIX-2067", ex);
-                                            } finally {
-                                                conn.close();
+                                            List<String> unsupportedTables = 
UpgradeUtil.getPhysicalTablesWithDescVarbinaryRowKey(conn);
+                                            if (!unsupportedTables.isEmpty()) {
+                                                logger.warn("The following 
tables use an unsupported VARBINARY DESC construct and need to be changed:\n" + 
Joiner.on(' ').join(unsupportedTables));
                                             }
+                                        } catch (Exception ex) {
+                                            logger.error("Unable to determine 
tables requiring upgrade due to PHOENIX-2067", ex);
+                                        } finally {
+                                            conn.close();
                                         }
-                                        // Add these columns one at a time, 
each with different timestamps so that if folks have
-                                        // run the upgrade code already for a 
snapshot, we'll still enter this block (and do the
-                                        // parts we haven't yet done).
-                                        if (currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) {
-                                            columnsToAdd = 
PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " " + 
PBoolean.INSTANCE.getSqlTypeName();
-                                            metaConnection = 
addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                                    }
+                                    // Add these columns one at a time, each 
with different timestamps so that if folks have
+                                    // run the upgrade code already for a 
snapshot, we'll still enter this block (and do the
+                                    // parts we haven't yet done).
+                                    if (currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) {
+                                        columnsToAdd = 
PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " " + 
PBoolean.INSTANCE.getSqlTypeName();
+                                        metaConnection = 
addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
                                                 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd);
-                                        }
-                                        if(currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) {
-                                            // Drop old stats table so that 
new stats table is created
-                                            metaConnection = 
dropStatsTable(metaConnection, 
+                                    }
+                                    if(currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) {
+                                        // Drop old stats table so that new 
stats table is created
+                                        metaConnection = 
dropStatsTable(metaConnection, 
                                                 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 4);
-                                            metaConnection = 
addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, 
+                                        metaConnection = 
addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, 
                                                 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 3,
                                                 
PhoenixDatabaseMetaData.TRANSACTIONAL + " " + 
PBoolean.INSTANCE.getSqlTypeName());
-                                            metaConnection = 
addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, 
+                                        metaConnection = 
addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, 
                                                 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 2,
                                                 
PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " " + 
PLong.INSTANCE.getSqlTypeName());
-                                            metaConnection = 
setImmutableTableIndexesImmutable(metaConnection, 
+                                        metaConnection = 
setImmutableTableIndexesImmutable(metaConnection, 
                                                 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1);
-                                            metaConnection = 
updateSystemCatalogTimestamp(metaConnection, 
+                                        metaConnection = 
updateSystemCatalogTimestamp(metaConnection, 
                                                 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
-                                            
ConnectionQueryServicesImpl.this.removeTable(null, 
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
-                                            clearCache();
-                                        }
+                                        
ConnectionQueryServicesImpl.this.removeTable(null, 
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0);
+                                        clearCache();
+                                    }
 
-                                        if (currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) {
-                                            metaConnection = 
addColumnsIfNotExists(metaConnection,
+                                    if (currentServerSideTableTimeStamp < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) {
+                                        metaConnection = 
addColumnsIfNotExists(metaConnection,
                                                 
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
                                                 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2,
                                                 
PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED + " "
                                                         + 
PBoolean.INSTANCE.getSqlTypeName());
-                                            metaConnection = 
addColumnsIfNotExists(metaConnection,
+                                        metaConnection = 
addColumnsIfNotExists(metaConnection,
                                                 
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
                                                 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 1,
                                                 
PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ + " "
                                                         + 
PVarchar.INSTANCE.getSqlTypeName());
-                                            metaConnection = 
addColumnsIfNotExists(metaConnection,
+                                        metaConnection = 
addColumnsIfNotExists(metaConnection,
                                                 
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
                                                 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0,
                                                 
PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA + " "
                                                         + 
PBoolean.INSTANCE.getSqlTypeName());
-                                            metaConnection = 
UpgradeUtil.disableViewIndexes(metaConnection);
-                                            
if(getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB,
+                                        metaConnection = 
UpgradeUtil.disableViewIndexes(metaConnection);
+                                        
if(getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB,
                                                 
QueryServicesOptions.DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)) {
-                                                metaConnection = 
UpgradeUtil.upgradeLocalIndexes(metaConnection);
-                                            }
-                                            
ConnectionQueryServicesImpl.this.removeTable(null,
+                                            metaConnection = 
UpgradeUtil.upgradeLocalIndexes(metaConnection);
+                                        }
+                                        
ConnectionQueryServicesImpl.this.removeTable(null,
                                                 
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
                                                 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0);
-                                            clearCache();
-                                        }
+                                        clearCache();
+                                    }
+
                                 }
                             }
 
@@ -2544,6 +2543,9 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                             }
                             success = true;
                             scheduleRenewLeaseTasks();
+                        } catch (UpgradeInProgressException e) {
+                            // don't set it as initializationException because 
otherwise client won't be able to retry
+                            throw e;
                         } catch (Exception e) {
                             if (e instanceof SQLException) {
                                 initializationException = (SQLException)e;
@@ -2719,12 +2721,56 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                         }
                     }
                 }
+                
+                /**
+                 * Acquire distributed mutex of sorts to make sure only one 
JVM is able to run the upgrade code by
+                 * making use of HBase's checkAndPut api.
+                 * <p>
+                 * This method was added as part of 4.8.1 release. For clients 
upgrading to 4.8.1, the old value in the
+                 * version cell will be null i.e. the QueryConstants#VERSION 
column will be non-existent. For client's
+                 * upgrading to a release newer than 4.8.1 the existing 
version cell will be non-null. The client which
+                 * wins the race will end up setting the version cell to the 
MetadataProtocol#MIN_SYSTEM_TABLE_TIMESTAMP
+                 * for the release.
+                 * </p>
+                 * 
+                 * @return true if client won the race, false otherwise
+                 * @throws IOException
+                 * @throws SQLException
+                 */
+                private boolean acquireUpgradeMutex(long 
currentServerSideTableTimestamp) throws IOException,
+                        SQLException {
+                    
Preconditions.checkArgument(currentServerSideTableTimestamp < 
MIN_SYSTEM_TABLE_TIMESTAMP);
+                    try (HTableInterface sysCatalogTable = 
getTable(SYSTEM_CATALOG_NAME_BYTES)) {
+                        byte[] row = SchemaUtil.getTableKey(null, 
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+                                PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
+                        byte[] family = 
PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
+                        byte[] qualifier = QueryConstants.VERSION;
+                        byte[] oldValue = currentServerSideTableTimestamp < 
MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 ? null
+                                : 
Bytes.toBytes(currentServerSideTableTimestamp);
+                        byte[] newValue = 
Bytes.toBytes(MIN_SYSTEM_TABLE_TIMESTAMP);
+                        long ts = MIN_SYSTEM_TABLE_TIMESTAMP;
+                        Put put = new Put(row, ts);
+                        put.add(family, qualifier, newValue);
+                        boolean acquired = sysCatalogTable.checkAndPut(row, 
family, qualifier, oldValue, put);
+                        if (!acquired) { throw new UpgradeInProgressException(
+                                getVersion(currentServerSideTableTimestamp), 
getVersion(MIN_SYSTEM_TABLE_TIMESTAMP)); }
+                        return true;
+                    }
+                }
             });
         } catch (Exception e) {
             Throwables.propagateIfInstanceOf(e, SQLException.class);
             throw Throwables.propagate(e);
         }
     }
+    
+    private static class UpgradeInProgressException extends SQLException {
+        public UpgradeInProgressException(String upgradeFrom, String 
upgradeTo) {
+            super("Cluster is being concurrently upgraded from " + upgradeFrom 
+ " to " + upgradeTo
+                    + ". Please retry establishing connection.", 
SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS
+                    .getSQLState(), 
SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS.getErrorCode());
+        }
+    }
 
     private List<String> getTableNames(List<HTableDescriptor> tables) {
         List<String> tableNames = new ArrayList<String>(4);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/df0d6117/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 9f8f58c..3077943 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -356,5 +356,6 @@ public interface QueryConstants {
     public static final byte[] OFFSET_FAMILY = "f_offset".getBytes();
     public static final byte[] OFFSET_COLUMN = "c_offset".getBytes();
     public static final String LAST_SCAN = "LAST_SCAN";
+    public static final byte[] VERSION = "VERSION".getBytes();
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/df0d6117/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index 7c34f4a..8bc3e63 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -19,7 +19,7 @@ package org.apache.phoenix.util;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.CURRENT_CLIENT_VERSION;
-import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.TIMESTAMP_VERSION_MAP;
+import static org.apache.phoenix.coprocessor.MetaDataProtocol.getVersion;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CACHE_SIZE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
@@ -31,7 +31,6 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
@@ -1896,7 +1895,7 @@ public class UpgradeUtil {
     public static final String getUpgradeSnapshotName(String tableString, long 
currentSystemTableTimestamp) {
         Format formatter = new SimpleDateFormat("yyyyMMddHHmmssZ");
         String date = formatter.format(new Date(System.currentTimeMillis()));
-        String upgradingFrom = 
TIMESTAMP_VERSION_MAP.get(currentSystemTableTimestamp);
+        String upgradingFrom = getVersion(currentSystemTableTimestamp);
         return "SNAPSHOT_" + tableString + "_" + upgradingFrom + "_TO_" + 
CURRENT_CLIENT_VERSION + "_" + date;
     }
 }
\ No newline at end of file

Reply via email to