PHOENIX-1641 Make the upgrade from 4.x to 4.3 work for SYSTEM.CATALOG and SYSTEM.SEQUENCE
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/eaa7fbfd Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/eaa7fbfd Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/eaa7fbfd Branch: refs/heads/calcite Commit: eaa7fbfde9f45c635cc11d524977d54485c9ea8d Parents: 9db37bd Author: Samarth <samarth.j...@salesforce.com> Authored: Fri Feb 6 16:29:09 2015 -0800 Committer: Samarth <samarth.j...@salesforce.com> Committed: Fri Feb 6 16:29:09 2015 -0800 ---------------------------------------------------------------------- .../query/ConnectionQueryServicesImpl.java | 69 +++++-- .../org/apache/phoenix/util/UpgradeUtil.java | 179 ++++++++++--------- 2 files changed, 146 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/eaa7fbfd/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 6d58f57..7763a0a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -136,6 +136,7 @@ import org.apache.phoenix.schema.stats.StatisticsUtil; import org.apache.phoenix.schema.types.PBoolean; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.schema.types.PUnsignedTinyint; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.ConfigUtil; @@ -1757,8 +1758,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } - // Keeping this to use for further upgrades - protected PhoenixConnection addColumnsIfNotExists(PhoenixConnection oldMetaConnection, + /** + * Keeping this to use for further upgrades. This method closes the oldMetaConnection. + */ + private PhoenixConnection addColumnsIfNotExists(PhoenixConnection oldMetaConnection, String tableName, long timestamp, String columns) throws SQLException { Properties props = new Properties(oldMetaConnection.getClientInfo()); @@ -1826,7 +1829,29 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } catch (NewerTableAlreadyExistsException ignore) { // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed timestamp. // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp. - } catch (TableAlreadyExistsException ignore) { + } catch (TableAlreadyExistsException e) { + // This will occur if we have an older SYSTEM.CATALOG and we need to update it to include + // any new columns we've added. + long currentServerSideTableTimeStamp = e.getTable().getTimeStamp(); + + // We know that we always need to add the STORE_NULLS column for 4.3 release + String columnsToAdd = PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName(); + + // If the server side schema is 4 versions behind then we need to add INDEX_TYPE + // and INDEX_DISABLE_TIMESTAMP columns too. + // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed, + // we should just have a ALTER TABLE ADD IF NOT EXISTS statement with all + // the column names that have been added to SYSTEM.CATALOG since 4.0. + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP - 4) { + columnsToAdd += ", " + PhoenixDatabaseMetaData.INDEX_TYPE + " " + PUnsignedTinyint.INSTANCE.getSqlTypeName() + + ", " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName(); + } + + // Ugh..need to assign to another local variable to keep eclipse happy. + PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd); + metaConnection = newMetaConnection; } int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB, QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); @@ -1840,20 +1865,34 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement Integer sequenceSaltBuckets = e.getTable().getBucketNum(); nSequenceSaltBuckets = sequenceSaltBuckets == null ? 0 : sequenceSaltBuckets; } catch (TableAlreadyExistsException e) { - // This will occur if we have an older SYSTEM.SEQUENCE, so we need to update it to include + // This will occur if we have an older SYSTEM.SEQUENCE and we need to update it to include // any new columns we've added. - if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) { - metaConnection.removeTable(null, - PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME, - PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP); - clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY, - PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME_BYTES, - PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP); - clearTableRegionCache(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES); + long currentServerSideTableTimeStamp = e.getTable().getTimeStamp(); + // if the table is at a timestamp corresponding to before 4.2.1 then run the upgrade script + if (currentServerSideTableTimeStamp <= MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP - 2) { + if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) { + metaConnection.removeTable(null, + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME, + PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP); + clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY, + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME_BYTES, + PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP); + clearTableRegionCache(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES); + } + nSequenceSaltBuckets = nSaltBuckets; + } + if (currentServerSideTableTimeStamp <= MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP - 3) { + // If the table time stamp is before 4.1.0 then we need to add below columns + // to the SYSTEM.SEQUENCE table. + String columnsToAdd = PhoenixDatabaseMetaData.MIN_VALUE + " " + PLong.INSTANCE.getSqlTypeName() + + ", " + PhoenixDatabaseMetaData.MAX_VALUE + " " + PLong.INSTANCE.getSqlTypeName() + + ", " + PhoenixDatabaseMetaData.CYCLE_FLAG + " " + PBoolean.INSTANCE.getSqlTypeName() + + ", " + PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG + " " + PBoolean.INSTANCE.getSqlTypeName(); + addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd); } - nSequenceSaltBuckets = nSaltBuckets; } try { metaConnection.createStatement().executeUpdate( http://git-wip-us.apache.org/repos/asf/phoenix/blob/eaa7fbfd/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java index a3fee72..a92223b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java @@ -242,106 +242,110 @@ public class UpgradeUtil { logger.info("SYSTEM.SEQUENCE table has already been upgraded"); return false; } + + // if the SYSTEM.SEQUENCE table is for 4.1.0 or before then we need to salt the table + if (oldTable.getTimeStamp() <= MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP - 3) { + int batchSizeBytes = 100 * 1024; // 100K chunks + int sizeBytes = 0; + List<Mutation> mutations = Lists.newArrayListWithExpectedSize(10000); - int batchSizeBytes = 100 * 1024; // 100K chunks - int sizeBytes = 0; - List<Mutation> mutations = Lists.newArrayListWithExpectedSize(10000); - - boolean success = false; - Scan scan = new Scan(); - scan.setRaw(true); - scan.setMaxVersions(MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS); - HTableInterface seqTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES); - try { - boolean committed = false; - logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows"); - ResultScanner scanner = seqTable.getScanner(scan); + boolean success = false; + Scan scan = new Scan(); + scan.setRaw(true); + scan.setMaxVersions(MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS); + HTableInterface seqTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES); try { - Result result; - while ((result = scanner.next()) != null) { - for (KeyValue keyValue : result.raw()) { - KeyValue newKeyValue = addSaltByte(keyValue, nSaltBuckets); - if (newKeyValue != null) { - sizeBytes += newKeyValue.getLength(); - if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Put) { - // Delete old value - byte[] buf = keyValue.getBuffer(); - Delete delete = new Delete(keyValue.getRow()); - KeyValue deleteKeyValue = new KeyValue(buf, keyValue.getRowOffset(), keyValue.getRowLength(), - buf, keyValue.getFamilyOffset(), keyValue.getFamilyLength(), - buf, keyValue.getQualifierOffset(), keyValue.getQualifierLength(), - keyValue.getTimestamp(), KeyValue.Type.Delete, - ByteUtil.EMPTY_BYTE_ARRAY,0,0); - delete.addDeleteMarker(deleteKeyValue); - mutations.add(delete); - sizeBytes += deleteKeyValue.getLength(); - // Put new value - Put put = new Put(newKeyValue.getRow()); - put.add(newKeyValue); - mutations.add(put); - } else if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Delete){ - // Copy delete marker using new key so that it continues - // to delete the key value preceding it that will be updated - // as well. - Delete delete = new Delete(newKeyValue.getRow()); - delete.addDeleteMarker(newKeyValue); - mutations.add(delete); + boolean committed = false; + logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows"); + ResultScanner scanner = seqTable.getScanner(scan); + try { + Result result; + while ((result = scanner.next()) != null) { + for (KeyValue keyValue : result.raw()) { + KeyValue newKeyValue = addSaltByte(keyValue, nSaltBuckets); + if (newKeyValue != null) { + sizeBytes += newKeyValue.getLength(); + if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Put) { + // Delete old value + byte[] buf = keyValue.getBuffer(); + Delete delete = new Delete(keyValue.getRow()); + KeyValue deleteKeyValue = new KeyValue(buf, keyValue.getRowOffset(), keyValue.getRowLength(), + buf, keyValue.getFamilyOffset(), keyValue.getFamilyLength(), + buf, keyValue.getQualifierOffset(), keyValue.getQualifierLength(), + keyValue.getTimestamp(), KeyValue.Type.Delete, + ByteUtil.EMPTY_BYTE_ARRAY,0,0); + delete.addDeleteMarker(deleteKeyValue); + mutations.add(delete); + sizeBytes += deleteKeyValue.getLength(); + // Put new value + Put put = new Put(newKeyValue.getRow()); + put.add(newKeyValue); + mutations.add(put); + } else if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Delete){ + // Copy delete marker using new key so that it continues + // to delete the key value preceding it that will be updated + // as well. + Delete delete = new Delete(newKeyValue.getRow()); + delete.addDeleteMarker(newKeyValue); + mutations.add(delete); + } + } + if (sizeBytes >= batchSizeBytes) { + logger.info("Committing bactch of SYSTEM.SEQUENCE rows"); + seqTable.batch(mutations); + mutations.clear(); + sizeBytes = 0; + committed = true; } - } - if (sizeBytes >= batchSizeBytes) { - logger.info("Committing bactch of SYSTEM.SEQUENCE rows"); - seqTable.batch(mutations); - mutations.clear(); - sizeBytes = 0; - committed = true; } } - } - if (!mutations.isEmpty()) { - logger.info("Committing last bactch of SYSTEM.SEQUENCE rows"); - seqTable.batch(mutations); - } - preSplitSequenceTable(conn, nSaltBuckets); - logger.info("Successfully completed upgrade of SYSTEM.SEQUENCE"); - success = true; - return true; - } catch (InterruptedException e) { - throw ServerUtil.parseServerException(e); - } finally { - try { - scanner.close(); + if (!mutations.isEmpty()) { + logger.info("Committing last bactch of SYSTEM.SEQUENCE rows"); + seqTable.batch(mutations); + } + preSplitSequenceTable(conn, nSaltBuckets); + logger.info("Successfully completed upgrade of SYSTEM.SEQUENCE"); + success = true; + return true; + } catch (InterruptedException e) { + throw ServerUtil.parseServerException(e); } finally { - if (!success) { - if (!committed) { // Try to recover by setting salting back to off, as we haven't successfully committed anything - // Don't use Delete here as we'd never be able to change it again at this timestamp. - KeyValue unsaltKV = KeyValueUtil.newKeyValue(seqTableKey, - PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, - PInteger.INSTANCE.toBytes(0)); - Put unsaltPut = new Put(seqTableKey); - unsaltPut.add(unsaltKV); - try { - sysTable.put(unsaltPut); - success = true; - } finally { - if (!success) logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE"); + try { + scanner.close(); + } finally { + if (!success) { + if (!committed) { // Try to recover by setting salting back to off, as we haven't successfully committed anything + // Don't use Delete here as we'd never be able to change it again at this timestamp. + KeyValue unsaltKV = KeyValueUtil.newKeyValue(seqTableKey, + PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, + PInteger.INSTANCE.toBytes(0)); + Put unsaltPut = new Put(seqTableKey); + unsaltPut.add(unsaltKV); + try { + sysTable.put(unsaltPut); + success = true; + } finally { + if (!success) logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE"); + } + } else { // We're screwed b/c we've already committed some salted sequences... + logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE"); } - } else { // We're screwed b/c we've already committed some salted sequences... - logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE"); } } } - } - } catch (IOException e) { - throw ServerUtil.parseServerException(e); - } finally { - try { - seqTable.close(); } catch (IOException e) { - logger.warn("Exception during close",e); + throw ServerUtil.parseServerException(e); + } finally { + try { + seqTable.close(); + } catch (IOException e) { + logger.warn("Exception during close",e); + } } } + return false; } catch (IOException e) { throw ServerUtil.parseServerException(e); } finally { @@ -351,6 +355,7 @@ public class UpgradeUtil { logger.warn("Exception during close",e); } } + } @SuppressWarnings("deprecation")