http://git-wip-us.apache.org/repos/asf/phoenix/blob/9f09f1a5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index feb5989..52b038b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -18,6 +18,9 @@ package org.apache.phoenix.query; import static org.apache.hadoop.hbase.HColumnDescriptor.TTL; +import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION; +import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION; +import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA; import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0; @@ -110,6 +113,7 @@ import org.apache.phoenix.hbase.index.Indexer; import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; +import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.index.PhoenixIndexBuilder; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -966,7 +970,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement BlockingRpcCallback<GetVersionResponse> rpcCallback = new BlockingRpcCallback<GetVersionResponse>(); GetVersionRequest.Builder builder = GetVersionRequest.newBuilder(); - + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); instance.getVersion(controller, builder.build(), rpcCallback); if(controller.getFailedOn() != null) { throw controller.getFailedOn(); @@ -1265,6 +1269,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement MutationProto mp = ProtobufUtil.toProto(m); builder.addTableMetadataMutations(mp.toByteString()); } + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); instance.createTable(controller, builder.build(), rpcCallback); if(controller.getFailedOn() != null) { throw controller.getFailedOn(); @@ -1293,12 +1298,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement builder.setTableName(ByteStringer.wrap(tableBytes)); builder.setTableTimestamp(tableTimestamp); builder.setClientTimestamp(clientTimestamp); - - instance.getTable(controller, builder.build(), rpcCallback); - if(controller.getFailedOn() != null) { - throw controller.getFailedOn(); - } - return rpcCallback.get(); + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); + instance.getTable(controller, builder.build(), rpcCallback); + if(controller.getFailedOn() != null) { + throw controller.getFailedOn(); + } + return rpcCallback.get(); } }); } @@ -1325,7 +1330,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } builder.setTableType(tableType.getSerializedValue()); builder.setCascade(cascade); - + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); instance.dropTable(controller, builder.build(), rpcCallback); if(controller.getFailedOn() != null) { throw controller.getFailedOn(); @@ -1379,6 +1384,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement builder.addTableMetadataMutations(mp.toByteString()); } builder.setIfExists(ifExists); + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); instance.dropFunction(controller, builder.build(), rpcCallback); if(controller.getFailedOn() != null) { throw controller.getFailedOn(); @@ -1553,7 +1559,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement MutationProto mp = ProtobufUtil.toProto(m); builder.addTableMetadataMutations(mp.toByteString()); } - + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); instance.addColumn(controller, builder.build(), rpcCallback); if(controller.getFailedOn() != null) { throw controller.getFailedOn(); @@ -1804,6 +1810,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement MutationProto mp = ProtobufUtil.toProto(m); builder.addTableMetadataMutations(mp.toByteString()); } + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); instance.dropColumn(controller, builder.build(), rpcCallback); if(controller.getFailedOn() != null) { throw controller.getFailedOn(); @@ -2125,6 +2132,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement BlockingRpcCallback<ClearCacheResponse> rpcCallback = new BlockingRpcCallback<ClearCacheResponse>(); ClearCacheRequest.Builder builder = ClearCacheRequest.newBuilder(); + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); instance.clearCache(controller, builder.build(), rpcCallback); if(controller.getFailedOn() != null) { throw controller.getFailedOn(); @@ -2189,23 +2197,24 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement byte[] tableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY, rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX], rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]); return metaDataCoprocessorExec(tableKey, new Batch.Call<MetaDataService, MetaDataResponse>() { - @Override - public MetaDataResponse call(MetaDataService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<MetaDataResponse> rpcCallback = - new BlockingRpcCallback<MetaDataResponse>(); - UpdateIndexStateRequest.Builder builder = UpdateIndexStateRequest.newBuilder(); - for (Mutation m : tableMetaData) { - MutationProto mp = ProtobufUtil.toProto(m); - builder.addTableMetadataMutations(mp.toByteString()); - } - instance.updateIndexState(controller, builder.build(), rpcCallback); - if(controller.getFailedOn() != null) { - throw controller.getFailedOn(); - } - return rpcCallback.get(); - } - }); + @Override + public MetaDataResponse call(MetaDataService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<MetaDataResponse> rpcCallback = + new BlockingRpcCallback<MetaDataResponse>(); + UpdateIndexStateRequest.Builder builder = UpdateIndexStateRequest.newBuilder(); + for (Mutation m : tableMetaData) { + MutationProto mp = ProtobufUtil.toProto(m); + builder.addTableMetadataMutations(mp.toByteString()); + } + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); + instance.updateIndexState(controller, builder.build(), rpcCallback); + if(controller.getFailedOn() != null) { + throw controller.getFailedOn(); + } + return rpcCallback.get(); + } + }); } @Override @@ -2408,6 +2417,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement builder.setTableName(ByteStringer.wrap(tableName)); builder.setSchemaName(ByteStringer.wrap(schemaName)); builder.setClientTimestamp(clientTS); + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); instance.clearTableFromCache(controller, builder.build(), rpcCallback); if (controller.getFailedOn() != null) { throw controller.getFailedOn(); } return rpcCallback.get(); @@ -2696,12 +2706,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement builder.addFunctionTimestamps(function.getSecond().longValue()); } builder.setClientTimestamp(clientTimestamp); - - instance.getFunctions(controller, builder.build(), rpcCallback); - if(controller.getFailedOn() != null) { - throw controller.getFailedOn(); - } - return rpcCallback.get(); + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); + instance.getFunctions(controller, builder.build(), rpcCallback); + if(controller.getFailedOn() != null) { + throw controller.getFailedOn(); + } + return rpcCallback.get(); } }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES); @@ -2732,6 +2742,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } builder.setTemporary(temporary); builder.setReplace(function.isReplace()); + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); instance.createFunction(controller, builder.build(), rpcCallback); if(controller.getFailedOn() != null) { throw controller.getFailedOn();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9f09f1a5/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index b1fcf30..f74133a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -234,8 +234,9 @@ public class MetaDataClient { TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_FAMILY + "," + - LINK_TYPE + - ") VALUES (?, ?, ?, ?, ?)"; + LINK_TYPE + "," + + TABLE_SEQ_NUM + // this is actually set to the parent table's sequence number + ") VALUES (?, ?, ?, ?, ?, ?)"; private static final String CREATE_VIEW_LINK = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " + TENANT_ID + "," + @@ -1490,6 +1491,7 @@ public class MetaDataClient { linkStatement.setString(3, parentTableName); linkStatement.setString(4, tableName); linkStatement.setByte(5, LinkType.INDEX_TABLE.getSerializedValue()); + linkStatement.setLong(6, parent.getSequenceNumber()); linkStatement.execute(); } @@ -1649,6 +1651,12 @@ public class MetaDataClient { linkStatement.setString(3, tableName); linkStatement.setString(4, physicalName.getString()); linkStatement.setByte(5, LinkType.PHYSICAL_TABLE.getSerializedValue()); + if (tableType == PTableType.VIEW) { + PTable physicalTable = connection.getMetaDataCache().getTable(new PTableKey(null, physicalName.getString())); + linkStatement.setLong(6, physicalTable.getSequenceNumber()); + } else { + linkStatement.setLong(6, parent.getSequenceNumber()); + } linkStatement.execute(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9f09f1a5/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java index fbc15be..ee73a58 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java @@ -51,6 +51,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.LinkType; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.SortOrder; @@ -59,6 +60,7 @@ import org.apache.phoenix.schema.types.PBoolean; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.types.PSmallint; +import org.apache.phoenix.schema.types.PUnsignedTinyint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -437,4 +439,16 @@ public class MetaDataUtil { scan.setStopRow(stopKey); return scan; } + + public static LinkType getLinkType(Mutation tableMutation) { + List<Cell> kvs = tableMutation.getFamilyCellMap().get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES); + if (kvs != null) { + for (Cell kv : kvs) { + if (Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), PhoenixDatabaseMetaData.LINK_TYPE_BYTES, 0, PhoenixDatabaseMetaData.LINK_TYPE_BYTES.length) == 0) { + return LinkType.fromSerializedValue(PUnsignedTinyint.INSTANCE.getCodec().decodeByte(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault())); + } + } + } + return null; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9f09f1a5/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index 61642bc..1a2019d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -83,8 +83,6 @@ import org.apache.phoenix.schema.RowKeyValueAccessor; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.types.PDataType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.base.Joiner; import com.google.common.base.Splitter; @@ -99,8 +97,6 @@ import com.google.common.collect.Lists; * @since 0.1 */ public class PhoenixRuntime { - private static final Logger logger = LoggerFactory.getLogger(PhoenixRuntime.class); - /** * Use this connection property to control HBase timestamps * by specifying your own long timestamp value at connection time. All http://git-wip-us.apache.org/repos/asf/phoenix/blob/9f09f1a5/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java index 0ad6b9d..e1e2515 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java @@ -466,8 +466,8 @@ public class UpgradeUtil { public static void upgradeTo4_5_0(PhoenixConnection oldMetaConnection) throws SQLException { PhoenixConnection metaConnection = null; try { - // Need to use own connection without any SCN to be able to read all data from SYSTEM.CATALOG - metaConnection = new PhoenixConnection(oldMetaConnection); + // Need to use own connection with max time stamp to be able to read all data from SYSTEM.CATALOG + metaConnection = new PhoenixConnection(oldMetaConnection, HConstants.LATEST_TIMESTAMP); logger.info("Upgrading metadata to support adding columns to tables with views"); String getBaseTableAndViews = "SELECT " + COLUMN_FAMILY + " AS BASE_PHYSICAL_TABLE, " http://git-wip-us.apache.org/repos/asf/phoenix/blob/9f09f1a5/phoenix-protocol/src/main/MetaDataService.proto ---------------------------------------------------------------------- diff --git a/phoenix-protocol/src/main/MetaDataService.proto b/phoenix-protocol/src/main/MetaDataService.proto index e79f846..c265158 100644 --- a/phoenix-protocol/src/main/MetaDataService.proto +++ b/phoenix-protocol/src/main/MetaDataService.proto @@ -61,6 +61,7 @@ message GetTableRequest { required bytes tableName = 3; required int64 tableTimestamp = 4; required int64 clientTimestamp = 5; + optional int32 clientVersion = 6; } message GetFunctionsRequest { @@ -68,11 +69,13 @@ message GetFunctionsRequest { repeated bytes functionNames = 2; repeated int64 functionTimestamps = 3; required int64 clientTimestamp = 4; + optional int32 clientVersion = 5; } // each byte array represents a MutationProto instance message CreateTableRequest { - repeated bytes tableMetadataMutations = 1; + repeated bytes tableMetadataMutations = 1; + optional int32 clientVersion = 2; } // each byte array represents a MutationProto instance @@ -80,38 +83,46 @@ message CreateFunctionRequest { repeated bytes tableMetadataMutations = 1; required bool temporary = 2; optional bool replace = 3; + optional int32 clientVersion = 4; } message DropTableRequest { repeated bytes tableMetadataMutations = 1; required string tableType = 2; optional bool cascade = 3; + optional int32 clientVersion = 4; } message AddColumnRequest { repeated bytes tableMetadataMutations = 1; + optional int32 clientVersion = 2; } message DropColumnRequest { repeated bytes tableMetadataMutations = 1; + optional int32 clientVersion = 2; } message DropFunctionRequest { repeated bytes tableMetadataMutations = 1; optional bool ifExists = 2; + optional int32 clientVersion = 3; } message UpdateIndexStateRequest { repeated bytes tableMetadataMutations = 1; + optional int32 clientVersion = 2; } message ClearCacheRequest { + optional int32 clientVersion = 1; } message ClearCacheResponse { } message GetVersionRequest { + optional int32 clientVersion = 1; } message GetVersionResponse { @@ -123,6 +134,7 @@ message ClearTableFromCacheRequest { required bytes schemaName = 2; required bytes tableName = 3; required int64 clientTimestamp = 4; + optional int32 clientVersion = 5; } message ClearTableFromCacheResponse {