http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java index b12326a..278489d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java @@ -49,6 +49,7 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; @@ -208,7 +209,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri not care about it */ private void initColumnIndexes() throws SQLException { - columnIndexes = new TreeMap(Bytes.BYTES_COMPARATOR); + columnIndexes = new TreeMap<>(Bytes.BYTES_COMPARATOR); int columnIndex = 0; for(int index = 0; index < logicalNames.size(); index++) { PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index)); @@ -216,18 +217,22 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri for (int i = 0; i < cls.size(); i++) { PColumn c = cls.get(i); byte[] family = new byte[0]; - if (c.getFamilyName() != null) // Skip PK column + byte[] cq; + if (!SchemaUtil.isPKColumn(c)) { family = c.getFamilyName().getBytes(); - byte[] name = c.getName().getBytes(); - byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name); + cq = c.getColumnQualifierBytes(); + } else { + cq = c.getName().getBytes(); + } + byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq); if (!columnIndexes.containsKey(cfn)) { columnIndexes.put(cfn, new Integer(columnIndex)); columnIndex++; } } byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table); - byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, - QueryConstants.EMPTY_COLUMN_BYTES); + byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst(); + byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue); columnIndexes.put(cfn, new Integer(columnIndex)); columnIndex++; } @@ -243,9 +248,9 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri private int findIndex(Cell cell) throws IOException { byte[] familyName = Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); - byte[] name = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), + byte[] cq = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - byte[] cfn = Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name); + byte[] cfn = Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq); if(columnIndexes.containsKey(cfn)) { return columnIndexes.get(cfn); }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java index 15d6d2f..c529afe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java @@ -44,6 +44,7 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.Closeables; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; @@ -89,7 +90,7 @@ public class FormatToKeyValueReducer } private void initColumnsMap(PhoenixConnection conn) throws SQLException { - Map<byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR); + Map<byte[], Integer> indexMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); columnIndexes = new HashMap<>(); int columnIndex = 0; for (int index = 0; index < logicalNames.size(); index++) { @@ -98,12 +99,16 @@ public class FormatToKeyValueReducer for (int i = 0; i < cls.size(); i++) { PColumn c = cls.get(i); byte[] family = new byte[0]; - if (c.getFamilyName() != null) { + byte[] cq; + if (!SchemaUtil.isPKColumn(c)) { family = c.getFamilyName().getBytes(); + cq = c.getColumnQualifierBytes(); + } else { + // TODO: samarth verify if this is the right thing to do here. + cq = c.getName().getBytes(); } - byte[] name = c.getName().getBytes(); - byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name); - Pair<byte[], byte[]> pair = new Pair(family, name); + byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq); + Pair<byte[], byte[]> pair = new Pair<>(family, cq); if (!indexMap.containsKey(cfn)) { indexMap.put(cfn, new Integer(columnIndex)); columnIndexes.put(new Integer(columnIndex), pair); @@ -111,8 +116,8 @@ public class FormatToKeyValueReducer } } byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table); - Pair<byte[], byte[]> pair = new Pair(emptyColumnFamily, QueryConstants - .EMPTY_COLUMN_BYTES); + byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst(); + Pair<byte[], byte[]> pair = new Pair<>(emptyColumnFamily, emptyKeyValue); columnIndexes.put(new Integer(columnIndex), pair); columnIndex++; } @@ -123,18 +128,17 @@ public class FormatToKeyValueReducer Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context) throws IOException, InterruptedException { TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR); - ImmutableBytesWritable rowKey = key.getRowkey(); for (ImmutableBytesWritable aggregatedArray : values) { DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get())); while (input.available() != 0) { byte type = input.readByte(); int index = WritableUtils.readVInt(input); ImmutableBytesWritable family; - ImmutableBytesWritable name; + ImmutableBytesWritable cq; ImmutableBytesWritable value = QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR; Pair<byte[], byte[]> pair = columnIndexes.get(index); family = new ImmutableBytesWritable(pair.getFirst()); - name = new ImmutableBytesWritable(pair.getSecond()); + cq = new ImmutableBytesWritable(pair.getSecond()); int len = WritableUtils.readVInt(input); if (len > 0) { byte[] array = new byte[len]; @@ -145,10 +149,10 @@ public class FormatToKeyValueReducer KeyValue.Type kvType = KeyValue.Type.codeToType(type); switch (kvType) { case Put: // not null value - kv = builder.buildPut(key.getRowkey(), family, name, value); + kv = builder.buildPut(key.getRowkey(), family, cq, value); break; case DeleteColumn: // null value - kv = builder.buildDeleteColumns(key.getRowkey(), family, name); + kv = builder.buildDeleteColumns(key.getRowkey(), family, cq); break; default: throw new IOException("Unsupported KeyValue type " + kvType); @@ -164,4 +168,4 @@ public class FormatToKeyValueReducer if (++index % 100 == 0) context.setStatus("Wrote " + index); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index ca784c0..090e79b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -24,11 +24,34 @@ import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERS import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER; import static org.apache.phoenix.coprocessor.MetaDataProtocol.getVersion; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER; +import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE; @@ -42,6 +65,7 @@ import java.lang.ref.WeakReference; import java.sql.PreparedStatement; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Types; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -161,11 +185,13 @@ import org.apache.phoenix.schema.ColumnAlreadyExistsException; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.EmptySequenceCacheException; import org.apache.phoenix.schema.FunctionNotFoundException; +import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.MetaDataSplitPolicy; import org.apache.phoenix.schema.NewerSchemaAlreadyExistsException; import org.apache.phoenix.schema.NewerTableAlreadyExistsException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; +import org.apache.phoenix.schema.PColumnImpl; import org.apache.phoenix.schema.PMetaData; import org.apache.phoenix.schema.PMetaDataImpl; import org.apache.phoenix.schema.PName; @@ -179,6 +205,7 @@ import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.schema.Sequence; import org.apache.phoenix.schema.SequenceAllocation; import org.apache.phoenix.schema.SequenceKey; +import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableProperty; @@ -188,7 +215,9 @@ import org.apache.phoenix.schema.types.PBoolean; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.schema.types.PTinyint; import org.apache.phoenix.schema.types.PUnsignedTinyint; +import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; @@ -589,7 +618,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement latestMetaDataLock.notifyAll(); } } - @Override public void updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException { synchronized (latestMetaDataLock) { @@ -2705,6 +2733,31 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0); clearCache(); } + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0) { + metaConnection = addColumnQualifierColumn(metaConnection, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 3); + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 2, + PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME + " " + + PTinyint.INSTANCE.getSqlTypeName()); + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 1, + PhoenixDatabaseMetaData.ENCODING_SCHEME + " " + + PTinyint.INSTANCE.getSqlTypeName()); + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0, + PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER + " " + + PInteger.INSTANCE.getSqlTypeName()); + ConnectionQueryServicesImpl.this.removeTable(null, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0); + clearCache(); + } } @@ -2842,6 +2895,84 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } + + // Special method for adding the column qualifier column for 4.10. + private PhoenixConnection addColumnQualifierColumn(PhoenixConnection oldMetaConnection, Long timestamp) throws SQLException { + Properties props = PropertiesUtil.deepCopy(oldMetaConnection.getClientInfo()); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp)); + // Cannot go through DriverManager or you end up in an infinite loop because it'll call init again + PhoenixConnection metaConnection = new PhoenixConnection(oldMetaConnection, this, props); + PTable sysCatalogPTable = metaConnection.getTable(new PTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)); + int numColumns = sysCatalogPTable.getColumns().size(); + try (PreparedStatement mutateTable = metaConnection.prepareStatement(MetaDataClient.MUTATE_TABLE)) { + mutateTable.setString(1, null); + mutateTable.setString(2, SYSTEM_CATALOG_SCHEMA); + mutateTable.setString(3, SYSTEM_CATALOG_TABLE); + mutateTable.setString(4, PTableType.SYSTEM.getSerializedValue()); + mutateTable.setLong(5, sysCatalogPTable.getSequenceNumber() + 1); + mutateTable.setInt(6, numColumns + 1); + mutateTable.execute(); + } + List<Mutation> tableMetadata = new ArrayList<>(); + tableMetadata.addAll(metaConnection.getMutationState().toMutations(metaConnection.getSCN()).next().getSecond()); + metaConnection.rollback(); + PColumn column = new PColumnImpl(PNameFactory.newName("COLUMN_QUALIFIER"), + PNameFactory.newName(DEFAULT_COLUMN_FAMILY_NAME), PVarbinary.INSTANCE, null, null, true, numColumns, + SortOrder.ASC, null, null, false, null, false, false, + Bytes.toBytes("COLUMN_QUALIFIER")); + String upsertColumnMetadata = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " + + TENANT_ID + "," + + TABLE_SCHEM + "," + + TABLE_NAME + "," + + COLUMN_NAME + "," + + COLUMN_FAMILY + "," + + DATA_TYPE + "," + + NULLABLE + "," + + COLUMN_SIZE + "," + + DECIMAL_DIGITS + "," + + ORDINAL_POSITION + "," + + SORT_ORDER + "," + + DATA_TABLE_NAME + "," + + ARRAY_SIZE + "," + + VIEW_CONSTANT + "," + + IS_VIEW_REFERENCED + "," + + PK_NAME + "," + + KEY_SEQ + "," + + COLUMN_DEF + "," + + IS_ROW_TIMESTAMP + + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + try (PreparedStatement colUpsert = metaConnection.prepareStatement(upsertColumnMetadata)) { + colUpsert.setString(1, null); + colUpsert.setString(2, SYSTEM_CATALOG_SCHEMA); + colUpsert.setString(3, SYSTEM_CATALOG_TABLE); + colUpsert.setString(4, "COLUMN_QUALIFIER"); + colUpsert.setString(5, DEFAULT_COLUMN_FAMILY); + colUpsert.setInt(6, column.getDataType().getSqlType()); + colUpsert.setInt(7, ResultSetMetaData.columnNullable); + colUpsert.setNull(8, Types.INTEGER); + colUpsert.setNull(9, Types.INTEGER); + colUpsert.setInt(10, sysCatalogPTable.getBucketNum() != null ? numColumns : (numColumns + 1)); + colUpsert.setInt(11, SortOrder.ASC.getSystemValue()); + colUpsert.setString(12, null); + colUpsert.setNull(13, Types.INTEGER); + colUpsert.setBytes(14, null); + colUpsert.setBoolean(15, false); + colUpsert.setString(16, sysCatalogPTable.getPKName() == null ? null : sysCatalogPTable.getPKName().getString()); + colUpsert.setNull(17, Types.SMALLINT); + colUpsert.setNull(18, Types.VARCHAR); + colUpsert.setBoolean(19, false); + colUpsert.execute(); + } + tableMetadata.addAll(metaConnection.getMutationState().toMutations(metaConnection.getSCN()).next().getSecond()); + metaConnection.rollback(); + metaConnection.getQueryServices().addColumn(tableMetadata, sysCatalogPTable, Collections.<String,List<Pair<String,Object>>>emptyMap(), Collections.<String>emptySet(), Lists.newArrayList(column)); + metaConnection.removeTable(null, SYSTEM_CATALOG_NAME, null, timestamp); + ConnectionQueryServicesImpl.this.removeTable(null, + SYSTEM_CATALOG_NAME, null, + timestamp); + clearCache(); + return metaConnection; + } private void createSnapshot(String snapshotName, String tableName) throws SQLException { @@ -4137,4 +4268,4 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public Configuration getConfiguration() { return config; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 042d7a6..76b69fb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -93,7 +93,6 @@ import org.apache.tephra.inmemory.InMemoryTxSystemClient; import com.google.common.collect.Lists; import com.google.common.collect.Maps; - /** * * Implementation of ConnectionQueryServices used in testing where no connection to http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index 8e2dc1a..6f105f1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -31,6 +31,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CURRENT_VALUE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG; @@ -40,11 +42,13 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POST_KEY; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE; @@ -117,8 +121,8 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.schema.MetaDataSplitPolicy; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.SortOrder; -import org.apache.phoenix.util.ByteUtil; /** @@ -149,23 +153,30 @@ public interface QueryConstants { public final static byte[] OFFSET_ROW_KEY_BYTES = Bytes.toBytes(OFFSET_ROW_KEY); public final static ImmutableBytesPtr OFFSET_ROW_KEY_PTR = new ImmutableBytesPtr(OFFSET_ROW_KEY_BYTES); - public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s"); - public final static PName SINGLE_COLUMN_FAMILY_NAME = PNameFactory.newNormalizedName("s"); - public final static byte[] SINGLE_COLUMN = SINGLE_COLUMN_NAME.getBytes(); - public final static byte[] SINGLE_COLUMN_FAMILY = SINGLE_COLUMN_FAMILY_NAME.getBytes(); - public static final long AGG_TIMESTAMP = HConstants.LATEST_TIMESTAMP; /** * Key used for a single row aggregation where there is no group by */ public final static byte[] UNGROUPED_AGG_ROW_KEY = Bytes.toBytes("a"); - public final static PName AGG_COLUMN_NAME = SINGLE_COLUMN_NAME; - public final static PName AGG_COLUMN_FAMILY_NAME = SINGLE_COLUMN_FAMILY_NAME; - - public static final byte[] ARRAY_VALUE_COLUMN_FAMILY = Bytes.toBytes("a"); - // Use empty byte array for column qualifier so as not to accidentally conflict with any other columns - public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = ByteUtil.EMPTY_BYTE_ARRAY; + + /** BEGIN Set of reserved column qualifiers **/ + + public static final String RESERVED_COLUMN_FAMILY = "_v"; + public static final byte[] RESERVED_COLUMN_FAMILY_BYTES = Bytes.toBytes(RESERVED_COLUMN_FAMILY); + + public static final byte[] VALUE_COLUMN_FAMILY = RESERVED_COLUMN_FAMILY_BYTES; + public static final byte[] VALUE_COLUMN_QUALIFIER = QualifierEncodingScheme.FOUR_BYTE_QUALIFIERS.encode(1); + + public static final byte[] ARRAY_VALUE_COLUMN_FAMILY = RESERVED_COLUMN_FAMILY_BYTES; + public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = QualifierEncodingScheme.FOUR_BYTE_QUALIFIERS.encode(2); + + public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s"); + public final static PName SINGLE_COLUMN_FAMILY_NAME = PNameFactory.newNormalizedName("s"); + public final static byte[] SINGLE_COLUMN = SINGLE_COLUMN_NAME.getBytes(); + public final static byte[] SINGLE_COLUMN_FAMILY = SINGLE_COLUMN_FAMILY_NAME.getBytes(); + /** END Set of reserved column qualifiers **/ + public static final byte[] TRUE = new byte[] {1}; /** @@ -192,15 +203,25 @@ public interface QueryConstants { public static final byte[] EMPTY_COLUMN_BYTES = Bytes.toBytes(EMPTY_COLUMN_NAME); public static final ImmutableBytesPtr EMPTY_COLUMN_BYTES_PTR = new ImmutableBytesPtr( EMPTY_COLUMN_BYTES); + public static final Integer ENCODED_EMPTY_COLUMN_NAME = 0; + public static final byte[] ENCODED_EMPTY_COLUMN_BYTES = QualifierEncodingScheme.FOUR_BYTE_QUALIFIERS.encode(ENCODED_EMPTY_COLUMN_NAME); public final static String EMPTY_COLUMN_VALUE = "x"; public final static byte[] EMPTY_COLUMN_VALUE_BYTES = Bytes.toBytes(EMPTY_COLUMN_VALUE); public static final ImmutableBytesPtr EMPTY_COLUMN_VALUE_BYTES_PTR = new ImmutableBytesPtr( EMPTY_COLUMN_VALUE_BYTES); - + public static final String ENCODED_EMPTY_COLUMN_VALUE = EMPTY_COLUMN_VALUE; + public final static byte[] ENCODED_EMPTY_COLUMN_VALUE_BYTES = Bytes.toBytes(EMPTY_COLUMN_VALUE); + public static final ImmutableBytesPtr ENCODED_EMPTY_COLUMN_VALUE_BYTES_PTR = new ImmutableBytesPtr( + ENCODED_EMPTY_COLUMN_VALUE_BYTES); public static final String DEFAULT_COLUMN_FAMILY = "0"; public static final byte[] DEFAULT_COLUMN_FAMILY_BYTES = Bytes.toBytes(DEFAULT_COLUMN_FAMILY); public static final ImmutableBytesPtr DEFAULT_COLUMN_FAMILY_BYTES_PTR = new ImmutableBytesPtr( DEFAULT_COLUMN_FAMILY_BYTES); + // column qualifier of the single key value used to store all columns for the COLUMNS_STORED_IN_SINGLE_CELL storage scheme + public static final String SINGLE_KEYVALUE_COLUMN_QUALIFIER = "1"; + public final static byte[] SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES = Bytes.toBytes(SINGLE_KEYVALUE_COLUMN_QUALIFIER); + public static final ImmutableBytesPtr SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES_PTR = new ImmutableBytesPtr( + SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES); public static final String LOCAL_INDEX_COLUMN_FAMILY_PREFIX = "L#"; public static final byte[] LOCAL_INDEX_COLUMN_FAMILY_PREFIX_BYTES = Bytes.toBytes(LOCAL_INDEX_COLUMN_FAMILY_PREFIX); @@ -222,6 +243,12 @@ public interface QueryConstants { public static final int NANOS_IN_SECOND = BigDecimal.valueOf(Math.pow(10, 9)).intValue(); public static final int DIVERGED_VIEW_BASE_COLUMN_COUNT = -100; public static final int BASE_TABLE_BASE_COLUMN_COUNT = -1; + + /** + * We mark counter values 0 to 10 as reserved. Value 0 is used by {@link #ENCODED_EMPTY_COLUMN_NAME}. Values 1-10 + * are reserved for special column qualifiers returned by Phoenix co-processors. + */ + public static final int ENCODED_CQ_COUNTER_INITIAL_VALUE = 11; public static final String CREATE_TABLE_METADATA = // Do not use IF NOT EXISTS as we sometimes catch the TableAlreadyExists // exception and add columns to the SYSTEM.TABLE dynamically. @@ -289,6 +316,10 @@ public interface QueryConstants { AUTO_PARTITION_SEQ + " VARCHAR," + APPEND_ONLY_SCHEMA + " BOOLEAN," + GUIDE_POSTS_WIDTH + " BIGINT," + + COLUMN_QUALIFIER + " VARBINARY," + + IMMUTABLE_STORAGE_SCHEME + " TINYINT, " + + ENCODING_SCHEME + " TINYINT, " + + COLUMN_QUALIFIER_COUNTER + " INTEGER, " + "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" + HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" + @@ -363,5 +394,5 @@ public interface QueryConstants { public static final byte[] OFFSET_FAMILY = "f_offset".getBytes(); public static final byte[] OFFSET_COLUMN = "c_offset".getBytes(); public static final String LAST_SCAN = "LAST_SCAN"; - + public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 585d0ae..a6f8e73 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -235,9 +235,11 @@ public interface QueryServices extends SQLCloseable { public static final String CLIENT_CACHE_ENCODING = "phoenix.table.client.cache.encoding"; public static final String AUTO_UPGRADE_ENABLED = "phoenix.autoupgrade.enabled"; - public static final String CLIENT_CONNECTION_CACHE_MAX_DURATION_MILLISECONDS = "phoenix.client.connection.max.duration"; + public static final String DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB = "phoenix.default.column.encoded.bytes.attrib"; + public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.immutable.storage.scheme"; + /** * Get executor service used for parallel scans */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 43436b8..183ee03 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -92,6 +92,8 @@ import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.PTableRefFactory; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.DateUtil; @@ -248,7 +250,6 @@ public class QueryServicesOptions { public static final long DEFAULT_QUERY_SERVER_UGI_CACHE_MAX_SIZE = 1000L; public static final int DEFAULT_QUERY_SERVER_UGI_CACHE_INITIAL_SIZE = 100; public static final int DEFAULT_QUERY_SERVER_UGI_CACHE_CONCURRENCY = 10; - public static final boolean DEFAULT_RENEW_LEASE_ENABLED = true; public static final int DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS = DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD / 2; @@ -263,6 +264,8 @@ public class QueryServicesOptions { public static final String DEFAULT_CLIENT_CACHE_ENCODING = PTableRefFactory.Encoding.OBJECT.toString(); public static final boolean DEFAULT_AUTO_UPGRADE_ENABLED = true; public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION = 86400000; + public static final int DEFAULT_COLUMN_ENCODED_BYTES = QualifierEncodingScheme.FOUR_BYTE_QUALIFIERS.getSerializedMetadataValue(); + public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME = ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS.toString(); @SuppressWarnings("serial") public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java index 1d772b4..c73b860 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java @@ -17,6 +17,10 @@ */ package org.apache.phoenix.schema; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Arrays; + import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.http.annotation.Immutable; import org.apache.phoenix.compile.ExpressionCompiler; @@ -25,19 +29,17 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.ProjectedColumnExpression; import org.apache.phoenix.expression.RowKeyColumnExpression; +import org.apache.phoenix.expression.SingleCellColumnExpression; import org.apache.phoenix.expression.function.DefaultValueExpression; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.SQLParser; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.SchemaUtil; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.util.Arrays; - /** * @@ -59,7 +61,7 @@ public class ColumnRef { } public ColumnRef(TableRef tableRef, String familyName, String columnName) throws MetaDataEntityNotFoundException { - this(tableRef, tableRef.getTable().getColumnFamily(familyName).getColumn(columnName).getPosition()); + this(tableRef, tableRef.getTable().getColumnFamily(familyName).getPColumnForColumnName(columnName).getPosition()); } public ColumnRef(TableRef tableRef, int columnPosition) { @@ -123,7 +125,8 @@ public class ColumnRef { return new ProjectedColumnExpression(column, table, displayName); } - Expression expression = new KeyValueColumnExpression(column, displayName); + Expression expression = table.getImmutableStorageScheme() == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS ? + new SingleCellColumnExpression(column, displayName, table.getEncodingScheme()) : new KeyValueColumnExpression(column, displayName); if (column.getExpressionStr() != null) { String url = PhoenixRuntime.JDBC_PROTOCOL @@ -140,7 +143,6 @@ public class ColumnRef { return new DefaultValueExpression(Arrays.asList(expression, defaultExpression)); } } - return expression; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueDecoder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueDecoder.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueDecoder.java new file mode 100644 index 0000000..5ae72d1 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueDecoder.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; + +/** + * Interface to decode column values that are stored in a byte[] + */ +public interface ColumnValueDecoder { + /** + * sets the ptr to the column value at the given index + * @return false if the column value is absent (used to support DEFAULT expressions) or else true + */ + boolean decode(ImmutableBytesWritable ptr, int index); +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueEncoder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueEncoder.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueEncoder.java new file mode 100644 index 0000000..5e930bd --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueEncoder.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema; + +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; + + +/** + * Interface to encode column values into a serialized byte[] that will be stored in a single cell + * The last byte of the serialized byte[] should be the serialized value of the {@link ImmutableStorageScheme} + * that was used. + */ +public interface ColumnValueEncoder { + + /** + * append a column value to the array + */ + void appendValue(byte[] bytes, int offset, int length); + + /** + * append a value that is not present to the array (used to support DEFAULT expressions) + */ + void appendAbsentValue(); + + /** + * @return the encoded byte[] that contains the serialized column values + */ + byte[] encode(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java index 62d2e3f..c220ed0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java @@ -100,4 +100,8 @@ public class DelegateColumn extends DelegateDatum implements PColumn { public boolean equals(Object o) { return getDelegate().equals(o); } + @Override + public byte[] getColumnQualifierBytes() { + return getDelegate().getColumnQualifierBytes(); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java index 7d39dfe..3168b95 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java @@ -96,8 +96,8 @@ public class DelegateTable implements PTable { } @Override - public PColumn getColumn(String name) throws ColumnNotFoundException, AmbiguousColumnException { - return delegate.getColumn(name); + public PColumn getPColumnForColumnName(String name) throws ColumnNotFoundException, AmbiguousColumnException { + return delegate.getPColumnForColumnName(name); } @Override @@ -290,4 +290,24 @@ public class DelegateTable implements PTable { public boolean equals(Object obj) { return delegate.equals(obj); } + + @Override + public ImmutableStorageScheme getImmutableStorageScheme() { + return delegate.getImmutableStorageScheme(); + } + + @Override + public PColumn getPColumnForColumnQualifier(byte[] cf, byte[] cq) throws ColumnNotFoundException, AmbiguousColumnException { + return delegate.getPColumnForColumnQualifier(cf, cq); + } + + @Override + public EncodedCQCounter getEncodedCQCounter() { + return delegate.getEncodedCQCounter(); + } + + @Override + public QualifierEncodingScheme getEncodingScheme() { + return delegate.getEncodingScheme(); + } }