http://git-wip-us.apache.org/repos/asf/phoenix/blob/87fdda8b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 914f62a..8b328d3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -170,6 +170,7 @@ import org.apache.phoenix.exception.RetriableUpgradeException; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.exception.UpgradeInProgressException; +import org.apache.phoenix.exception.UpgradeRequiredException; import org.apache.phoenix.exception.UpgradeNotRequiredException; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy; @@ -1041,23 +1042,69 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement /** * - * @param tableName + * @param physicalTableName + * @param tableType + * @param props + * @param families * @param splits - * @param modifyExistingMetaData TODO + * @param modifyExistingMetaData + * @param isNamespaceMapped + * @param isDoNotUpgradePropSet * @return true if table was created and false if it already exists * @throws SQLException */ private HTableDescriptor ensureTableCreated(byte[] physicalTableName, PTableType tableType, Map<String, Object> props, List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, boolean modifyExistingMetaData, - boolean isNamespaceMapped) throws SQLException { + boolean isNamespaceMapped, boolean isDoNotUpgradePropSet) throws SQLException { SQLException sqlE = null; HTableDescriptor existingDesc = null; boolean isMetaTable = SchemaUtil.isMetaTable(physicalTableName); boolean tableExist = true; try (HBaseAdmin admin = getAdmin()) { final String quorum = ZKConfig.getZKQuorumServersString(config); - final String znode = this.props.get(HConstants.ZOOKEEPER_ZNODE_PARENT); + final String znode = this.getProps().get(HConstants.ZOOKEEPER_ZNODE_PARENT); logger.debug("Found quorum: " + quorum + ":" + znode); + + if (isMetaTable) { + if(SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, this.getProps())) { + try { + // SYSTEM namespace needs to be created via HBase APIs because "CREATE SCHEMA" statement tries to write + // its metadata in SYSTEM:CATALOG table. Without SYSTEM namespace, SYSTEM:CATALOG table cannot be created + ensureNamespaceCreated(QueryConstants.SYSTEM_SCHEMA_NAME); + } catch (PhoenixIOException e) { + // We could either: + // 1) Not access the NS descriptor. The NS may or may not exist at this point + // 2) We could not create the NS + // Regardless of the case 1 or 2, if we eventually try to migrate SYSTEM tables to the SYSTEM + // namespace using the {@link ensureSystemTablesMigratedToSystemNamespace(ReadOnlyProps)} method, + // if the NS does not exist, we will error as expected, or + // if the NS does exist and tables are already mapped, the check will exit gracefully + } + if (admin.tableExists(SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES, false))) { + // SYSTEM.CATALOG exists, so at this point, we have 3 cases: + // 1) If server-side namespace mapping is disabled, throw Inconsistent namespace mapping exception + // 2) If server-side namespace mapping is enabled and SYSCAT needs to be upgraded, upgrade SYSCAT + // and also migrate SYSTEM tables to the SYSTEM namespace + // 3. If server-side namespace mapping is enabled and SYSCAT doesn't need to be upgraded, we still + // need to migrate SYSTEM tables to the SYSTEM namespace using the + // {@link ensureSystemTablesMigratedToSystemNamespace(ReadOnlyProps)} method (as part of + // {@link upgradeSystemTables(String, Properties)}) + checkClientServerCompatibility(SYSTEM_CATALOG_NAME_BYTES); + // Thrown so we can force an upgrade which will just migrate SYSTEM tables to the SYSTEM namespace + throw new UpgradeRequiredException(MIN_SYSTEM_TABLE_TIMESTAMP); + } + } else if (admin.tableExists(SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES, true))) { + // If SYSTEM:CATALOG exists, but client-side namespace mapping for SYSTEM tables is disabled, throw an exception + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.INCONSISTENT_NAMESPACE_MAPPING_PROPERTIES) + .setMessage("Cannot initiate connection as " + + SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES, true) + + " is found but client does not have " + + IS_NAMESPACE_MAPPING_ENABLED + " enabled") + .build().buildException(); + } + } + try { existingDesc = admin.getTableDescriptor(physicalTableName); } catch (org.apache.hadoop.hbase.TableNotFoundException e) { @@ -1075,6 +1122,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement splits, isNamespaceMapped); if (!tableExist) { + if (isMetaTable && !isUpgradeRequired() && (!isAutoUpgradeEnabled || isDoNotUpgradePropSet)) { + // Disallow creating the SYSTEM.CATALOG or SYSTEM:CATALOG HBase table + throw new UpgradeRequiredException(); + } if (newDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null && Boolean.TRUE.equals( PBoolean.INSTANCE.toObject(newDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) { newDesc.setValue(HTableDescriptor.SPLIT_POLICY, IndexRegionSplitPolicy.class.getName()); @@ -1092,9 +1143,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } catch (TableExistsException e) { // We can ignore this, as it just means that another client beat us // to creating the HBase metadata. + if (isMetaTable && !isUpgradeRequired()) { + checkClientServerCompatibility(SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName()); + } return null; } - if (isMetaTable) { + if (isMetaTable && !isUpgradeRequired()) { checkClientServerCompatibility(SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName()); /* * Now we modify the table to add the split policy, since we know that the client and @@ -1106,7 +1160,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } return null; } else { - if (isMetaTable) { + if (isMetaTable && !isUpgradeRequired()) { checkClientServerCompatibility(SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName()); } else { for(Pair<byte[],Map<String,Object>> family: families) { @@ -1120,7 +1174,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } - if (!modifyExistingMetaData) { return existingDesc; // Caller already knows that no metadata was changed } @@ -1143,7 +1196,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return null; // Indicate that no metadata was changed } - modifyTable(physicalTableName, newDesc, true); + // Do not call modifyTable for SYSTEM tables + if (tableType != PTableType.SYSTEM) { + modifyTable(physicalTableName, newDesc, true); + } return newDesc; } @@ -1198,6 +1254,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement boolean isIncompatible = false; int minHBaseVersion = Integer.MAX_VALUE; boolean isTableNamespaceMappingEnabled = false; + long systemCatalogTimestamp = Long.MAX_VALUE; HTableInterface ht = null; try { List<HRegionLocation> locations = this @@ -1214,36 +1271,44 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } ht = this.getTable(metaTable); - final Map<byte[], Long> results = - ht.coprocessorService(MetaDataService.class, null, null, new Batch.Call<MetaDataService,Long>() { + final Map<byte[], GetVersionResponse> results = + ht.coprocessorService(MetaDataService.class, null, null, new Batch.Call<MetaDataService,GetVersionResponse>() { @Override - public Long call(MetaDataService instance) throws IOException { + public GetVersionResponse call(MetaDataService instance) throws IOException { ServerRpcController controller = new ServerRpcController(); BlockingRpcCallback<GetVersionResponse> rpcCallback = - new BlockingRpcCallback<GetVersionResponse>(); + new BlockingRpcCallback<>(); GetVersionRequest.Builder builder = GetVersionRequest.newBuilder(); builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); instance.getVersion(controller, builder.build(), rpcCallback); if(controller.getFailedOn() != null) { throw controller.getFailedOn(); } - return rpcCallback.get().getVersion(); + return rpcCallback.get(); } }); - for (Map.Entry<byte[],Long> result : results.entrySet()) { + for (Map.Entry<byte[],GetVersionResponse> result : results.entrySet()) { // This is the "phoenix.jar" is in-place, but server is out-of-sync with client case. - long version = result.getValue(); - isTableNamespaceMappingEnabled |= MetaDataUtil.decodeTableNamespaceMappingEnabled(version); + GetVersionResponse versionResponse = result.getValue(); + long serverJarVersion = versionResponse.getVersion(); + isTableNamespaceMappingEnabled |= MetaDataUtil.decodeTableNamespaceMappingEnabled(serverJarVersion); - if (!isCompatible(result.getValue())) { + if (!isCompatible(serverJarVersion)) { isIncompatible = true; HRegionLocation name = regionMap.get(result.getKey()); buf.append(name); buf.append(';'); } - hasIndexWALCodec &= hasIndexWALCodec(result.getValue()); - if (minHBaseVersion > MetaDataUtil.decodeHBaseVersion(result.getValue())) { - minHBaseVersion = MetaDataUtil.decodeHBaseVersion(result.getValue()); + hasIndexWALCodec &= hasIndexWALCodec(serverJarVersion); + if (minHBaseVersion > MetaDataUtil.decodeHBaseVersion(serverJarVersion)) { + minHBaseVersion = MetaDataUtil.decodeHBaseVersion(serverJarVersion); + } + // In case this is the first time connecting to this cluster, the system catalog table does not have an + // entry for itself yet, so we cannot get the timestamp and this will not be returned from the + // GetVersionResponse message object + if (versionResponse.hasSystemCatalogTimestamp()) { + systemCatalogTimestamp = systemCatalogTimestamp < versionResponse.getSystemCatalogTimestamp() ? + systemCatalogTimestamp: versionResponse.getSystemCatalogTimestamp(); } } if (isTableNamespaceMappingEnabled != SchemaUtil.isNamespaceMappingEnabled(PTableType.TABLE, @@ -1274,6 +1339,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement buf.setLength(buf.length()-1); throw new SQLExceptionInfo.Builder(SQLExceptionCode.OUTDATED_JARS).setMessage(buf.toString()).build().buildException(); } + if (systemCatalogTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP) { + throw new UpgradeRequiredException(systemCatalogTimestamp); + } } /** @@ -1334,14 +1402,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement tableProps.put(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_NAME, TRUE_BYTES_AS_STRING); HTableDescriptor desc = ensureTableCreated(physicalIndexName, PTableType.TABLE, tableProps, families, splits, - false, isNamespaceMapped); + false, isNamespaceMapped, false); if (desc != null) { if (!Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(desc.getValue(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_BYTES)))) { String fullTableName = Bytes.toString(physicalIndexName); throw new TableAlreadyExistsException( - "Unable to create shared physical table for indexes on views.", - SchemaUtil.getSchemaNameFromFullName(fullTableName), - SchemaUtil.getTableNameFromFullName(fullTableName)); + SchemaUtil.getSchemaNameFromFullName(fullTableName), + SchemaUtil.getTableNameFromFullName(fullTableName), + "Unable to create shared physical table for indexes on views."); } } } @@ -1409,8 +1477,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement @Override public MetaDataMutationResult createTable(final List<Mutation> tableMetaData, final byte[] physicalTableName, PTableType tableType, Map<String, Object> tableProps, - final List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, boolean isNamespaceMapped, final boolean allocateIndexId) - throws SQLException { + final List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, boolean isNamespaceMapped, + final boolean allocateIndexId, final boolean isDoNotUpgradePropSet) throws SQLException { byte[][] rowKeyMetadata = new byte[3][]; Mutation m = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetaData); byte[] key = m.getRow(); @@ -1430,7 +1498,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if ((tableType == PTableType.VIEW && physicalTableName != null) || (tableType != PTableType.VIEW && (physicalTableName == null || localIndexTable))) { // For views this will ensure that metadata already exists // For tables and indexes, this will create the metadata if it doesn't already exist - ensureTableCreated(tableName, tableType, tableProps, families, splits, true, isNamespaceMapped); + ensureTableCreated(tableName, tableType, tableProps, families, splits, true, isNamespaceMapped, isDoNotUpgradePropSet); } ImmutableBytesWritable ptr = new ImmutableBytesWritable(); if (tableType == PTableType.INDEX) { // Index on view @@ -2436,30 +2504,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement openConnection(); hConnectionEstablished = true; boolean isDoNotUpgradePropSet = UpgradeUtil.isNoUpgradeSet(props); - try (HBaseAdmin admin = getAdmin()) { - boolean mappedSystemCatalogExists = admin - .tableExists(SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES, true)); - if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, - ConnectionQueryServicesImpl.this.getProps())) { - if (admin.tableExists(SYSTEM_CATALOG_NAME_BYTES)) { - //check if the server is already updated and have namespace config properly set. - checkClientServerCompatibility(SYSTEM_CATALOG_NAME_BYTES); - } - - // If SYSTEM tables exist, they are migrated to HBase SYSTEM namespace - // If they don't exist, this method will create HBase SYSTEM namespace and return - ensureSystemTablesMigratedToSystemNamespace(ConnectionQueryServicesImpl.this.getProps()); - } else if (mappedSystemCatalogExists) { - throw new SQLExceptionInfo.Builder( - SQLExceptionCode.INCONSISTENT_NAMESPACE_MAPPING_PROPERTIES) - .setMessage("Cannot initiate connection as " - + SchemaUtil.getPhysicalTableName( - SYSTEM_CATALOG_NAME_BYTES, true) - + " is found but client does not have " - + IS_NAMESPACE_MAPPING_ENABLED + " enabled") - .build().buildException(); - } - } Properties scnProps = PropertiesUtil.deepCopy(props); scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(getSystemTableVersion())); @@ -2508,38 +2552,37 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement initializationException = e; } return null; + } catch (UpgradeRequiredException e) { + // This will occur in 3 cases: + // 1. SYSTEM.CATALOG does not exist and we don't want to allow the user to create it i.e. + // !isAutoUpgradeEnabled or isDoNotUpgradePropSet is set + // 2. SYSTEM.CATALOG exists and its timestamp < MIN_SYSTEM_TABLE_TIMESTAMP + // 3. SYSTEM.CATALOG exists, but client and server-side namespace mapping is enabled so + // we need to migrate SYSTEM tables to the SYSTEM namespace + setUpgradeRequired(); } - // HBase Namespace SYSTEM is created by {@link ensureSystemTablesMigratedToSystemNamespace(ReadOnlyProps)} method - // This statement will create its entry in SYSCAT table, so that GRANT/REVOKE commands can work - // with SYSTEM Namespace. (See PHOENIX-4227 https://issues.apache.org/jira/browse/PHOENIX-4227) - if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, - ConnectionQueryServicesImpl.this.getProps())) { - try { - metaConnection.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " - + PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA); - } catch (NewerSchemaAlreadyExistsException e) { - // Older clients with appropriate perms may try getting a new connection - // This results in NewerSchemaAlreadyExistsException, so we can safely ignore it here - } catch (PhoenixIOException e) { - if (!Iterables.isEmpty(Iterables.filter(Throwables.getCausalChain(e), AccessDeniedException.class))) { - // Ignore ADE - } else { - throw e; - } - } - } if (!ConnectionQueryServicesImpl.this.upgradeRequired.get()) { createOtherSystemTables(metaConnection, hBaseAdmin); + // In case namespace mapping is enabled and system table to system namespace mapping is also enabled, + // create an entry for the SYSTEM namespace in the SYSCAT table, so that GRANT/REVOKE commands can work + // with SYSTEM Namespace + createSchemaIfNotExistsSystemNSMappingEnabled(metaConnection); } else if (isAutoUpgradeEnabled && !isDoNotUpgradePropSet) { + // Upgrade is required and we are allowed to automatically upgrade upgradeSystemTables(url, props); + } else { + // We expect the user to manually run the "EXECUTE UPGRADE" command first. + // This exception will get caught below as a RetriableUpgradeException + throw new UpgradeRequiredException(); } } scheduleRenewLeaseTasks(); success = true; } catch (RetriableUpgradeException e) { - // Don't set it as initializationException because otherwise the clien't won't be able - // to retry establishing connection. + // Set success to true and don't set the exception as an initializationException, + // because otherwise the client won't be able to retry establishing the connection. + success = true; throw e; } catch (Exception e) { if (e instanceof SQLException) { @@ -2580,7 +2623,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } - void createSysMutexTableIfNotExists(HBaseAdmin admin, ReadOnlyProps props) throws IOException, SQLException { + void createSysMutexTableIfNotExists(HBaseAdmin admin) throws IOException, SQLException { try { if(admin.tableExists(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME) || admin.tableExists(TableName.valueOf( PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME,PhoenixDatabaseMetaData.SYSTEM_MUTEX_TABLE_NAME))) { @@ -2588,7 +2631,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return; } final TableName mutexTableName = SchemaUtil.getPhysicalTableName( - PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME, props); + PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME, this.getProps()); HTableDescriptor tableDesc = new HTableDescriptor(mutexTableName); HColumnDescriptor columnDesc = new HColumnDescriptor( PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES); @@ -2640,7 +2683,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } catch (TableAlreadyExistsException ignore) {} // Catch the IOException to log the error message and then bubble it up for the client to retry. try { - createSysMutexTableIfNotExists(hbaseAdmin, ConnectionQueryServicesImpl.this.getProps()); + createSysMutexTableIfNotExists(hbaseAdmin); } catch (IOException exception) { logger.error("Failed to created SYSMUTEX table. Upgrade or migration is not possible without it. Please retry."); throw exception; @@ -2648,6 +2691,265 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } /** + * Create an entry for the SYSTEM namespace in the SYSCAT table in case namespace mapping is enabled and system table + * to system namespace mapping is also enabled. If not enabled, this method returns immediately without doing anything + * @param metaConnection + * @throws SQLException + */ + private void createSchemaIfNotExistsSystemNSMappingEnabled(PhoenixConnection metaConnection) throws SQLException { + // HBase Namespace SYSTEM is assumed to be already created inside {@link ensureTableCreated(byte[], PTableType, + // Map<String, Object>, List<Pair<byte[], Map<String, Object>>>, byte[][], boolean, boolean, boolean)}. + // This statement will create an entry for the SYSTEM namespace in the SYSCAT table, so that GRANT/REVOKE + // commands can work with SYSTEM Namespace. (See PHOENIX-4227 https://issues.apache.org/jira/browse/PHOENIX-4227) + if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, + ConnectionQueryServicesImpl.this.getProps())) { + try { + metaConnection.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + + PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA); + } catch (NewerSchemaAlreadyExistsException e) { + // Older clients with appropriate perms may try getting a new connection + // This results in NewerSchemaAlreadyExistsException, so we can safely ignore it here + } catch (PhoenixIOException e) { + if (!Iterables.isEmpty(Iterables.filter(Throwables.getCausalChain(e), AccessDeniedException.class))) { + // Ignore ADE + } else { + throw e; + } + } + } + } + + /** + * Upgrade the SYSCAT schema if required + * @param metaConnection + * @param currentServerSideTableTimeStamp + * @return Phoenix connection object + * @throws SQLException + * @throws IOException + * @throws TimeoutException + * @throws InterruptedException + */ + // Available for testing + protected PhoenixConnection upgradeSystemCatalogIfRequired(PhoenixConnection metaConnection, + long currentServerSideTableTimeStamp) throws SQLException, IOException, TimeoutException, InterruptedException { + String columnsToAdd = ""; + // This will occur if we have an older SYSTEM.CATALOG and we need to update it to + // include any new columns we've added. + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) { + // We know that we always need to add the STORE_NULLS column for 4.3 release + columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.STORE_NULLS + + " " + PBoolean.INSTANCE.getSqlTypeName()); + try (HBaseAdmin admin = getAdmin()) { + HTableDescriptor[] localIndexTables = admin + .listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + ".*"); + for (HTableDescriptor table : localIndexTables) { + if (table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null + && table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) { + table.setValue(MetaDataUtil.PARENT_TABLE_KEY, + MetaDataUtil.getLocalIndexUserTableName(table.getNameAsString())); + // Explicitly disable, modify and enable the table to ensure + // co-location of data and index regions. If we just modify the + // table descriptor when online schema change enabled may reopen + // the region in same region server instead of following data region. + admin.disableTable(table.getTableName()); + admin.modifyTable(table.getTableName(), table); + admin.enableTable(table.getTableName()); + } + } + } + } + + // If the server side schema is before MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then + // we need to add INDEX_TYPE and INDEX_DISABLE_TIMESTAMP columns too. + // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed, + // we should just have a ALTER TABLE ADD IF NOT EXISTS statement with all + // the column names that have been added to SYSTEM.CATALOG since 4.0. + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) { + columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.INDEX_TYPE + " " + + PUnsignedTinyint.INSTANCE.getSqlTypeName() + ", " + + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + + PLong.INSTANCE.getSqlTypeName()); + } + + // If we have some new columns from 4.1-4.3 to add, add them now. + if (!columnsToAdd.isEmpty()) { + // Ugh..need to assign to another local variable to keep eclipse happy. + PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0, columnsToAdd); + metaConnection = newMetaConnection; + } + + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) { + columnsToAdd = PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " " + + PInteger.INSTANCE.getSqlTypeName(); + try { + metaConnection = addColumn(metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, columnsToAdd, + false); + upgradeTo4_5_0(metaConnection); + } catch (ColumnAlreadyExistsException ignored) { + /* + * Upgrade to 4.5 is a slightly special case. We use the fact that the + * column BASE_COLUMN_COUNT is already part of the meta-data schema as the + * signal that the server side upgrade has finished or is in progress. + */ + logger.debug("No need to run 4.5 upgrade"); + } + Properties p = PropertiesUtil.deepCopy(metaConnection.getClientInfo()); + p.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB); + p.remove(PhoenixRuntime.TENANT_ID_ATTRIB); + PhoenixConnection conn = new PhoenixConnection( + ConnectionQueryServicesImpl.this, metaConnection.getURL(), p, + metaConnection.getMetaDataCache()); + try { + List<String> tablesNeedingUpgrade = UpgradeUtil + .getPhysicalTablesWithDescRowKey(conn); + if (!tablesNeedingUpgrade.isEmpty()) { + logger.warn("The following tables require upgrade due to a bug causing the row key to be incorrect for descending columns and ascending BINARY columns (PHOENIX-2067 and PHOENIX-2120):\n" + + Joiner.on(' ').join(tablesNeedingUpgrade) + + "\nTo upgrade issue the \"bin/psql.py -u\" command."); + } + List<String> unsupportedTables = UpgradeUtil + .getPhysicalTablesWithDescVarbinaryRowKey(conn); + if (!unsupportedTables.isEmpty()) { + logger.warn("The following tables use an unsupported VARBINARY DESC construct and need to be changed:\n" + + Joiner.on(' ').join(unsupportedTables)); + } + } catch (Exception ex) { + logger.error( + "Unable to determine tables requiring upgrade due to PHOENIX-2067", + ex); + } finally { + conn.close(); + } + } + // Add these columns one at a time, each with different timestamps so that if folks + // have + // run the upgrade code already for a snapshot, we'll still enter this block (and do + // the + // parts we haven't yet done). + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) { + columnsToAdd = PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " " + + PBoolean.INSTANCE.getSqlTypeName(); + metaConnection = addColumnsIfNotExists(metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd); + } + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) { + // Drop old stats table so that new stats table is created + metaConnection = dropStatsTable(metaConnection, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 4); + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 3, + PhoenixDatabaseMetaData.TRANSACTIONAL + " " + + PBoolean.INSTANCE.getSqlTypeName()); + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 2, + PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " " + + PLong.INSTANCE.getSqlTypeName()); + metaConnection = setImmutableTableIndexesImmutable(metaConnection, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1); + metaConnection = updateSystemCatalogTimestamp(metaConnection, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0); + ConnectionQueryServicesImpl.this.removeTable(null, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0); + clearCache(); + } + + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) { + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2, + PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED + " " + + PBoolean.INSTANCE.getSqlTypeName()); + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 1, + PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ + " " + + PVarchar.INSTANCE.getSqlTypeName()); + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0, + PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA + " " + + PBoolean.INSTANCE.getSqlTypeName()); + metaConnection = UpgradeUtil.disableViewIndexes(metaConnection); + if (getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, + QueryServicesOptions.DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)) { + metaConnection = UpgradeUtil.upgradeLocalIndexes(metaConnection); + } + ConnectionQueryServicesImpl.this.removeTable(null, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0); + clearCache(); + } + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0) { + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0, + PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH + " " + + PLong.INSTANCE.getSqlTypeName()); + ConnectionQueryServicesImpl.this.removeTable(null, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0); + clearCache(); + } + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0) { + metaConnection = addColumnQualifierColumn(metaConnection, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 3); + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 2, + PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME + " " + + PTinyint.INSTANCE.getSqlTypeName()); + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 1, + PhoenixDatabaseMetaData.ENCODING_SCHEME + " " + + PTinyint.INSTANCE.getSqlTypeName()); + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0, + PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER + " " + + PInteger.INSTANCE.getSqlTypeName()); + ConnectionQueryServicesImpl.this.removeTable(null, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0); + clearCache(); + } + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0) { + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0, + PhoenixDatabaseMetaData.USE_STATS_FOR_PARALLELIZATION + " " + + PBoolean.INSTANCE.getSqlTypeName()); + addParentToChildLinks(metaConnection); + } + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0) { + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0, + PhoenixDatabaseMetaData.TRANSACTION_PROVIDER + " " + + PTinyint.INSTANCE.getSqlTypeName()); + } + return metaConnection; + } + + /** * There is no other locking needed here since only one connection (on the same or different JVM) will be able to * acquire the upgrade mutex via {@link #acquireUpgradeMutex(long, byte[])}. */ @@ -2674,244 +2976,54 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement metaConnection = new PhoenixConnection(ConnectionQueryServicesImpl.this, globalUrl, scnProps, newEmptyMetaData()); metaConnection.setRunningUpgrade(true); + // Always try to create SYSTEM.MUTEX table since we need it to acquire the upgrade mutex. + // Upgrade or migration is not possible without the upgrade mutex + try (HBaseAdmin admin = getAdmin()) { + createSysMutexTableIfNotExists(admin); + } try { metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA); } catch (NewerTableAlreadyExistsException ignore) { // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed // timestamp. A TableAlreadyExistsException is not thrown, since the table only exists // *after* this fixed timestamp. + } catch (UpgradeRequiredException e) { + // This is thrown while trying to create SYSTEM:CATALOG to indicate that we must migrate SYSTEM tables + // to the SYSTEM namespace and/or upgrade SYSCAT if required + sysCatalogTableName = SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getNameAsString(); + if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, ConnectionQueryServicesImpl.this.getProps())) { + // Try acquiring a lock in SYSMUTEX table before migrating the tables since it involves disabling the table. + if (acquiredMutexLock = acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_MIGRATION_TIMESTAMP, mutexRowKey)) { + logger.debug("Acquired lock in SYSMUTEX table for migrating SYSTEM tables to SYSTEM namespace " + + "and/or upgrading " + sysCatalogTableName); + } + // We will not reach here if we fail to acquire the lock, since it throws UpgradeInProgressException + + // If SYSTEM tables exist, they are migrated to HBase SYSTEM namespace + // If they don't exist or they're already migrated, this method will return immediately + ensureSystemTablesMigratedToSystemNamespace(); + logger.debug("Migrated SYSTEM tables to SYSTEM namespace"); + metaConnection = upgradeSystemCatalogIfRequired(metaConnection, e.getSystemCatalogTimeStamp()); + } } catch (TableAlreadyExistsException e) { long currentServerSideTableTimeStamp = e.getTable().getTimeStamp(); sysCatalogTableName = e.getTable().getPhysicalName().getString(); if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP) { - if (currentServerSideTableTimeStamp <= MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0) { - try (HBaseAdmin admin = getAdmin()) { - createSysMutexTableIfNotExists(admin, this.getProps()); - } - } + // Try acquiring a lock in SYSMUTEX table before upgrading SYSCAT. If we cannot acquire the lock, + // it means some old client is either migrating SYSTEM tables or trying to upgrade the schema of + // SYSCAT table and hence it should not be interrupted if (acquiredMutexLock = acquireUpgradeMutex(currentServerSideTableTimeStamp, mutexRowKey)) { + logger.debug("Acquired lock in SYSMUTEX table for upgrading " + sysCatalogTableName); snapshotName = getSysCatalogSnapshotName(currentServerSideTableTimeStamp); createSnapshot(snapshotName, sysCatalogTableName); snapshotCreated = true; + logger.debug("Created snapshot for SYSCAT"); } + // We will not reach here if we fail to acquire the lock, since it throws UpgradeInProgressException } - String columnsToAdd = ""; - // This will occur if we have an older SYSTEM.CATALOG and we need to update it to - // include any new columns we've added. - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) { - // We know that we always need to add the STORE_NULLS column for 4.3 release - columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.STORE_NULLS - + " " + PBoolean.INSTANCE.getSqlTypeName()); - try (HBaseAdmin admin = getAdmin()) { - HTableDescriptor[] localIndexTables = admin - .listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + ".*"); - for (HTableDescriptor table : localIndexTables) { - if (table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null - && table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) { - table.setValue(MetaDataUtil.PARENT_TABLE_KEY, - MetaDataUtil.getLocalIndexUserTableName(table.getNameAsString())); - // Explicitly disable, modify and enable the table to ensure - // co-location of data and index regions. If we just modify the - // table descriptor when online schema change enabled may reopen - // the region in same region server instead of following data region. - admin.disableTable(table.getTableName()); - admin.modifyTable(table.getTableName(), table); - admin.enableTable(table.getTableName()); - } - } - } - } - - // If the server side schema is before MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then - // we need to add INDEX_TYPE and INDEX_DISABLE_TIMESTAMP columns too. - // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed, - // we should just have a ALTER TABLE ADD IF NOT EXISTS statement with all - // the column names that have been added to SYSTEM.CATALOG since 4.0. - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) { - columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.INDEX_TYPE + " " - + PUnsignedTinyint.INSTANCE.getSqlTypeName() + ", " - + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " - + PLong.INSTANCE.getSqlTypeName()); - } - - // If we have some new columns from 4.1-4.3 to add, add them now. - if (!columnsToAdd.isEmpty()) { - // Ugh..need to assign to another local variable to keep eclipse happy. - PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0, columnsToAdd); - metaConnection = newMetaConnection; - } - - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) { - columnsToAdd = PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " " - + PInteger.INSTANCE.getSqlTypeName(); - try { - metaConnection = addColumn(metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, columnsToAdd, - false); - upgradeTo4_5_0(metaConnection); - } catch (ColumnAlreadyExistsException ignored) { - /* - * Upgrade to 4.5 is a slightly special case. We use the fact that the - * column BASE_COLUMN_COUNT is already part of the meta-data schema as the - * signal that the server side upgrade has finished or is in progress. - */ - logger.debug("No need to run 4.5 upgrade"); - } - Properties p = PropertiesUtil.deepCopy(metaConnection.getClientInfo()); - p.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB); - p.remove(PhoenixRuntime.TENANT_ID_ATTRIB); - PhoenixConnection conn = new PhoenixConnection( - ConnectionQueryServicesImpl.this, metaConnection.getURL(), p, - metaConnection.getMetaDataCache()); - try { - List<String> tablesNeedingUpgrade = UpgradeUtil - .getPhysicalTablesWithDescRowKey(conn); - if (!tablesNeedingUpgrade.isEmpty()) { - logger.warn("The following tables require upgrade due to a bug causing the row key to be incorrect for descending columns and ascending BINARY columns (PHOENIX-2067 and PHOENIX-2120):\n" - + Joiner.on(' ').join(tablesNeedingUpgrade) - + "\nTo upgrade issue the \"bin/psql.py -u\" command."); - } - List<String> unsupportedTables = UpgradeUtil - .getPhysicalTablesWithDescVarbinaryRowKey(conn); - if (!unsupportedTables.isEmpty()) { - logger.warn("The following tables use an unsupported VARBINARY DESC construct and need to be changed:\n" - + Joiner.on(' ').join(unsupportedTables)); - } - } catch (Exception ex) { - logger.error( - "Unable to determine tables requiring upgrade due to PHOENIX-2067", - ex); - } finally { - conn.close(); - } - } - // Add these columns one at a time, each with different timestamps so that if folks - // have - // run the upgrade code already for a snapshot, we'll still enter this block (and do - // the - // parts we haven't yet done). - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) { - columnsToAdd = PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " " - + PBoolean.INSTANCE.getSqlTypeName(); - metaConnection = addColumnsIfNotExists(metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd); - } - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) { - // Drop old stats table so that new stats table is created - metaConnection = dropStatsTable(metaConnection, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 4); - metaConnection = addColumnsIfNotExists( - metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 3, - PhoenixDatabaseMetaData.TRANSACTIONAL + " " - + PBoolean.INSTANCE.getSqlTypeName()); - metaConnection = addColumnsIfNotExists( - metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 2, - PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " " - + PLong.INSTANCE.getSqlTypeName()); - metaConnection = setImmutableTableIndexesImmutable(metaConnection, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1); - metaConnection = updateSystemCatalogTimestamp(metaConnection, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0); - ConnectionQueryServicesImpl.this.removeTable(null, - PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0); - clearCache(); - } - - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) { - metaConnection = addColumnsIfNotExists( - metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2, - PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED + " " - + PBoolean.INSTANCE.getSqlTypeName()); - metaConnection = addColumnsIfNotExists( - metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 1, - PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ + " " - + PVarchar.INSTANCE.getSqlTypeName()); - metaConnection = addColumnsIfNotExists( - metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0, - PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA + " " - + PBoolean.INSTANCE.getSqlTypeName()); - metaConnection = UpgradeUtil.disableViewIndexes(metaConnection); - if (getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, - QueryServicesOptions.DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)) { - metaConnection = UpgradeUtil.upgradeLocalIndexes(metaConnection); - } - ConnectionQueryServicesImpl.this.removeTable(null, - PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0); - clearCache(); - } - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0) { - metaConnection = addColumnsIfNotExists( - metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0, - PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH + " " - + PLong.INSTANCE.getSqlTypeName()); - ConnectionQueryServicesImpl.this.removeTable(null, - PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0); - clearCache(); - } - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0) { - metaConnection = addColumnQualifierColumn(metaConnection, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 3); - metaConnection = addColumnsIfNotExists( - metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 2, - PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME + " " - + PTinyint.INSTANCE.getSqlTypeName()); - metaConnection = addColumnsIfNotExists( - metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 1, - PhoenixDatabaseMetaData.ENCODING_SCHEME + " " - + PTinyint.INSTANCE.getSqlTypeName()); - metaConnection = addColumnsIfNotExists( - metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0, - PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER + " " - + PInteger.INSTANCE.getSqlTypeName()); - ConnectionQueryServicesImpl.this.removeTable(null, - PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0); - clearCache(); - } - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0) { - metaConnection = addColumnsIfNotExists( - metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0, - PhoenixDatabaseMetaData.USE_STATS_FOR_PARALLELIZATION + " " - + PBoolean.INSTANCE.getSqlTypeName()); - addParentToChildLinks(metaConnection); - } - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0) { - metaConnection = addColumnsIfNotExists( - metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0, - PhoenixDatabaseMetaData.TRANSACTION_PROVIDER + " " - + PTinyint.INSTANCE.getSqlTypeName()); - } + metaConnection = upgradeSystemCatalogIfRequired(metaConnection, currentServerSideTableTimeStamp); } - int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt( QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB, QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); @@ -2997,6 +3109,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement try { metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_LOG_METADATA); } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {} + + // In case namespace mapping is enabled and system table to system namespace mapping is also enabled, + // create an entry for the SYSTEM namespace in the SYSCAT table, so that GRANT/REVOKE commands can work + // with SYSTEM Namespace + createSchemaIfNotExistsSystemNSMappingEnabled(metaConnection); + ConnectionQueryServicesImpl.this.upgradeRequired.set(false); success = true; } catch (UpgradeInProgressException | UpgradeNotRequiredException e) { @@ -3218,28 +3336,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } - void ensureSystemTablesMigratedToSystemNamespace(ReadOnlyProps props) + void ensureSystemTablesMigratedToSystemNamespace() throws SQLException, IOException, IllegalArgumentException, InterruptedException { - if (!SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, props)) { return; } - - boolean acquiredMutexLock = false; - byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, - PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE); + if (!SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, this.getProps())) { return; } HTableInterface metatable = null; try (HBaseAdmin admin = getAdmin()) { - // SYSTEM namespace needs to be created via HBase API's because "CREATE SCHEMA" statement tries to write its metadata - // in SYSTEM:CATALOG table. Without SYSTEM namespace, SYSTEM:CATALOG table cannot be created. - try { - ensureNamespaceCreated(QueryConstants.SYSTEM_SCHEMA_NAME); - } catch (PhoenixIOException e) { - // We could either: - // 1) Not access the NS descriptor. The NS may or may not exist at this point. - // 2) We could not create the NS - // Regardless of the case 1 or 2, if the NS does not exist, we will error expectedly - // below. If the NS does exist and is mapped, the below check will exit gracefully. - } - List<TableName> tableNames = getSystemTableNamesInDefaultNamespace(admin); // No tables exist matching "SYSTEM\..*", they are all already in "SYSTEM:.*" if (tableNames.size() == 0) { return; } @@ -3248,33 +3350,22 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement logger.warn("Expected 5 system tables but found " + tableNames.size() + ":" + tableNames); } - // Try acquiring a lock in SYSMUTEX table before migrating the tables since it involves disabling the table - // If we cannot acquire lock, it means some old client is either migrating SYSCAT or trying to upgrade the - // schema of SYSCAT table and hence it should not be interrupted - // Create mutex if not already created - createSysMutexTableIfNotExists(admin, props); - acquiredMutexLock = acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_MIGRATION_TIMESTAMP, mutexRowKey); - if(acquiredMutexLock) { - logger.debug("Acquired lock in SYSMUTEX table for migrating SYSTEM tables to SYSTEM namespace"); - } - // We will not reach here if we fail to acquire the lock, since it throws UpgradeInProgressException - // Handle the upgrade of SYSMUTEX table separately since it doesn't have any entries in SYSCAT logger.info("Migrating SYSTEM.MUTEX table to SYSTEM namespace."); String sysMutexSrcTableName = PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME; - String sysMutexDestTableName = SchemaUtil.getPhysicalName(sysMutexSrcTableName.getBytes(), props).getNameAsString(); + String sysMutexDestTableName = SchemaUtil.getPhysicalName(sysMutexSrcTableName.getBytes(), this.getProps()).getNameAsString(); UpgradeUtil.mapTableToNamespace(admin, sysMutexSrcTableName, sysMutexDestTableName, PTableType.SYSTEM); tableNames.remove(PhoenixDatabaseMetaData.SYSTEM_MUTEX_HBASE_TABLE_NAME); byte[] mappedSystemTable = SchemaUtil - .getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props).getName(); + .getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName(); metatable = getTable(mappedSystemTable); if (tableNames.contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)) { if (!admin.tableExists(mappedSystemTable)) { logger.info("Migrating SYSTEM.CATALOG table to SYSTEM namespace."); // Actual migration of SYSCAT table UpgradeUtil.mapTableToNamespace(admin, metatable, - PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, props, null, PTableType.SYSTEM, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, this.getProps(), null, PTableType.SYSTEM, null); // Invalidate the client-side metadataCache ConnectionQueryServicesImpl.this.removeTable(null, @@ -3285,7 +3376,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } for (TableName table : tableNames) { logger.info(String.format("Migrating %s table to SYSTEM namespace.", table.getNameAsString())); - UpgradeUtil.mapTableToNamespace(admin, metatable, table.getNameAsString(), props, null, PTableType.SYSTEM, + UpgradeUtil.mapTableToNamespace(admin, metatable, table.getNameAsString(), this.getProps(), null, PTableType.SYSTEM, null); ConnectionQueryServicesImpl.this.removeTable(null, table.getNameAsString(), null, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0); @@ -3297,9 +3388,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (metatable != null) { metatable.close(); } - if(acquiredMutexLock) { - releaseUpgradeMutex(mutexRowKey); - } } }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/87fdda8b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 41df7b0..1c56c31 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -242,7 +242,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple @Override public MetaDataMutationResult createTable(List<Mutation> tableMetaData, byte[] physicalName, PTableType tableType, Map<String, Object> tableProps, List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, - boolean isNamespaceMapped, boolean allocateIndexId) throws SQLException { + boolean isNamespaceMapped, boolean allocateIndexId, boolean isDoNotUpgradePropSet) throws SQLException { if (tableType == PTableType.INDEX && IndexUtil.isLocalIndexFamily(Bytes.toString(families.iterator().next().getFirst()))) { Object dataTableName = tableProps.get(PhoenixDatabaseMetaData.DATA_TABLE_NAME); List<HRegionLocation> regionLocations = tableSplits.get(dataTableName); http://git-wip-us.apache.org/repos/asf/phoenix/blob/87fdda8b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index e393210..cb7ce58 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -114,9 +114,9 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple @Override public MetaDataMutationResult createTable(List<Mutation> tableMetaData, byte[] physicalName, PTableType tableType, Map<String, Object> tableProps, List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, - boolean isNamespaceMapped, boolean allocateIndexId) throws SQLException { + boolean isNamespaceMapped, boolean allocateIndexId, boolean isDoNotUpgradePropSet) throws SQLException { return getDelegate().createTable(tableMetaData, physicalName, tableType, tableProps, families, splits, - isNamespaceMapped, allocateIndexId); + isNamespaceMapped, allocateIndexId, isDoNotUpgradePropSet); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/87fdda8b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 1fb668e..b15072a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -2694,7 +2694,8 @@ public class MetaDataClient { MetaDataMutationResult result = connection.getQueryServices().createTable( tableMetaData, viewType == ViewType.MAPPED || allocateIndexId ? physicalNames.get(0).getBytes() : null, - tableType, tableProps, familyPropList, splits, isNamespaceMapped, allocateIndexId); + tableType, tableProps, familyPropList, splits, isNamespaceMapped, allocateIndexId, + UpgradeUtil.isNoUpgradeSet(connection.getClientInfo())); MutationCode code = result.getMutationCode(); switch(code) { case TABLE_ALREADY_EXISTS: http://git-wip-us.apache.org/repos/asf/phoenix/blob/87fdda8b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java index b5c3e4a..46907d9 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java @@ -47,9 +47,9 @@ public class ConnectionQueryServicesImplTest { ConnectionQueryServicesImpl cqs = mock(ConnectionQueryServicesImpl.class); // Invoke the real methods for these two calls when(cqs.createSchema(any(List.class), anyString())).thenCallRealMethod(); - doCallRealMethod().when(cqs).ensureSystemTablesMigratedToSystemNamespace(any(ReadOnlyProps.class)); + doCallRealMethod().when(cqs).ensureSystemTablesMigratedToSystemNamespace(); // Do nothing for this method, just check that it was invoked later - doNothing().when(cqs).createSysMutexTableIfNotExists(any(HBaseAdmin.class), any(ReadOnlyProps.class)); + doNothing().when(cqs).createSysMutexTableIfNotExists(any(HBaseAdmin.class)); // Spoof out this call so that ensureSystemTablesUpgrade() will return-fast. when(cqs.getSystemTableNamesInDefaultNamespace(any(HBaseAdmin.class))).thenReturn(Collections.<TableName> emptyList()); @@ -60,7 +60,8 @@ public class ConnectionQueryServicesImplTest { // Make sure that ensureSystemTablesMigratedToSystemNamespace will try to migrate the system tables. Map<String,String> props = new HashMap<>(); props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true"); - cqs.ensureSystemTablesMigratedToSystemNamespace(new ReadOnlyProps(props)); + when(cqs.getProps()).thenReturn(new ReadOnlyProps(props)); + cqs.ensureSystemTablesMigratedToSystemNamespace(); // Should be called after upgradeSystemTables() // Proves that execution proceeded http://git-wip-us.apache.org/repos/asf/phoenix/blob/87fdda8b/phoenix-protocol/src/main/MetaDataService.proto ---------------------------------------------------------------------- diff --git a/phoenix-protocol/src/main/MetaDataService.proto b/phoenix-protocol/src/main/MetaDataService.proto index 2ba2b4c..369522c 100644 --- a/phoenix-protocol/src/main/MetaDataService.proto +++ b/phoenix-protocol/src/main/MetaDataService.proto @@ -168,6 +168,7 @@ message GetVersionRequest { message GetVersionResponse { required int64 version = 1; + optional int64 systemCatalogTimestamp = 2; } message ClearTableFromCacheRequest {