This is an automated email from the ASF dual-hosted git repository. tdsilva pushed a commit to branch 4.x-HBase-1.3 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push: new 0e01a15 PHOENIX-5073 modify index state based on client version to support old clients 0e01a15 is described below commit 0e01a15acf02a2679a90d7f14870719480aa9adf Author: kiran.maturi <maturi.ki...@gmail.com> AuthorDate: Fri Dec 28 14:46:19 2018 +0530 PHOENIX-5073 modify index state based on client version to support old clients --- .../index/InvalidIndexStateClientSideIT.java | 145 +++++++++++++++++++++ .../phoenix/coprocessor/MetaDataEndpointImpl.java | 41 ++++-- 2 files changed, 175 insertions(+), 11 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/InvalidIndexStateClientSideIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/InvalidIndexStateClientSideIT.java new file mode 100644 index 0000000..7052ade --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/InvalidIndexStateClientSideIT.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION; +import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Map; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.util.ByteStringer; +import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService; +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.hbase.index.util.VersionUtil; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.Closeables; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.junit.Test; + +import com.sun.org.apache.commons.logging.Log; +import com.sun.org.apache.commons.logging.LogFactory; + +public class InvalidIndexStateClientSideIT extends ParallelStatsDisabledIT { + private static final Log LOG = LogFactory.getLog(InvalidIndexStateClientSideIT.class); + + @Test + public void testCachedConnections() throws Throwable { + final String schemaName = generateUniqueName(); + final String tableName = generateUniqueName(); + final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + final String indexName = generateUniqueName(); + final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + final Connection conn = DriverManager.getConnection(getUrl()); + + // create table and indices + String createTableSql = + "CREATE TABLE " + fullTableName + + "(org_id VARCHAR NOT NULL PRIMARY KEY, v1 INTEGER, v2 INTEGER, v3 INTEGER)"; + conn.createStatement().execute(createTableSql); + conn.createStatement() + .execute("CREATE INDEX " + indexName + " ON " + fullTableName + "(v1)"); + conn.commit(); + PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class); + ConnectionQueryServices queryServices = phoenixConn.getQueryServices(); + HTableInterface metaTable = + phoenixConn.getQueryServices() + .getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + long ts = EnvironmentEdgeManager.currentTimeMillis(); + MutationCode code = + IndexUtil + .updateIndexState(fullIndexName, ts, metaTable, PIndexState.PENDING_DISABLE) + .getMutationCode(); + assertEquals(MutationCode.TABLE_ALREADY_EXISTS, code); + ts = EnvironmentEdgeManager.currentTimeMillis(); + + final byte[] schemaBytes = PVarchar.INSTANCE.toBytes(schemaName); + final byte[] tableBytes = PVarchar.INSTANCE.toBytes(tableName); + PName tenantId = phoenixConn.getTenantId(); + final long tableTimestamp = HConstants.LATEST_TIMESTAMP; + long tableResolvedTimestamp = HConstants.LATEST_TIMESTAMP; + final long resolvedTimestamp = tableResolvedTimestamp; + final byte[] tenantIdBytes = + tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(); + byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes); + Batch.Call<MetaDataService, MetaDataResponse> callable = + new Batch.Call<MetaDataService, MetaDataResponse>() { + @Override + public MetaDataResponse call(MetaDataService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<MetaDataResponse> rpcCallback = + new BlockingRpcCallback<MetaDataResponse>(); + GetTableRequest.Builder builder = GetTableRequest.newBuilder(); + builder.setTenantId(ByteStringer.wrap(tenantIdBytes)); + builder.setSchemaName(ByteStringer.wrap(schemaBytes)); + builder.setTableName(ByteStringer.wrap(tableBytes)); + builder.setTableTimestamp(tableTimestamp); + builder.setClientTimestamp(resolvedTimestamp); + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, + 13, PHOENIX_PATCH_NUMBER)); + builder.setSkipAddingParentColumns(false); + builder.setSkipAddingIndexes(false); + instance.getTable(controller, builder.build(), rpcCallback); + if (controller.getFailedOn() != null) { + throw controller.getFailedOn(); + } + return rpcCallback.get(); + } + }; + int version = VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, 13, PHOENIX_PATCH_NUMBER); + LOG.info("Client version: " + version); + HTableInterface ht = + queryServices.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + try { + final Map<byte[], MetaDataResponse> results = + ht.coprocessorService(MetaDataService.class, tableKey, tableKey, callable); + + assert (results.size() == 1); + MetaDataResponse result = results.values().iterator().next(); + assert (result.getTable().getIndexesCount() == 1); + assert (PIndexState.valueOf(result.getTable().getIndexes(0).getIndexState()) + .equals(PIndexState.DISABLE)); + } catch (Exception e) { + System.out.println("Exception Occurred: " + e); + + } finally { + Closeables.closeQuietly(ht); + } + + } + +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index cd9efee..799a41b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -1403,17 +1403,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso PIndexState indexState = indexStateKv == null ? null : PIndexState.fromSerializedValue(indexStateKv .getValueArray()[indexStateKv.getValueOffset()]); - // If client is not yet up to 4.12, then translate PENDING_ACTIVE to ACTIVE (as would have been - // the value in those versions) since the client won't have this index state in its enum. - if (indexState == PIndexState.PENDING_ACTIVE && clientVersion < MetaDataProtocol.MIN_PENDING_ACTIVE_INDEX) { - indexState = PIndexState.ACTIVE; - } - // If client is not yet up to 4.14, then translate PENDING_DISABLE to DISABLE - // since the client won't have this index state in its enum. - if (indexState == PIndexState.PENDING_DISABLE && clientVersion < MetaDataProtocol.MIN_PENDING_DISABLE_INDEX) { - // note: for older clients, we have to rely on the rebuilder to transition PENDING_DISABLE -> DISABLE - indexState = PIndexState.DISABLE; - } + Cell immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX]; boolean isImmutableRows = immutableRowsKv == null ? false : (Boolean) PBoolean.INSTANCE.toObject( @@ -1596,6 +1586,34 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso decodeViewIndexId(viewIndexIdKv, viewIndexIdType); } + private PTable modifyIndexStateForOldClient(int clientVersion, PTable table) + throws SQLException { + if (table == null) { + return table; + } + // PHOENIX-5073 Sets the index state based on the client version in case of old clients. + // If client is not yet up to 4.12, then translate PENDING_ACTIVE to ACTIVE (as would have + // been the value in those versions) since the client won't have this index state in its + // enum. + if (table.getIndexState() == PIndexState.PENDING_ACTIVE + && clientVersion < MetaDataProtocol.MIN_PENDING_ACTIVE_INDEX) { + table = + PTableImpl.builderWithColumns(table, PTableImpl.getColumnsToClone(table)) + .setState(PIndexState.ACTIVE).build(); + } + // If client is not yet up to 4.14, then translate PENDING_DISABLE to DISABLE + // since the client won't have this index state in its enum. + if (table.getIndexState() == PIndexState.PENDING_DISABLE + && clientVersion < MetaDataProtocol.MIN_PENDING_DISABLE_INDEX) { + // note: for older clients, we have to rely on the rebuilder to transition + // PENDING_DISABLE -> DISABLE + table = + PTableImpl.builderWithColumns(table, PTableImpl.getColumnsToClone(table)) + .setState(PIndexState.DISABLE).build(); + } + return table; + } + /** * Returns viewIndexId based on its underlying data type * @@ -3675,6 +3693,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso PTable table = getTableFromCache(cacheKey, clientTimeStamp, clientVersion, skipAddingIndexes, skipAddingParentColumns, lockedAncestorTable); + table = modifyIndexStateForOldClient(clientVersion, table); // We only cache the latest, so we'll end up building the table with every call if the // client connection has specified an SCN. // TODO: If we indicate to the client that we're returning an older version, but there's a