PHOENIX-3174 Make minor upgrade a manual step
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e90feaa3 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e90feaa3 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e90feaa3 Branch: refs/heads/calcite Commit: e90feaa31bd928645ecc3596437b822d980939ed Parents: e8ffc9c Author: Samarth <samarth.j...@salesforce.com> Authored: Mon Sep 19 18:31:49 2016 -0700 Committer: Samarth <samarth.j...@salesforce.com> Committed: Mon Sep 19 18:31:49 2016 -0700 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/UpgradeIT.java | 156 ++- phoenix-core/src/main/antlr3/PhoenixSQL.g | 10 +- .../coprocessor/MetaDataEndpointImpl.java | 16 +- .../coprocessor/MetaDataRegionObserver.java | 47 +- .../phoenix/exception/SQLExceptionCode.java | 2 + .../exception/UpgradeInProgressException.java | 28 + .../exception/UpgradeNotRequiredException.java | 27 + .../exception/UpgradeRequiredException.java | 29 + .../apache/phoenix/jdbc/PhoenixConnection.java | 27 +- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 2 +- .../apache/phoenix/jdbc/PhoenixStatement.java | 72 +- .../index/automation/PhoenixMRJobSubmitter.java | 4 + .../phoenix/parse/ExecuteUpgradeStatement.java | 34 + .../apache/phoenix/parse/ParseNodeFactory.java | 4 + .../phoenix/query/ConnectionQueryServices.java | 3 + .../query/ConnectionQueryServicesImpl.java | 951 ++++++++++--------- .../query/ConnectionlessQueryServicesImpl.java | 8 + .../query/DelegateConnectionQueryServices.java | 10 + .../org/apache/phoenix/query/QueryServices.java | 1 + .../phoenix/query/QueryServicesOptions.java | 5 +- .../apache/phoenix/schema/MetaDataClient.java | 2 +- .../phoenix/trace/PhoenixMetricsSink.java | 37 +- .../org/apache/phoenix/util/PhoenixRuntime.java | 2 +- .../java/org/apache/phoenix/util/QueryUtil.java | 6 +- .../org/apache/phoenix/util/UpgradeUtil.java | 19 +- 25 files changed, 1027 insertions(+), 475 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java index b47738d..bdd94a2 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java @@ -24,7 +24,9 @@ import static org.apache.phoenix.query.QueryConstants.DIVERGED_VIEW_BASE_COLUMN_ import static org.apache.phoenix.util.UpgradeUtil.SELECT_BASE_COLUMN_COUNT_FROM_HEADER_ROW; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.sql.Connection; @@ -33,6 +35,9 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -41,8 +46,14 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.exception.UpgradeInProgressException; +import org.apache.phoenix.exception.UpgradeRequiredException; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import 
org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.ConnectionQueryServicesImpl; +import org.apache.phoenix.query.DelegateConnectionQueryServices; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PName; @@ -51,7 +62,6 @@ import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.UpgradeUtil; -import org.junit.Ignore; import org.junit.Test; public class UpgradeIT extends BaseHBaseManagedTimeIT { @@ -576,6 +586,150 @@ public class UpgradeIT extends BaseHBaseManagedTimeIT { } } } + + @Test + public void testUpgradeRequiredPreventsSQL() throws SQLException { + String tableName = generateRandomString(); + try (Connection conn = getConnection(false, null)) { + conn.createStatement().execute( + "CREATE TABLE " + tableName + + " (PK1 VARCHAR NOT NULL, PK2 VARCHAR, KV1 VARCHAR, KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2))"); + final ConnectionQueryServices delegate = conn.unwrap(PhoenixConnection.class).getQueryServices(); + ConnectionQueryServices servicesWithUpgrade = new DelegateConnectionQueryServices(delegate) { + @Override + public boolean isUpgradeRequired() { + return true; + } + }; + try (PhoenixConnection phxConn = new PhoenixConnection(servicesWithUpgrade, + conn.unwrap(PhoenixConnection.class), HConstants.LATEST_TIMESTAMP)) { + try { + phxConn.createStatement().execute( + "CREATE TABLE " + generateRandomString() + + " (k1 VARCHAR NOT NULL, k2 VARCHAR, CONSTRAINT PK PRIMARY KEY(K1,K2))"); + fail("CREATE TABLE should have failed with UpgradeRequiredException"); + } catch (UpgradeRequiredException expected) { + + } + try { + phxConn.createStatement().execute("SELECT * FROM " + tableName); + fail("SELECT should have failed with UpgradeRequiredException"); + } catch (UpgradeRequiredException expected) { + + } + try { + 
phxConn.createStatement().execute("DELETE FROM " + tableName); + fail("DELETE should have failed with UpgradeRequiredException"); + } catch (UpgradeRequiredException expected) { + + } + try { + phxConn.createStatement().execute( + "CREATE INDEX " + tableName + "_IDX ON " + tableName + " (KV1) INCLUDE (KV2)" ); + fail("CREATE INDEX should have failed with UpgradeRequiredException"); + } catch (UpgradeRequiredException expected) { + + } + try { + phxConn.createStatement().execute( + "UPSERT INTO " + tableName + " VALUES ('PK1', 'PK2', 'KV1', 'KV2')" ); + fail("UPSERT VALUES should have failed with UpgradeRequiredException"); + } catch (UpgradeRequiredException expected) { + + } + } + } + } + + @Test + public void testUpgradingConnectionBypassesUpgradeRequiredCheck() throws Exception { + String tableName = generateRandomString(); + try (Connection conn = getConnection(false, null)) { + conn.createStatement() + .execute( + "CREATE TABLE " + + tableName + + " (PK1 VARCHAR NOT NULL, PK2 VARCHAR, KV1 VARCHAR, KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2))"); + final ConnectionQueryServices delegate = conn.unwrap(PhoenixConnection.class).getQueryServices(); + ConnectionQueryServices servicesWithUpgrade = new DelegateConnectionQueryServices(delegate) { + @Override + public boolean isUpgradeRequired() { + return true; + } + }; + try (PhoenixConnection phxConn = new PhoenixConnection(servicesWithUpgrade, + conn.unwrap(PhoenixConnection.class), HConstants.LATEST_TIMESTAMP)) { + // Because upgrade is required, this SQL should fail. + try { + phxConn.createStatement().executeQuery("SELECT * FROM " + tableName); + fail("SELECT should have failed with UpgradeRequiredException"); + } catch (UpgradeRequiredException expected) { + + } + // Marking connection as the one running upgrade should let SQL execute fine. 
+ phxConn.setRunningUpgrade(true); + phxConn.createStatement().execute( + "UPSERT INTO " + tableName + " VALUES ('PK1', 'PK2', 'KV1', 'KV2')" ); + phxConn.commit(); + try (ResultSet rs = phxConn.createStatement().executeQuery("SELECT * FROM " + tableName)) { + assertTrue(rs.next()); + assertFalse(rs.next()); + } + } + } + } + + @Test + public void testConcurrentUpgradeThrowsUprgadeInProgressException() throws Exception { + final AtomicBoolean mutexStatus1 = new AtomicBoolean(false); + final AtomicBoolean mutexStatus2 = new AtomicBoolean(false); + final CountDownLatch latch = new CountDownLatch(2); + final AtomicInteger numExceptions = new AtomicInteger(0); + try (Connection conn = getConnection(false, null)) { + final ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); + Thread t1 = new Thread(new AcquireMutexRunnable(mutexStatus1, services, latch, numExceptions)); + t1.setDaemon(true); + Thread t2 = new Thread(new AcquireMutexRunnable(mutexStatus2, services, latch, numExceptions)); + t2.setDaemon(true); + t1.start(); + t2.start(); + latch.await(); + assertTrue("One of the threads should have acquired the mutex", mutexStatus1.get() || mutexStatus2.get()); + assertNotEquals("One and only one thread should have acquired the mutex ", mutexStatus1.get(), mutexStatus2.get()); + assertEquals("One and only one thread should have caught UpgradeInProgressException ", 1, numExceptions.get()); + } + } + + private static class AcquireMutexRunnable implements Runnable { + + private final AtomicBoolean acquireStatus; + private final ConnectionQueryServices services; + private final CountDownLatch latch; + private final AtomicInteger numExceptions; + public AcquireMutexRunnable(AtomicBoolean acquireStatus, ConnectionQueryServices services, CountDownLatch latch, AtomicInteger numExceptions) { + this.acquireStatus = acquireStatus; + this.services = services; + this.latch = latch; + this.numExceptions = numExceptions; + } + @Override + public 
void run() { + try { + ((ConnectionQueryServicesImpl)services).acquireUpgradeMutex( + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + acquireStatus.set(true); + } catch (UpgradeInProgressException e) { + numExceptions.incrementAndGet(); + } + catch (IOException | SQLException ignore) { + + } finally { + latch.countDown(); + } + } + + } private Connection createTenantConnection(String tenantId) throws SQLException { Properties props = new Properties(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/antlr3/PhoenixSQL.g ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g index bc48b19..7e81a3a 100644 --- a/phoenix-core/src/main/antlr3/PhoenixSQL.g +++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g @@ -133,6 +133,8 @@ tokens ROW = 'row'; ROWS = 'rows'; ONLY = 'only'; + EXECUTE = 'execute'; + UPGRADE = 'upgrade'; } @@ -419,6 +421,7 @@ oneStatement returns [BindableStatement ret] | s=drop_schema_node | s=use_schema_node | s=update_statistics_node + | s=execute_upgrade_node | s=explain_node) { $ret = s; } ; finally{ contextStack.pop(); } @@ -570,7 +573,7 @@ trace_node returns [TraceStatement ret] {ret = factory.trace(Tracing.isTraceOn(flag.getText()), s == null ? Tracing.isTraceOn(flag.getText()) ? 1.0 : 0.0 : (((BigDecimal)s.getValue())).doubleValue());} ; -// Parse a trace statement. +// Parse a create function statement. create_function_node returns [CreateFunctionStatement ret] : CREATE (OR replace=REPLACE)? (temp=TEMPORARY)? FUNCTION function=identifier (LPAREN args=zero_or_more_data_types RPAREN) @@ -619,6 +622,11 @@ update_statistics_node returns [UpdateStatisticsStatement ret] {ret = factory.updateStatistics(factory.namedTable(null, t), s == null ? 
StatisticsCollectionScope.getDefault() : StatisticsCollectionScope.valueOf(SchemaUtil.normalizeIdentifier(s.getText())), p);} ; +execute_upgrade_node returns [ExecuteUpgradeStatement ret] + : EXECUTE UPGRADE + {ret = factory.executeUpgrade();} + ; + prop_name returns [String ret] : p=identifier {$ret = SchemaUtil.normalizeIdentifier(p); } ; http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index a5d77bc..142f700 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -79,7 +79,6 @@ import static org.apache.phoenix.util.SchemaUtil.getVarChars; import java.io.IOException; import java.sql.DriverManager; -import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; @@ -127,6 +126,7 @@ import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.GlobalCache.FunctionBytesPtr; import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.compile.WhereCompiler; @@ -162,8 +162,10 @@ import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixConnection; import 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.metrics.Metrics; import org.apache.phoenix.parse.LiteralParseNode; @@ -212,7 +214,6 @@ import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.MetaDataUtil; -import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; @@ -1433,12 +1434,17 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if (parentTable!=null && parentTable.getAutoPartitionSeqName()!=null) { long autoPartitionNum = 1; final Properties props = new Properties(); - props.setProperty(PhoenixRuntime.NO_UPGRADE_ATTRIB, Boolean.TRUE.toString()); + UpgradeUtil.doNotUpgradeOnFirstConnection(props); try (PhoenixConnection connection = DriverManager.getConnection(MetaDataUtil.getJdbcUrl(env), props).unwrap(PhoenixConnection.class); - Statement stmt = connection.createStatement()) { + Statement stmt = connection.createStatement()) { String seqName = parentTable.getAutoPartitionSeqName(); + // Not going through the standard route of using statement.execute() as that code path + // is blocked if the metadata hasn't been upgraded to the new minor release. 
String seqNextValueSql = String.format("SELECT NEXT VALUE FOR %s", seqName); - ResultSet rs = stmt.executeQuery(seqNextValueSql); + PhoenixStatement ps = stmt.unwrap(PhoenixStatement.class); + QueryPlan plan = ps.compileQuery(seqNextValueSql); + ResultIterator resultIterator = plan.iterator(); + PhoenixResultSet rs = ps.newResultSet(resultIterator, plan.getProjector(), plan.getContext()); rs.next(); autoPartitionNum = rs.getLong(1); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index 00981f5..c645cf4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -18,8 +18,10 @@ package org.apache.phoenix.coprocessor; import java.io.IOException; +import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; @@ -54,6 +56,10 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.ServerCacheClient; +import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; +import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; @@ -68,6 +74,7 @@ import 
org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.ByteUtil; @@ -80,6 +87,7 @@ import org.apache.phoenix.util.UpgradeUtil; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.protobuf.ServiceException; /** @@ -297,11 +305,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { // Allow index to begin incremental maintenance as index is back online and we // cannot transition directly from DISABLED -> ACTIVE if (Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexState) == 0) { - AlterIndexStatement statement = new AlterIndexStatement( - NamedTableNode.create(indexPTable.getSchemaName().getString(), indexPTable.getTableName().getString()), - dataPTable.getTableName().getString(), - false, PIndexState.INACTIVE); - client.alterIndex(statement); + updateIndexState(indexTableFullName, env, PIndexState.DISABLE, PIndexState.INACTIVE); } List<PTable> indexesToPartiallyRebuild = dataTableToIndexesMap.get(dataPTable); if (indexesToPartiallyRebuild == null) { @@ -393,11 +397,9 @@ public class MetaDataRegionObserver extends BaseRegionObserver { } } for (PTable indexPTable : indexesToPartiallyRebuild) { - AlterIndexStatement statement = new AlterIndexStatement( - NamedTableNode.create(indexPTable.getSchemaName().getString(), indexPTable - .getTableName().getString()), dataPTable.getTableName().getString(), - false, PIndexState.ACTIVE); - client.alterIndex(statement); + String indexTableFullName = SchemaUtil.getTableName(indexPTable.getSchemaName() + .getString(), indexPTable.getTableName().getString()); + updateIndexState(indexTableFullName, env, PIndexState.INACTIVE, PIndexState.ACTIVE); } } catch 
(Exception e) { // Log, but try next table's indexes LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild @@ -426,4 +428,29 @@ public class MetaDataRegionObserver extends BaseRegionObserver { } } } + + private static void updateIndexState(String indexTableName, RegionCoprocessorEnvironment env, PIndexState oldState, + PIndexState newState) throws ServiceException, Throwable { + byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); + String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName); + String indexName = SchemaUtil.getTableNameFromFullName(indexTableName); + // Mimic the Put that gets generated by the client on an update of the index state + Put put = new Put(indexTableKey); + put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, + newState.getSerializedBytes()); + if (newState == PIndexState.ACTIVE) { + put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, + PLong.INSTANCE.toBytes(0)); + } + final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put); + Connection conn = QueryUtil.getConnection(env.getConfiguration()); + MetaDataMutationResult result = conn.unwrap(PhoenixConnection.class).getQueryServices() + .updateIndexState(tableMetadata, null); + MutationCode code = result.getMutationCode(); + if (code == MutationCode.TABLE_NOT_FOUND) { throw new TableNotFoundException(schemaName, indexName); } + if (code == MutationCode.UNALLOWED_TABLE_MUTATION) { throw new SQLExceptionInfo.Builder( + SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION) + .setMessage(" currentState=" + oldState + ". 
requestedState=" + newState).setSchemaName(schemaName) + .setTableName(indexName).build().buildException(); } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 0ccecae..e6a26ae 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -371,6 +371,8 @@ public enum SQLExceptionCode { INDEX_METADATA_NOT_FOUND(2008, "INT10", "Unable to find cached index metadata. "), UNKNOWN_ERROR_CODE(2009, "INT11", "Unknown error code."), CONCURRENT_UPGRADE_IN_PROGRESS(2010, "INT12", ""), + UPGRADE_REQUIRED(2011, "INT13", ""), + UPGRADE_NOT_REQUIRED(2012, "INT14", ""), OPERATION_TIMED_OUT(6000, "TIM01", "Operation timed out.", new Factory() { @Override public SQLException newException(SQLExceptionInfo info) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/exception/UpgradeInProgressException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/UpgradeInProgressException.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/UpgradeInProgressException.java new file mode 100644 index 0000000..5b15216 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/UpgradeInProgressException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.exception; + +import java.sql.SQLException; + +public class UpgradeInProgressException extends SQLException { + public UpgradeInProgressException(String upgradeFrom, String upgradeTo) { + super("Cluster is being concurrently upgraded from " + upgradeFrom + " to " + upgradeTo + + ". Please retry establishing connection.", SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS + .getSQLState(), SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS.getErrorCode()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/exception/UpgradeNotRequiredException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/UpgradeNotRequiredException.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/UpgradeNotRequiredException.java new file mode 100644 index 0000000..0490319 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/UpgradeNotRequiredException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.exception; + +import java.sql.SQLException; + +public class UpgradeNotRequiredException extends SQLException { + public UpgradeNotRequiredException() { + super("Operation not allowed since cluster has already been upgraded. ", SQLExceptionCode.UPGRADE_NOT_REQUIRED + .getSQLState(), SQLExceptionCode.UPGRADE_NOT_REQUIRED.getErrorCode()); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/exception/UpgradeRequiredException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/UpgradeRequiredException.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/UpgradeRequiredException.java new file mode 100644 index 0000000..005a1bd --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/UpgradeRequiredException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.exception; + +import java.sql.SQLException; + +public class UpgradeRequiredException extends SQLException { + + public UpgradeRequiredException() { + super("Operation not allowed since cluster hasn't been upgraded. Call EXECUTE UPGRADE. ", + SQLExceptionCode.UPGRADE_REQUIRED.getSQLState(), SQLExceptionCode.UPGRADE_REQUIRED.getErrorCode()); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index 7bd4b9a..cef77d2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -164,6 +164,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea private ParallelIteratorFactory parallelIteratorFactory; private final LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue; private TableResultIteratorFactory tableResultIteratorFactory; + private boolean isRunningUpgrade; static { Tracing.addTraceMetricsSource(); @@ -175,8 +176,8 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea return props; } - public PhoenixConnection(PhoenixConnection connection, boolean isDescRowKeyOrderUpgrade) throws SQLException { - 
this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.metaData, connection.getMutationState(), isDescRowKeyOrderUpgrade); + public PhoenixConnection(PhoenixConnection connection, boolean isDescRowKeyOrderUpgrade, boolean isRunningUpgrade) throws SQLException { + this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.metaData, connection.getMutationState(), isDescRowKeyOrderUpgrade, isRunningUpgrade); this.isAutoCommit = connection.isAutoCommit; this.isAutoFlush = connection.isAutoFlush; this.sampler = connection.sampler; @@ -184,11 +185,11 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } public PhoenixConnection(PhoenixConnection connection) throws SQLException { - this(connection, connection.isDescVarLengthRowKeyUpgrade()); + this(connection, connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade()); } public PhoenixConnection(PhoenixConnection connection, MutationState mutationState) throws SQLException { - this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache(), mutationState, connection.isDescVarLengthRowKeyUpgrade()); + this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache(), mutationState, connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade()); } public PhoenixConnection(PhoenixConnection connection, long scn) throws SQLException { @@ -196,7 +197,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException { - this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, connection.getMutationState(), connection.isDescVarLengthRowKeyUpgrade()); + this(services, connection.getURL(), 
newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, connection.getMutationState(), connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade()); this.isAutoCommit = connection.isAutoCommit; this.isAutoFlush = connection.isAutoFlush; this.sampler = connection.sampler; @@ -204,14 +205,14 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException { - this(services, url, info, metaData, null, false); + this(services, url, info, metaData, null, false, false); } public PhoenixConnection(PhoenixConnection connection, ConnectionQueryServices services, Properties info) throws SQLException { - this(services, connection.url, info, connection.metaData, null, connection.isDescVarLengthRowKeyUpgrade()); + this(services, connection.url, info, connection.metaData, null, connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade()); } - public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, MutationState mutationState, boolean isDescVarLengthRowKeyUpgrade) throws SQLException { + public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, MutationState mutationState, boolean isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade) throws SQLException { this.url = url; this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade; // Copy so client cannot change @@ -300,6 +301,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea this.customTracingAnnotations = getImmutableCustomTracingAnnotations(); this.scannerQueue = new LinkedBlockingQueue<>(); this.tableResultIteratorFactory = new DefaultTableResultIteratorFactory(); + this.isRunningUpgrade = isRunningUpgrade; GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment(); } @@ -1069,4 +1071,13 @@ public class 
PhoenixConnection implements Connection, MetaDataMutated, SQLClosea getQueryServices().removeSchema(schema, schemaTimeStamp); } + + public boolean isRunningUpgrade() { + return isRunningUpgrade; + } + + public void setRunningUpgrade(boolean isRunningUpgrade) { + this.isRunningUpgrade = isRunningUpgrade; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 2fed40c..47dfd4e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -515,7 +515,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public PhoenixStatement newStatement(PhoenixConnection connection) { return new PhoenixStatement(connection) { @Override - protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector, + public PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector, StatementContext context) throws SQLException { return new PhoenixResultSet(new TenantColumnFilteringIterator(iterator, projector), projector, context); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 0fbc81a..60e32e5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ 
-37,6 +37,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -78,6 +79,7 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.exception.BatchUpdateExecution; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.exception.UpgradeRequiredException; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.expression.RowKeyColumnExpression; import org.apache.phoenix.iterate.MaterializedResultIterator; @@ -104,6 +106,7 @@ import org.apache.phoenix.parse.DropIndexStatement; import org.apache.phoenix.parse.DropSchemaStatement; import org.apache.phoenix.parse.DropSequenceStatement; import org.apache.phoenix.parse.DropTableStatement; +import org.apache.phoenix.parse.ExecuteUpgradeStatement; import org.apache.phoenix.parse.ExplainStatement; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.HintNode; @@ -192,7 +195,8 @@ public class PhoenixStatement implements Statement, SQLCloseable { public enum Operation { QUERY("queried", false), DELETE("deleted", true), - UPSERT("upserted", true); + UPSERT("upserted", true), + UPGRADE("upgrade", true); private final String toString; private final boolean isMutation; @@ -241,7 +245,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { return resultSets; } - protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector, StatementContext context) throws SQLException { + public PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector, StatementContext context) throws SQLException { return new PhoenixResultSet(iterator, projector, context); } @@ -268,7 +272,12 @@ public class PhoenixStatement implements Statement, SQLCloseable { public PhoenixResultSet call() 
throws SQLException { final long startTime = System.currentTimeMillis(); try { - QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE); + PhoenixConnection conn = getConnection(); + if (conn.getQueryServices().isUpgradeRequired() && !conn.isRunningUpgrade() + && stmt.getOperation() != Operation.UPGRADE) { + throw new UpgradeRequiredException(); + } + QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE); // Send mutations to hbase, so they are visible to subsequent reads. // Use original plan for data table so that data and immutable indexes will be sent // TODO: for joins, we need to iterate through all tables, but we need the original table, @@ -333,6 +342,11 @@ public class PhoenixStatement implements Statement, SQLCloseable { @Override public Integer call() throws SQLException { try { + PhoenixConnection conn = getConnection(); + if (conn.getQueryServices().isUpgradeRequired() && !conn.isRunningUpgrade() + && stmt.getOperation() != Operation.UPGRADE) { + throw new UpgradeRequiredException(); + } MutationState state = connection.getMutationState(); MutationPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE); if (plan.getTargetRef() != null && plan.getTargetRef().getTable() != null && plan.getTargetRef().getTable().isTransactional()) { @@ -1079,6 +1093,53 @@ public class PhoenixStatement implements Statement, SQLCloseable { } } + + private static class ExecutableExecuteUpgradeStatement extends ExecuteUpgradeStatement implements CompilableStatement { + @SuppressWarnings("unchecked") + @Override + public MutationPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { + return new MutationPlan() { + + @Override + public Set<TableRef> getSourceRefs() { + return Collections.emptySet(); + } + + @Override + public ParameterMetaData getParameterMetaData() { + return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA; + 
} + + @Override + public Operation getOperation() { + return Operation.UPGRADE; + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return new ExplainPlan(Collections.singletonList("EXECUTE UPGRADE")); + } + + @Override + public StatementContext getContext() { + return new StatementContext(stmt); + } + + @Override + public TableRef getTargetRef() { + return TableRef.EMPTY_TABLE_REF; + } + + @Override + public MutationState execute() throws SQLException { + PhoenixConnection phxConn = stmt.getConnection(); + Properties props = new Properties(); + phxConn.getQueryServices().upgradeSystemTables(phxConn.getURL(), props); + return MutationState.emptyMutationState(-1, phxConn); + } + }; + } + } private static class ExecutableAddColumnStatement extends AddColumnStatement implements CompilableStatement { @@ -1261,6 +1322,11 @@ public class PhoenixStatement implements Statement, SQLCloseable { public UpdateStatisticsStatement updateStatistics(NamedTableNode table, StatisticsCollectionScope scope, Map<String,Object> props) { return new ExecutableUpdateStatisticsStatement(table, scope, props); } + + @Override + public ExecuteUpgradeStatement executeUpgrade() { + return new ExecutableExecuteUpgradeStatement(); + } } static class PhoenixStatementParser extends SQLParser { http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java index f6f2482..9c447e8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java @@ 
-28,6 +28,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -52,6 +53,7 @@ import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.types.PDate; import org.apache.phoenix.util.PhoenixMRJobUtil; +import org.apache.phoenix.util.UpgradeUtil; import org.apache.phoenix.util.PhoenixMRJobUtil.MR_SCHEDULER_TYPE; import org.apache.phoenix.util.ZKBasedMasterElectionUtil; import org.codehaus.jettison.json.JSONArray; @@ -158,6 +160,8 @@ public class PhoenixMRJobSubmitter { } public Map<String, PhoenixAsyncIndex> getCandidateJobs() throws SQLException { + Properties props = new Properties(); + UpgradeUtil.doNotUpgradeOnFirstConnection(props); Connection con = DriverManager.getConnection("jdbc:phoenix:" + zkQuorum); Statement s = con.createStatement(); ResultSet rs = s.executeQuery(CANDIDATE_INDEX_INFO_QUERY); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/parse/ExecuteUpgradeStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ExecuteUpgradeStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ExecuteUpgradeStatement.java new file mode 100644 index 0000000..29edf8f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ExecuteUpgradeStatement.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.parse; + +import org.apache.phoenix.jdbc.PhoenixStatement.Operation; + +public class ExecuteUpgradeStatement implements BindableStatement { + + @Override + public int getBindCount() { + return 0; + } + + @Override + public Operation getOperation() { + return Operation.UPGRADE; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java index 024e2c7..332ff15 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java @@ -393,6 +393,10 @@ public class ParseNodeFactory { public UpdateStatisticsStatement updateStatistics(NamedTableNode table, StatisticsCollectionScope scope, Map<String,Object> props) { return new UpdateStatisticsStatement(table, scope, props); } + + public ExecuteUpgradeStatement executeUpgrade() { + return new ExecuteUpgradeStatement(); + } public FunctionParseNode functionDistinct(String name, List<ParseNode> args) { 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index 7154d58..6f8b528 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -139,4 +139,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated * @param tableName The table to remove stats for */ void invalidateStats(ImmutableBytesPtr tableName); + + boolean isUpgradeRequired(); + void upgradeSystemTables(String url, Properties props) throws SQLException; } \ No newline at end of file