http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 67b7663..ee1af19 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -47,6 +47,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE; @@ -71,7 +72,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE; @@ -93,9 +93,9 @@ import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB; import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_UPDATE_STATS_ASYNC; import static org.apache.phoenix.schema.PTable.EncodedCQCounter.NULL_COUNTER; +import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN; +import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS; import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; -import static org.apache.phoenix.schema.PTable.StorageScheme.ONE_CELL_PER_COLUMN_FAMILY; -import static org.apache.phoenix.schema.PTable.StorageScheme.ONE_CELL_PER_KEYVALUE_COLUMN; import static org.apache.phoenix.schema.PTable.ViewType.MAPPED; import static org.apache.phoenix.schema.PTableType.TABLE; import static org.apache.phoenix.schema.PTableType.VIEW; @@ -195,11 +195,11 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTable.EncodedCQCounter; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTable.LinkType; import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.PTable.QualifierEncodingScheme.QualifierOutOfRangeException; -import org.apache.phoenix.schema.PTable.StorageScheme; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.schema.stats.GuidePostsKey; import org.apache.phoenix.schema.types.PDataType; @@ -273,7 +273,7 @@ public class MetaDataClient { AUTO_PARTITION_SEQ + "," + APPEND_ONLY_SCHEMA + "," + GUIDE_POSTS_WIDTH + "," + - STORAGE_SCHEME + "," + + IMMUTABLE_STORAGE_SCHEME + "," + ENCODING_SCHEME + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; @@ -1695,7 +1695,8 @@ public class 
MetaDataClient { ? SchemaUtil.isNamespaceMappingEnabled(tableType, connection.getQueryServices().getProps()) : parent.isNamespaceMapped(); boolean isLocalIndex = indexType == IndexType.LOCAL; - QualifierEncodingScheme encodingScheme = QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; + QualifierEncodingScheme encodingScheme = NON_ENCODED_QUALIFIERS; + ImmutableStorageScheme immutableStorageScheme = ONE_CELL_PER_COLUMN; if (parent != null && tableType == PTableType.INDEX) { timestamp = TransactionUtil.getTableTimestamp(connection, transactional); storeNulls = parent.getStoreNulls(); @@ -2033,7 +2034,6 @@ public class MetaDataClient { } int pkPositionOffset = pkColumns.size(); int position = positionOffset; - StorageScheme storageScheme = ONE_CELL_PER_KEYVALUE_COLUMN; EncodedCQCounter cqCounter = NULL_COUNTER; PTable viewPhysicalTable = null; if (tableType == PTableType.VIEW) { @@ -2048,7 +2048,7 @@ public class MetaDataClient { * encoded column qualifier. */ viewPhysicalTable = PhoenixRuntime.getTable(connection, physicalNames.get(0).getString()); - storageScheme = viewPhysicalTable.getStorageScheme(); + immutableStorageScheme = viewPhysicalTable.getImmutableStorageScheme(); encodingScheme = viewPhysicalTable.getEncodingScheme(); if (EncodedColumnsUtil.usesEncodedColumnNames(viewPhysicalTable)) { cqCounter = viewPhysicalTable.getEncodedCQCounter(); @@ -2088,21 +2088,29 @@ public class MetaDataClient { } if (tableExists) { encodingScheme = NON_ENCODED_QUALIFIERS; + immutableStorageScheme = ONE_CELL_PER_COLUMN; } else if (parent != null) { encodingScheme = parent.getEncodingScheme(); + immutableStorageScheme = parent.getImmutableStorageScheme(); } else { Byte encodingSchemeSerializedByte = (Byte) TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps); if (encodingSchemeSerializedByte == null) { encodingSchemeSerializedByte = (byte)connection.getQueryServices().getProps().getInt(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, 
QueryServicesOptions.DEFAULT_COLUMN_ENCODED_BYTES); } encodingScheme = QualifierEncodingScheme.fromSerializedValue(encodingSchemeSerializedByte); + if (isImmutableRows) { + immutableStorageScheme = (ImmutableStorageScheme) TableProperty.IMMUTABLE_STORAGE_SCHEME.getValue(tableProps); + if (immutableStorageScheme == null) { + immutableStorageScheme = ImmutableStorageScheme.valueOf(connection.getQueryServices().getProps().get(QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB, QueryServicesOptions.DEFAULT_IMMUTABLE_STORAGE_SCHEME)); + } + if (immutableStorageScheme!=ONE_CELL_PER_COLUMN && encodingScheme == NON_ENCODED_QUALIFIERS) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_AND_COLUMN_QUALIFIER_BYTES).setSchemaName(schemaName).setTableName(tableName).build().buildException(); + } + } } - if (isImmutableRows && encodingScheme != NON_ENCODED_QUALIFIERS) { - storageScheme = ONE_CELL_PER_COLUMN_FAMILY; - } cqCounter = encodingScheme != NON_ENCODED_QUALIFIERS ? new EncodedCQCounter() : NULL_COUNTER; } - + Map<String, Integer> changedCqCounters = new HashMap<>(colDefs.size()); for (ColumnDef colDef : colDefs) { rowTimeStampColumnAlreadyFound = checkAndValidateRowTimestampCol(colDef, pkConstraint, rowTimeStampColumnAlreadyFound, tableType); @@ -2127,7 +2135,7 @@ public class MetaDataClient { boolean isPkColumn = isPkColumn(pkConstraint, colDef, columnDefName); String cqCounterFamily = null; if (!isPkColumn) { - if (storageScheme == ONE_CELL_PER_COLUMN_FAMILY && encodingScheme != NON_ENCODED_QUALIFIERS) { + if (immutableStorageScheme == SINGLE_CELL_ARRAY_WITH_OFFSETS && encodingScheme != NON_ENCODED_QUALIFIERS) { // For this scheme we track column qualifier counters at the column family level. cqCounterFamily = colDefFamily != null ? colDefFamily : (defaultFamilyName != null ? 
defaultFamilyName : DEFAULT_COLUMN_FAMILY); } else { @@ -2272,7 +2280,7 @@ public class MetaDataClient { Collections.<PTable>emptyList(), isImmutableRows, Collections.<PName>emptyList(), defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), null, - Boolean.TRUE.equals(disableWAL), false, false, null, null, indexType, true, false, 0, 0L, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, ONE_CELL_PER_KEYVALUE_COLUMN, NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER); + Boolean.TRUE.equals(disableWAL), false, false, null, null, indexType, true, false, 0, 0L, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER); connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP); } @@ -2426,7 +2434,7 @@ public class MetaDataClient { } else { tableUpsert.setLong(25, guidePostsWidth); } - tableUpsert.setByte(26, storageScheme.getSerializedMetadataValue()); //TODO: samarth should there be a null check here? + tableUpsert.setByte(26, immutableStorageScheme.getSerializedMetadataValue()); //TODO: samarth should there be a null check here? tableUpsert.setByte(27, encodingScheme.getSerializedMetadataValue()); tableUpsert.execute(); @@ -2532,7 +2540,7 @@ public class MetaDataClient { PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns.values(), parent == null ? null : parent.getSchemaName(), parent == null ? null : parent.getTableName(), Collections.<PTable>emptyList(), isImmutableRows, physicalNames, defaultFamilyName == null ? 
null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType, - result.getViewIndexId(), indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, storageScheme, encodingScheme, cqCounterToBe); + result.getViewIndexId(), indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, immutableStorageScheme, encodingScheme, cqCounterToBe); result = new MetaDataMutationResult(code, result.getMutationTime(), table, true); addTableToCache(result); return table; @@ -2716,7 +2724,7 @@ public class MetaDataClient { PTable viewIndexTable = new PTableImpl(null, SchemaUtil.getSchemaNameFromFullName(viewIndexPhysicalName), SchemaUtil.getTableNameFromFullName(viewIndexPhysicalName), ts, - table.getColumnFamilies(),table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodingScheme()); + table.getColumnFamilies(),table.isNamespaceMapped(), table.getImmutableStorageScheme(), table.getEncodingScheme()); tableRefs.add(new TableRef(null, viewIndexTable, ts, false)); } } @@ -2837,12 +2845,12 @@ public class MetaDataClient { } private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, Boolean isTransactional, Long updateCacheFrequency) throws SQLException { - return incrementTableSeqNum(table, expectedType, columnCountDelta, isTransactional, updateCacheFrequency, null, null, null, null, -1L, null); + return incrementTableSeqNum(table, expectedType, columnCountDelta, isTransactional, updateCacheFrequency, null, null, null, null, -1L, null, null); } private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, Boolean isTransactional, Long updateCacheFrequency, Boolean isImmutableRows, Boolean disableWAL, - Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth, Boolean appendOnlySchema) + 
Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth, Boolean appendOnlySchema, ImmutableStorageScheme immutableStorageScheme) throws SQLException { String schemaName = table.getSchemaName().getString(); String tableName = table.getTableName().getString(); @@ -2886,6 +2894,10 @@ public class MetaDataClient { if (appendOnlySchema !=null) { mutateBooleanProperty(tenantId, schemaName, tableName, APPEND_ONLY_SCHEMA, appendOnlySchema); } + if (immutableStorageScheme !=null) { + mutateStringProperty(tenantId, schemaName, tableName, IMMUTABLE_STORAGE_SCHEME, immutableStorageScheme.name()); + } + return seqNum; } @@ -2926,6 +2938,23 @@ public class MetaDataClient { tableBoolUpsert.execute(); } } + + private void mutateStringProperty(String tenantId, String schemaName, String tableName, + String propertyName, String propertyValue) throws SQLException { + String updatePropertySql = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " + + TENANT_ID + "," + + TABLE_SCHEM + "," + + TABLE_NAME + "," + + propertyName + + ") VALUES (?, ?, ?, ?)"; + try (PreparedStatement tableBoolUpsert = connection.prepareStatement(updatePropertySql)) { + tableBoolUpsert.setString(1, tenantId); + tableBoolUpsert.setString(2, schemaName); + tableBoolUpsert.setString(3, tableName); + tableBoolUpsert.setString(4, propertyValue); + tableBoolUpsert.execute(); + } + } public MutationState addColumn(AddColumnStatement statement) throws SQLException { PTable table = FromCompiler.getResolver(statement, connection).getTables().get(0).getTable(); @@ -2951,6 +2980,7 @@ public class MetaDataClient { Long updateCacheFrequencyProp = null; Boolean appendOnlySchemaProp = null; Long guidePostWidth = -1L; + ImmutableStorageScheme immutableStorageSchemeProp = null; Map<String, List<Pair<String, Object>>> properties = new HashMap<>(stmtProperties.size()); List<ColumnDef> columnDefs = null; @@ -3013,6 +3043,8 @@ public class MetaDataClient { guidePostWidth = (Long)value; } else if 
(propName.equals(APPEND_ONLY_SCHEMA)) { appendOnlySchemaProp = (Boolean) value; + } else if (propName.equalsIgnoreCase(IMMUTABLE_STORAGE_SCHEME)) { + immutableStorageSchemeProp = (ImmutableStorageScheme)value; } } // if removeTableProps is true only add the property if it is not a HTable or Phoenix Table property @@ -3055,7 +3087,7 @@ public class MetaDataClient { Boolean isImmutableRows = null; if (isImmutableRowsProp != null) { if (isImmutableRowsProp.booleanValue() != table.isImmutableRows()) { - if (table.getStorageScheme() != StorageScheme.ONE_CELL_PER_KEYVALUE_COLUMN) { + if (table.getImmutableStorageScheme() != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_IMMUTABLE_ROWS_PROPERTY) .setSchemaName(schemaName).setTableName(tableName).build().buildException(); } @@ -3091,6 +3123,18 @@ public class MetaDataClient { changingPhoenixTableProperty = true; } } + ImmutableStorageScheme immutableStorageScheme = null; + if (immutableStorageSchemeProp!=null) { + if (table.getImmutableStorageScheme() == ONE_CELL_PER_COLUMN || + immutableStorageSchemeProp == ONE_CELL_PER_COLUMN) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_CHANGE) + .setSchemaName(schemaName).setTableName(tableName).build().buildException(); + } + else if (immutableStorageSchemeProp != table.getImmutableStorageScheme()) { + immutableStorageScheme = immutableStorageSchemeProp; + changingPhoenixTableProperty = true; + } + } if (guidePostWidth == null || guidePostWidth >= 0) { changingPhoenixTableProperty = true; @@ -3170,13 +3214,13 @@ public class MetaDataClient { if (!colDef.isPK()) { String colDefFamily = colDef.getColumnDefName().getFamilyName(); String familyName = null; - StorageScheme storageScheme = table.getStorageScheme(); + ImmutableStorageScheme storageScheme = table.getImmutableStorageScheme(); String defaultColumnFamily = tableForCQCounters.getDefaultFamilyName() != null && 
!Strings.isNullOrEmpty(tableForCQCounters.getDefaultFamilyName().getString()) ? tableForCQCounters.getDefaultFamilyName().getString() : DEFAULT_COLUMN_FAMILY; if (table.getType() == PTableType.INDEX && table.getIndexType() == IndexType.LOCAL) { defaultColumnFamily = QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX + defaultColumnFamily; } - if (storageScheme == ONE_CELL_PER_COLUMN_FAMILY) { + if (storageScheme == SINGLE_CELL_ARRAY_WITH_OFFSETS) { familyName = colDefFamily != null ? colDefFamily : defaultColumnFamily; } else { familyName = defaultColumnFamily; @@ -3271,10 +3315,9 @@ public class MetaDataClient { connection.rollback(); } - long seqNum = table.getSequenceNumber(); if (changingPhoenixTableProperty || columnDefs.size() > 0) { - seqNum = incrementTableSeqNum(table, tableType, columnDefs.size(), isTransactional, updateCacheFrequency, isImmutableRows, - disableWAL, multiTenant, storeNulls, guidePostWidth, appendOnlySchema); + incrementTableSeqNum(table, tableType, columnDefs.size(), isTransactional, updateCacheFrequency, isImmutableRows, + disableWAL, multiTenant, storeNulls, guidePostWidth, appendOnlySchema, immutableStorageScheme); tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond()); connection.rollback(); } @@ -3371,7 +3414,7 @@ public class MetaDataClient { PTable viewIndexTable = new PTableImpl(null, SchemaUtil.getSchemaNameFromFullName(viewIndexPhysicalName), SchemaUtil.getTableNameFromFullName(viewIndexPhysicalName), ts, - table.getColumnFamilies(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodingScheme()); + table.getColumnFamilies(), table.isNamespaceMapped(), table.getImmutableStorageScheme(), table.getEncodingScheme()); List<TableRef> tableRefs = Collections.singletonList(new TableRef(null, viewIndexTable, ts, false)); MutationPlan plan = new PostDDLCompiler(connection).compile(tableRefs, null, null, Collections.<PColumn> emptyList(), ts); @@ -3637,7 +3680,7 @@ public class 
MetaDataClient { sharedTableState.getSchemaName(), sharedTableState.getTableName(), ts, table.getColumnFamilies(), sharedTableState.getColumns(), sharedTableState.getPhysicalNames(), sharedTableState.getViewIndexId(), - table.isMultiTenant(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); + table.isMultiTenant(), table.isNamespaceMapped(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); TableRef indexTableRef = new TableRef(viewIndexTable); PName indexTableTenantId = sharedTableState.getTenantId(); if (indexTableTenantId==null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java index 5e9608b..4a02e54 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java @@ -19,6 +19,7 @@ package org.apache.phoenix.schema; import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE; +import java.io.DataOutputStream; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -32,6 +33,12 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.schema.types.PArrayDataType; +import org.apache.phoenix.schema.types.PArrayDataTypeDecoder; +import org.apache.phoenix.schema.types.PArrayDataTypeEncoder; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PVarbinary; +import org.apache.phoenix.util.TrustedByteArrayOutputStream; import com.google.common.annotations.VisibleForTesting; @@ -164,32 +171,57 @@ public interface PTable extends PMetaDataEntity { } } - public enum StorageScheme { - ONE_CELL_PER_KEYVALUE_COLUMN((byte)1), - ONE_CELL_PER_COLUMN_FAMILY((byte)2); + public enum ImmutableStorageScheme implements ColumnValueEncoderDecoderSupplier { + ONE_CELL_PER_COLUMN((byte)1) { + @Override + public ColumnValueEncoder getEncoder(int numElements) { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnValueDecoder getDecoder() { + throw new UnsupportedOperationException(); + } + }, + // stores a single cell per column family that contains all serialized 
column values + SINGLE_CELL_ARRAY_WITH_OFFSETS((byte)2) { + @Override + public ColumnValueEncoder getEncoder(int numElements) { + PDataType type = PVarbinary.INSTANCE; + int estimatedSize = PArrayDataType.estimateSize(numElements, type); + TrustedByteArrayOutputStream byteStream = new TrustedByteArrayOutputStream(estimatedSize); + DataOutputStream oStream = new DataOutputStream(byteStream); + return new PArrayDataTypeEncoder(byteStream, oStream, numElements, type, SortOrder.ASC, false, PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION); + } + + @Override + public ColumnValueDecoder getDecoder() { + return new PArrayDataTypeDecoder(); + } + }; - private final byte[] byteValue; private final byte serializedValue; - StorageScheme(byte serializedValue) { + private ImmutableStorageScheme(byte serializedValue) { this.serializedValue = serializedValue; - this.byteValue = Bytes.toBytes(this.name()); - } - - public byte[] getBytes() { - return byteValue; } public byte getSerializedMetadataValue() { return this.serializedValue; } - public static StorageScheme fromSerializedValue(byte serializedValue) { - if (serializedValue < 1 || serializedValue > StorageScheme.values().length) { + public static ImmutableStorageScheme fromSerializedValue(byte serializedValue) { + if (serializedValue < 1 || serializedValue > ImmutableStorageScheme.values().length) { return null; } - return StorageScheme.values()[serializedValue-1]; + return ImmutableStorageScheme.values()[serializedValue-1]; } + + } + + interface ColumnValueEncoderDecoderSupplier { + ColumnValueEncoder getEncoder(int numElements); + ColumnValueDecoder getDecoder(); } public enum QualifierEncodingScheme implements QualifierEncoderDecoder { @@ -393,7 +425,7 @@ public interface PTable extends PMetaDataEntity { int decode(byte[] bytes, int offset, int length); Integer getMaxQualifier(); } - + long getTimeStamp(); long getSequenceNumber(); long getIndexDisableTimestamp(); @@ -607,7 +639,7 @@ public interface PTable extends 
PMetaDataEntity { * you are also not allowed to delete the table */ boolean isAppendOnlySchema(); - StorageScheme getStorageScheme(); + ImmutableStorageScheme getImmutableStorageScheme(); QualifierEncodingScheme getEncodingScheme(); EncodedCQCounter getEncodedCQCounter(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index e84e529..0816fea 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -50,10 +50,9 @@ import org.apache.phoenix.compile.ExpressionCompiler; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.generated.PTableProtos; import org.apache.phoenix.exception.DataExceedsCapacityException; -import org.apache.phoenix.expression.ArrayConstructorExpression; -import org.apache.phoenix.expression.DelegateExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.expression.SingleCellConstructorExpression; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.index.IndexMaintainer; @@ -63,16 +62,12 @@ import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.SQLParser; import org.apache.phoenix.protobuf.ProtobufUtil; import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.schema.PTable.EncodedCQCounter; import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder; -import org.apache.phoenix.schema.tuple.Tuple; -import org.apache.phoenix.schema.types.PArrayDataType; import 
org.apache.phoenix.schema.types.PBinary; import org.apache.phoenix.schema.types.PChar; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PDouble; import org.apache.phoenix.schema.types.PFloat; -import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EncodedColumnsUtil; @@ -153,7 +148,7 @@ public class PTableImpl implements PTable { private boolean isNamespaceMapped; private String autoPartitionSeqName; private boolean isAppendOnlySchema; - private StorageScheme storageScheme; + private ImmutableStorageScheme immutableStorageScheme; private QualifierEncodingScheme qualifierEncodingScheme; private EncodedCQCounter encodedCQCounter; @@ -188,7 +183,7 @@ public class PTableImpl implements PTable { this.isNamespaceMapped = isNamespaceMapped; } - public PTableImpl(PName tenantId, String schemaName, String tableName, long timestamp, List<PColumnFamily> families, boolean isNamespaceMapped, StorageScheme storageScheme, QualifierEncodingScheme encodingScheme) { // For base table of mapped VIEW + public PTableImpl(PName tenantId, String schemaName, String tableName, long timestamp, List<PColumnFamily> families, boolean isNamespaceMapped, ImmutableStorageScheme storageScheme, QualifierEncodingScheme encodingScheme) { // For base table of mapped VIEW Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty this.tenantId = tenantId; this.name = PNameFactory.newName(SchemaUtil.getTableName(schemaName, tableName)); @@ -210,13 +205,13 @@ public class PTableImpl implements PTable { this.families = families; this.physicalNames = Collections.emptyList(); this.isNamespaceMapped = isNamespaceMapped; - this.storageScheme = storageScheme; + this.immutableStorageScheme = storageScheme; this.qualifierEncodingScheme = encodingScheme; } // For indexes stored in shared physical 
tables public PTableImpl(PName tenantId, PName schemaName, PName tableName, long timestamp, List<PColumnFamily> families, - List<PColumn> columns, List<PName> physicalNames, Short viewIndexId, boolean multiTenant, boolean isNamespaceMpped, StorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, + List<PColumn> columns, List<PName> physicalNames, Short viewIndexId, boolean multiTenant, boolean isNamespaceMpped, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter) throws SQLException { this.pkColumns = this.allColumns = Collections.emptyList(); this.rowKeySchema = RowKeySchema.EMPTY_SCHEMA; @@ -275,7 +270,7 @@ public class PTableImpl implements PTable { indexes, table.isImmutableRows(), physicalNames, table.getDefaultFamilyName(), viewStatement, table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), updateCacheFrequency, - table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); + table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); } public static PTableImpl makePTable(PTable table, long timeStamp, List<PTable> indexes, PName parentSchemaName, String viewStatement) throws SQLException { @@ -285,7 +280,7 @@ public class PTableImpl implements PTable { indexes, table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), viewStatement, table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), 
table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), - table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); + table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); } public static PTableImpl makePTable(PTable table, Collection<PColumn> columns) throws SQLException { @@ -295,7 +290,7 @@ public class PTableImpl implements PTable { table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), - table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); + table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); } public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns) throws SQLException { @@ -305,7 +300,7 @@ public class PTableImpl implements PTable { table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getBaseColumnCount(), 
table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), - table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); + table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); } public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows) throws SQLException { @@ -315,7 +310,7 @@ public class PTableImpl implements PTable { table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), - table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); + table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); } public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled, @@ -326,7 +321,7 @@ public class PTableImpl implements PTable { table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(), 
table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), isTransactional, updateCacheFrequency, table.getIndexDisableTimestamp(), - isNamespaceMapped, table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); + isNamespaceMapped, table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); } public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException { @@ -337,7 +332,7 @@ public class PTableImpl implements PTable { table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), - table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); + table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); } public static PTableImpl makePTable(PTable table, boolean rowKeyOrderOptimizable) throws SQLException { @@ -348,7 +343,7 @@ public class PTableImpl implements PTable { table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getBaseColumnCount(), rowKeyOrderOptimizable, table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), - 
table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); + table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); } public static PTableImpl makePTable(PTable table) throws SQLException { @@ -359,7 +354,7 @@ public class PTableImpl implements PTable { table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), - table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); + table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); } public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type, @@ -368,7 +363,7 @@ public class PTableImpl implements PTable { boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, - long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, StorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter) throws SQLException { + long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, 
boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter) throws SQLException { return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, @@ -383,7 +378,7 @@ public class PTableImpl implements PTable { boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, int baseColumnCount, long indexDisableTimestamp, boolean isNamespaceMapped, - String autoPartitionSeqName, boolean isAppendOnlySchema, StorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter) + String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter) throws SQLException { return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames, @@ -398,7 +393,7 @@ public class PTableImpl implements PTable { List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, - long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, StorageScheme storageScheme, + long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, 
ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter) throws SQLException { init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, parentSchemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, @@ -438,7 +433,7 @@ public class PTableImpl implements PTable { List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp, - boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, StorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, + boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter) throws SQLException { Preconditions.checkNotNull(schemaName); Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty @@ -475,7 +470,7 @@ public class PTableImpl implements PTable { this.isNamespaceMapped = isNamespaceMapped; this.autoPartitionSeqName = autoPartitionSeqName; this.isAppendOnlySchema = isAppendOnlySchema; - this.storageScheme = storageScheme; + this.immutableStorageScheme = storageScheme; this.qualifierEncodingScheme = qualifierEncodingScheme; List<PColumn> pkColumns; PColumn[] allColumns; @@ -913,7 +908,7 @@ public class PTableImpl implements PTable { mutations.add(deleteRow); } else { // store all columns for a given column family in a single cell instead of one column per cell in order to improve write performance - if (storageScheme == 
StorageScheme.ONE_CELL_PER_COLUMN_FAMILY) { + if (immutableStorageScheme != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) { Put put = new Put(this.key); if (isWALDisabled()) { put.setDurability(Durability.SKIP_WAL); @@ -927,28 +922,19 @@ public class PTableImpl implements PTable { int qualifier = qualifierEncodingScheme.decode(column.getColumnQualifierBytes()); maxEncodedColumnQualifier = Math.max(maxEncodedColumnQualifier, qualifier); } - Expression[] colValues = new Expression[maxEncodedColumnQualifier+1]; - Arrays.fill(colValues, new DelegateExpression(LiteralExpression.newConstant(null)) { - @Override - public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) { - return false; - } - }); - // 0 is a reserved position, set it to a non-null value so that we can represent absence of a value using a negative offset - colValues[0]=LiteralExpression.newConstant(QueryConstants.EMPTY_COLUMN_VALUE_BYTES); + Expression[] colValues = EncodedColumnsUtil.createColumnExpressionArray(maxEncodedColumnQualifier); for (PColumn column : columns) { if (columnToValueMap.containsKey(column)) { - int qualifier = qualifierEncodingScheme.decode(column.getColumnQualifierBytes()); - colValues[qualifier] = new LiteralExpression(columnToValueMap.get(column)); + int colIndex = qualifierEncodingScheme.decode(column.getColumnQualifierBytes())-QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE+1; + colValues[colIndex] = new LiteralExpression(columnToValueMap.get(column)); } } List<Expression> children = Arrays.asList(colValues); - // we use ArrayConstructorExpression to serialize multiple columns into a single byte[] - // construct the ArrayConstructorExpression with a variable length data type since columns can be of fixed or variable length - ArrayConstructorExpression arrayExpression = new ArrayConstructorExpression(children, PVarbinary.INSTANCE, rowKeyOrderOptimizable, PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION); + // we use SingleCellConstructorExpression to serialize all the 
columns into a single byte[] + SingleCellConstructorExpression singleCellConstructorExpression = new SingleCellConstructorExpression(immutableStorageScheme, children); ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - arrayExpression.evaluate(null, ptr); + singleCellConstructorExpression.evaluate(null, ptr); ImmutableBytesPtr colFamilyPtr = new ImmutableBytesPtr(columnFamily); addQuietly(put, kvBuilder, kvBuilder.buildPut(keyPtr, colFamilyPtr, QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES_PTR, ts, ptr)); @@ -1027,7 +1013,7 @@ public class PTableImpl implements PTable { removeIfPresent(unsetValues, family, qualifier); // store all columns for a given column family in a single cell instead of one column per cell in order to improve write performance // we don't need to do anything with unsetValues as it is only used when storeNulls is false, storeNulls is always true when storeColsInSingleCell is true - if (storageScheme == StorageScheme.ONE_CELL_PER_COLUMN_FAMILY) { + if (immutableStorageScheme == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) { columnToValueMap.put(column, ptr.get()); } else { @@ -1326,9 +1312,9 @@ public class PTableImpl implements PTable { if (table.hasIsAppendOnlySchema()) { isAppendOnlySchema = table.getIsAppendOnlySchema(); } - StorageScheme storageScheme = null; + ImmutableStorageScheme storageScheme = null; if (table.hasStorageScheme()) { - storageScheme = StorageScheme.fromSerializedValue(table.getStorageScheme().toByteArray()[0]); + storageScheme = ImmutableStorageScheme.fromSerializedValue(table.getStorageScheme().toByteArray()[0]); } QualifierEncodingScheme qualifierEncodingScheme = null; if (table.hasEncodingScheme()) { @@ -1440,8 +1426,8 @@ public class PTableImpl implements PTable { builder.setAutoParititonSeqName(table.getAutoPartitionSeqName()); } builder.setIsAppendOnlySchema(table.isAppendOnlySchema()); - if (table.getStorageScheme() != null) { - builder.setStorageScheme(ByteStringer.wrap(new 
byte[]{table.getStorageScheme().getSerializedMetadataValue()})); + if (table.getImmutableStorageScheme() != null) { + builder.setStorageScheme(ByteStringer.wrap(new byte[]{table.getImmutableStorageScheme().getSerializedMetadataValue()})); } if (table.getEncodedCQCounter() != null) { Map<String, Integer> values = table.getEncodedCQCounter().values(); @@ -1526,8 +1512,8 @@ public class PTableImpl implements PTable { } @Override - public StorageScheme getStorageScheme() { - return storageScheme; + public ImmutableStorageScheme getImmutableStorageScheme() { + return immutableStorageScheme; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java index 36df961..67ff1a5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.util.SchemaUtil; public enum TableProperty { @@ -165,7 +166,27 @@ public enum TableProperty { return table.getEncodingScheme(); } - } + }, + + IMMUTABLE_STORAGE_SCHEME(PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false, false) { + @Override + public ImmutableStorageScheme getValue(Object value) { + if (value == null) { + return null; + } else if (value instanceof String) { + String strValue = (String) value; + return 
ImmutableStorageScheme.valueOf(strValue.toUpperCase()); + } else { + throw new IllegalArgumentException("Immutable storage scheme table property must be a string"); + } + } + + @Override + public Object getPTableValue(PTable table) { + return table.getImmutableStorageScheme(); + } + + } ; private final String propertyName; http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java index d3065a7..e99003f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java @@ -31,11 +31,12 @@ import java.util.NoSuchElementException; import javax.annotation.concurrent.NotThreadSafe; import org.apache.hadoop.hbase.Cell; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; /** * List implementation that provides indexed based look up when the cell column qualifiers are positive numbers. - * These qualifiers are generated by using one of the column qualifier encoding schemes specified in {@link QualifierEncodingScheme}. + * These qualifiers are generated by using one of the column qualifier encoding schemes specified in {@link ImmutableStorageScheme}. * The api methods in this list assume that the caller wants to see * and add only non null elements in the list. 
* <p> http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java index fede7d8..f31f272 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataType.java @@ -22,20 +22,15 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.sql.Types; import java.text.Format; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; import java.util.regex.Pattern; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; -import org.apache.phoenix.expression.Expression; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.ConstraintViolationException; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.ValueSchema; -import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TrustedByteArrayOutputStream; @@ -354,130 +349,11 @@ public abstract class PArrayDataType<T> extends PDataType<T> { return createPhoenixArray(bytes, offset, length, sortOrder, baseType, maxLength, desiredDataType); } - public static boolean positionAtArrayElement(Tuple tuple, ImmutableBytesWritable ptr, int index, - Expression arrayExpr, PDataType pDataType, Integer maxLen) { - if (!arrayExpr.evaluate(tuple, ptr)) { - return false; - } else if (ptr.getLength() == 0) { return true; } - - // Given a ptr to the entire array, set ptr to point to a particular element within that array - // given the type of an 
array element (see comments in PDataTypeForArray) - return positionAtArrayElement(ptr, index - 1, pDataType, maxLen); - } - - public static boolean positionAtArrayElement(ImmutableBytesWritable ptr, int arrayIndex, PDataType baseDataType, - Integer byteSize) { - byte[] bytes = ptr.get(); - int initPos = ptr.getOffset(); - if (!baseDataType.isFixedWidth()) { - byte serializationVersion = bytes[ptr.getOffset() + ptr.getLength() - Bytes.SIZEOF_BYTE]; - int noOfElements = Bytes.toInt(bytes, - (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT)), Bytes.SIZEOF_INT); - boolean useShort = true; - if (noOfElements < 0) { - noOfElements = -noOfElements; - useShort = false; - } - if (arrayIndex >= noOfElements) { - ptr.set(ByteUtil.EMPTY_BYTE_ARRAY); - return false; - } - - int indexOffset = Bytes.toInt(bytes, - (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT))) + ptr.getOffset(); - if (arrayIndex >= noOfElements) { - ptr.set(ByteUtil.EMPTY_BYTE_ARRAY); - return false; - } else { - // Skip those many offsets as given in the arrayIndex - // If suppose there are 5 elements in the array and the arrayIndex = 3 - // This means we need to read the 4th element of the array - // So inorder to know the length of the 4th element we will read the offset of 4th element and the - // offset of 5th element. - // Subtracting the offset of 5th element and 4th element will give the length of 4th element - // So we could just skip reading the other elements. - int currOffset = getSerializedOffset(bytes, arrayIndex, useShort, indexOffset, serializationVersion); - if (currOffset<0) { - ptr.set(ByteUtil.EMPTY_BYTE_ARRAY); - return false; - } - int elementLength = 0; - if (arrayIndex == (noOfElements - 1)) { - int separatorBytes = serializationVersion == SORTABLE_SERIALIZATION_VERSION ? 3 : 0; - elementLength = (bytes[currOffset + initPos] == QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == QueryConstants.DESC_SEPARATOR_BYTE) ? 
0 : indexOffset - - (currOffset + initPos) - separatorBytes; - } else { - int separatorByte = serializationVersion == SORTABLE_SERIALIZATION_VERSION ? 1 : 0; - elementLength = (bytes[currOffset + initPos] == QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : getOffset(bytes, - arrayIndex + 1, useShort, indexOffset, serializationVersion) - currOffset - separatorByte; - } - ptr.set(bytes, currOffset + initPos, elementLength); - } - } else { - int elemByteSize = (byteSize == null ? baseDataType.getByteSize() : byteSize); - int offset = arrayIndex * elemByteSize; - if (offset >= ptr.getLength()) { - ptr.set(ByteUtil.EMPTY_BYTE_ARRAY); - } else { - ptr.set(bytes, ptr.getOffset() + offset, elemByteSize); - } - } - return true; - } - - public static void positionAtArrayElement(ImmutableBytesWritable ptr, int arrayIndex, PDataType baseDataType, - Integer byteSize, int offset, int length, int noOfElements, boolean first) { - byte[] bytes = ptr.get(); - if (!baseDataType.isFixedWidth()) { - byte serializationVersion = bytes[ptr.getOffset() + ptr.getLength() - Bytes.SIZEOF_BYTE]; - int indexOffset = Bytes.toInt(bytes, (offset + length - (Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT))) - + offset; - boolean useShort = true; - if (first) { - int count = Bytes.toInt(bytes, - (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT)), Bytes.SIZEOF_INT); - if (count < 0) { - count = -count; - useShort = false; - } - } - if (arrayIndex >= noOfElements) { - return; - } else { - // Skip those many offsets as given in the arrayIndex - // If suppose there are 5 elements in the array and the arrayIndex = 3 - // This means we need to read the 4th element of the array - // So inorder to know the length of the 4th element we will read the offset of 4th element and the - // offset of 5th element. 
- // Subtracting the offset of 5th element and 4th element will give the length of 4th element - // So we could just skip reading the other elements. - int currOffset = getOffset(bytes, arrayIndex, useShort, indexOffset, serializationVersion); - int elementLength = 0; - if (arrayIndex == (noOfElements - 1)) { - elementLength = (bytes[currOffset + offset] == QueryConstants.SEPARATOR_BYTE || bytes[currOffset + offset] == QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : indexOffset - - (currOffset + offset) - 3; - } else { - elementLength = (bytes[currOffset + offset] == QueryConstants.SEPARATOR_BYTE || bytes[currOffset + offset] == QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : getOffset(bytes, - arrayIndex + 1, useShort, indexOffset, serializationVersion) - currOffset - 1; - } - ptr.set(bytes, currOffset + offset, elementLength); - } - } else { - int elemByteSize = (byteSize == null ? baseDataType.getByteSize() : byteSize); - offset += arrayIndex * elemByteSize; - if (offset >= offset + length) { - return; - } else { - ptr.set(bytes, offset, elemByteSize); - } - } - } - - private static int getOffset(byte[] bytes, int arrayIndex, boolean useShort, int indexOffset, byte serializationVersion) { + static int getOffset(byte[] bytes, int arrayIndex, boolean useShort, int indexOffset, byte serializationVersion) { return Math.abs(getSerializedOffset(bytes, arrayIndex, useShort, indexOffset, serializationVersion)); } - private static int getSerializedOffset(byte[] bytes, int arrayIndex, boolean useShort, int indexOffset, byte serializationVersion) { + static int getSerializedOffset(byte[] bytes, int arrayIndex, boolean useShort, int indexOffset, byte serializationVersion) { int offset; if (useShort) { offset = indexOffset + (Bytes.SIZEOF_SHORT * arrayIndex); @@ -514,13 +390,13 @@ public abstract class PArrayDataType<T> extends PDataType<T> { */ private byte[] createArrayBytes(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream, PhoenixArray array, int noOfElements, 
PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable) { - PArrayDataTypeBytesArrayBuilder builder = - new PArrayDataTypeBytesArrayBuilder(byteStream, oStream, noOfElements, baseType, sortOrder, rowKeyOrderOptimizable); + PArrayDataTypeEncoder builder = + new PArrayDataTypeEncoder(byteStream, oStream, noOfElements, baseType, sortOrder, rowKeyOrderOptimizable); for (int i = 0; i < noOfElements; i++) { byte[] bytes = array.toBytes(i); - builder.appendElem(bytes); + builder.appendValue(bytes); } - return builder.getBytesAndClose(); + return builder.encode(); } public static boolean appendItemToArray(ImmutableBytesWritable ptr, int length, int offset, byte[] arrayBytes, @@ -1211,140 +1087,4 @@ public abstract class PArrayDataType<T> extends PDataType<T> { buf.append(']'); return buf.toString(); } - - static public class PArrayDataTypeBytesArrayBuilder { - static private final int BYTE_ARRAY_DEFAULT_SIZE = 128; - - private PDataType baseType; - private SortOrder sortOrder; - private List<Integer> offsetPos; - private TrustedByteArrayOutputStream byteStream; - private DataOutputStream oStream; - private int nulls; - private byte serializationVersion; - private boolean rowKeyOrderOptimizable; - - public PArrayDataTypeBytesArrayBuilder(PDataType baseType, SortOrder sortOrder) { - this(new TrustedByteArrayOutputStream(BYTE_ARRAY_DEFAULT_SIZE), new LinkedList<Integer>(), baseType, sortOrder, true); - } - - public PArrayDataTypeBytesArrayBuilder(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream, - int numElements, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable, byte serializationVersion) { - this(byteStream, oStream, new ArrayList<Integer>(numElements), baseType, sortOrder, rowKeyOrderOptimizable, serializationVersion); - } - - public PArrayDataTypeBytesArrayBuilder(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream, - int numElements, PDataType baseType, SortOrder sortOrder, boolean 
rowKeyOrderOptimizable) { - this(byteStream, oStream, new ArrayList<Integer>(numElements), baseType, sortOrder, rowKeyOrderOptimizable, PArrayDataType.SORTABLE_SERIALIZATION_VERSION); - } - - public PArrayDataTypeBytesArrayBuilder(TrustedByteArrayOutputStream byteStream, - List<Integer> offsetPos, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable) { - this(byteStream, new DataOutputStream(byteStream), offsetPos, baseType, sortOrder, rowKeyOrderOptimizable, PArrayDataType.SORTABLE_SERIALIZATION_VERSION); - } - - public PArrayDataTypeBytesArrayBuilder(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream, - List<Integer> offsetPos, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable, byte serializationVersion) { - this.baseType = baseType; - this.sortOrder = sortOrder; - this.offsetPos = offsetPos; - this.byteStream = byteStream; - this.oStream = oStream; - this.nulls = 0; - this.serializationVersion = serializationVersion; - this.rowKeyOrderOptimizable = rowKeyOrderOptimizable; - } - - private void close() { - try { - if (byteStream != null) byteStream.close(); - if (oStream != null) oStream.close(); - byteStream = null; - oStream = null; - } catch (IOException ioe) {} - } - - // used to represent the absence of a value - public void appendMissingElement() { - if (serializationVersion == PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION && !baseType.isFixedWidth()) { - offsetPos.add(-byteStream.size()); - nulls++; - } - } - - public boolean appendElem(byte[] bytes) { - return appendElem(bytes, 0, bytes.length); - } - - public boolean appendElem(byte[] bytes, int offset, int len) { - if (oStream == null || byteStream == null) return false; - try { - // track the offset position here from the size of the byteStream - if (!baseType.isFixedWidth()) { - // Any variable length array would follow the below order - // Every element would be seperated by a seperator byte '0' - // Null elements are counted and once a first 
non null element appears we - // write the count of the nulls prefixed with a seperator byte - // Trailing nulls are not taken into account - // The last non null element is followed by two seperator bytes - // For eg - // a, b, null, null, c, null would be - // 65 0 66 0 0 2 67 0 0 0 - // a null null null b c null d would be - // 65 0 0 3 66 0 67 0 0 1 68 0 0 0 - if (len == 0) { - offsetPos.add(byteStream.size()); - nulls++; - } else { - nulls = serializeNulls(oStream, nulls); - offsetPos.add(byteStream.size()); - if (sortOrder == SortOrder.DESC) { - SortOrder.invert(bytes, offset, bytes, offset, len); - offset = 0; - } - oStream.write(bytes, offset, len); - if (serializationVersion == SORTABLE_SERIALIZATION_VERSION) { - oStream.write(getSeparatorByte(rowKeyOrderOptimizable, sortOrder)); - } - } - } else { - // No nulls for fixed length - if (sortOrder == SortOrder.DESC) { - SortOrder.invert(bytes, offset, bytes, offset, len); - offset = 0; - } - oStream.write(bytes, offset, len); - } - return true; - } catch (IOException e) {} - return false; - } - - public byte[] getBytesAndClose() { - try { - if (!baseType.isFixedWidth()) { - int noOfElements = offsetPos.size(); - int[] offsetPosArray = new int[noOfElements]; - int index = 0; - for (Integer i : offsetPos) { - offsetPosArray[index] = i; - ++index; - } - if (serializationVersion == SORTABLE_SERIALIZATION_VERSION) { - // Double seperator byte to show end of the non null array - writeEndSeperatorForVarLengthArray(oStream, sortOrder, rowKeyOrderOptimizable); - } - noOfElements = PArrayDataType.serializeOffsetArrayIntoStream(oStream, byteStream, noOfElements, - offsetPosArray[offsetPosArray.length - 1], offsetPosArray, serializationVersion); - serializeHeaderInfoIntoStream(oStream, noOfElements, serializationVersion); - } - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - ptr.set(byteStream.getBuffer(), 0, byteStream.size()); - return ByteUtil.copyKeyBytesIfNecessary(ptr); - } catch (IOException e) {} 
finally { - close(); - } - return null; - } - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f3f7323/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java new file mode 100644 index 0000000..7a6ea91 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeDecoder.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.phoenix.schema.types;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.ColumnValueDecoder;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ByteUtil;


/**
 * Decodes a single element out of a serialized Phoenix array by repositioning an
 * {@link ImmutableBytesWritable} pointer onto that element's bytes, without
 * materializing the whole array.
 */
public class PArrayDataTypeDecoder implements ColumnValueDecoder {
    
    /**
     * Positions {@code ptr} at the element with the given (0-based) index, treating
     * the element bytes as opaque varbinary. Returns false if the index is out of range.
     */
    @Override
    public boolean decode(ImmutableBytesWritable ptr, int index) {
        return PArrayDataTypeDecoder.positionAtArrayElement(ptr, index, PVarbinary.INSTANCE, null);
    }

    /**
     * Evaluates {@code arrayExpr} against {@code tuple} into {@code ptr}, then positions
     * {@code ptr} at the requested element.
     *
     * @param index 1-based element index (SQL array semantics); converted to 0-based below
     * @param maxLen fixed byte size of an element, or null for variable-width / default size
     * @return false if the expression could not be evaluated or the index is out of range;
     *         true otherwise (including the empty-array case, where ptr is left empty)
     */
    public static boolean positionAtArrayElement(Tuple tuple, ImmutableBytesWritable ptr, int index,
            Expression arrayExpr, PDataType pDataType, Integer maxLen) {
        if (!arrayExpr.evaluate(tuple, ptr)) {
            return false;
        } else if (ptr.getLength() == 0) { return true; }
    
        // Given a ptr to the entire array, set ptr to point to a particular element within that array
        // given the type of an array element (see comments in PDataTypeForArray)
        return positionAtArrayElement(ptr, index - 1, pDataType, maxLen);
    }

    /**
     * Core positioning logic. {@code ptr} must point at a full serialized array.
     *
     * Variable-width layout (as read by this method): the last byte of the array is the
     * serialization version; the preceding int is the element count (negative count means
     * the offset array uses 4-byte ints, positive means 2-byte shorts); the int before
     * that is the position of the offset array within the serialized bytes.
     *
     * @param arrayIndex 0-based element index
     * @param byteSize   element width for fixed-width base types; null to use the type default
     * @return true if ptr was positioned (possibly on an empty/null element);
     *         false if arrayIndex is past the end (ptr is set to empty)
     */
    public static boolean positionAtArrayElement(ImmutableBytesWritable ptr, int arrayIndex, PDataType baseDataType,
            Integer byteSize) {
        byte[] bytes = ptr.get();
        int initPos = ptr.getOffset();
        if (!baseDataType.isFixedWidth()) {
            // Trailing byte of the serialized form carries the serialization version.
            byte serializationVersion = bytes[ptr.getOffset() + ptr.getLength() - Bytes.SIZEOF_BYTE];
            // Element count is stored just before the version byte.
            int noOfElements = Bytes.toInt(bytes,
                    (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT)), Bytes.SIZEOF_INT);
            boolean useShort = true;
            if (noOfElements < 0) {
                // Negative count signals that offsets were serialized as 4-byte ints.
                noOfElements = -noOfElements;
                useShort = false;
            }
            if (arrayIndex >= noOfElements) {
                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
                return false;
            }

            // Start position of the offset array (stored before the element count).
            int indexOffset = Bytes.toInt(bytes,
                    (ptr.getOffset() + ptr.getLength() - (Bytes.SIZEOF_BYTE + 2 * Bytes.SIZEOF_INT))) + ptr.getOffset();
            // Skip those many offsets as given in the arrayIndex
            // If suppose there are 5 elements in the array and the arrayIndex = 3
            // This means we need to read the 4th element of the array
            // So inorder to know the length of the 4th element we will read the offset of 4th element and the
            // offset of 5th element.
            // Subtracting the offset of 5th element and 4th element will give the length of 4th element
            // So we could just skip reading the other elements.
            int currOffset = PArrayDataType.getSerializedOffset(bytes, arrayIndex, useShort, indexOffset, serializationVersion);
            if (currOffset<0) {
                // Negative serialized offset marks an absent element (see encoder's appendAbsentValue).
                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
                return false;
            }
            int elementLength = 0;
            if (arrayIndex == (noOfElements - 1)) {
                // Last element: its end is the start of the offset array, minus the
                // trailing separators the sortable format writes (3 bytes) — none for
                // other versions. A leading separator byte means the element is null.
                int separatorBytes = serializationVersion == PArrayDataType.SORTABLE_SERIALIZATION_VERSION ? 3 : 0;
                elementLength = (bytes[currOffset + initPos] == QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : indexOffset
                        - (currOffset + initPos) - separatorBytes;
            } else {
                // Interior element: length is the distance to the next element's offset,
                // minus the single per-element separator the sortable format writes.
                int separatorByte = serializationVersion == PArrayDataType.SORTABLE_SERIALIZATION_VERSION ? 1 : 0;
                elementLength = (bytes[currOffset + initPos] == QueryConstants.SEPARATOR_BYTE || bytes[currOffset + initPos] == QueryConstants.DESC_SEPARATOR_BYTE) ? 0 : PArrayDataType.getOffset(bytes,
                        arrayIndex + 1, useShort, indexOffset, serializationVersion) - currOffset - separatorByte;
            }
            ptr.set(bytes, currOffset + initPos, elementLength);
        } else {
            // Fixed-width elements: simple multiplication, no offset array to consult.
            int elemByteSize = (byteSize == null ? baseDataType.getByteSize() : byteSize);
            int offset = arrayIndex * elemByteSize;
            if (offset >= ptr.getLength()) {
                ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
            } else {
                ptr.set(bytes, ptr.getOffset() + offset, elemByteSize);
            }
        }
        return true;
    }

}
package org.apache.phoenix.schema.types;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.schema.ColumnValueEncoder;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.TrustedByteArrayOutputStream;

/**
 * Serializes a sequence of appended element values into the Phoenix array byte format.
 * Append elements with {@link #appendValue(byte[], int, int)} (or
 * {@link #appendAbsentValue()} for absent slots in the immutable format), then call
 * {@link #encode()} exactly once to obtain the serialized bytes. Not thread-safe;
 * the encoder is single-use — {@code encode()} closes the underlying streams.
 */
public class PArrayDataTypeEncoder implements ColumnValueEncoder {
    static private final int BYTE_ARRAY_DEFAULT_SIZE = 128;

    private PDataType baseType;                     // element type; fixed vs variable width drives the layout
    private SortOrder sortOrder;
    private List<Integer> offsetPos;                // start offset of each element within byteStream
    private TrustedByteArrayOutputStream byteStream;
    private DataOutputStream oStream;               // writes into byteStream
    private int nulls;                              // pending run of null elements not yet serialized
    private byte serializationVersion;
    private boolean rowKeyOrderOptimizable;

    public PArrayDataTypeEncoder(PDataType baseType, SortOrder sortOrder) {
        this(new TrustedByteArrayOutputStream(BYTE_ARRAY_DEFAULT_SIZE), new LinkedList<Integer>(), baseType, sortOrder, true);
    }

    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream,
            int numElements, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable, byte serializationVersion) {
        this(byteStream, oStream, new ArrayList<Integer>(numElements), baseType, sortOrder, rowKeyOrderOptimizable, serializationVersion);
    }

    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream,
            int numElements, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable) {
        this(byteStream, oStream, new ArrayList<Integer>(numElements), baseType, sortOrder, rowKeyOrderOptimizable, PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
    }

    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream,
            List<Integer> offsetPos, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable) {
        this(byteStream, new DataOutputStream(byteStream), offsetPos, baseType, sortOrder, rowKeyOrderOptimizable, PArrayDataType.SORTABLE_SERIALIZATION_VERSION);
    }

    public PArrayDataTypeEncoder(TrustedByteArrayOutputStream byteStream, DataOutputStream oStream,
            List<Integer> offsetPos, PDataType baseType, SortOrder sortOrder, boolean rowKeyOrderOptimizable, byte serializationVersion) {
        this.baseType = baseType;
        this.sortOrder = sortOrder;
        this.offsetPos = offsetPos;
        this.byteStream = byteStream;
        this.oStream = oStream;
        this.nulls = 0;
        this.serializationVersion = serializationVersion;
        this.rowKeyOrderOptimizable = rowKeyOrderOptimizable;
    }

    /** Releases the in-memory streams; the encoder must not be used afterwards. */
    private void close() {
        try {
            if (byteStream != null) byteStream.close();
            if (oStream != null) oStream.close();
            byteStream = null;
            oStream = null;
        } catch (IOException ignored) {
            // Best-effort cleanup: closing in-memory streams cannot realistically fail.
        }
    }

    // used to represent the absence of a value
    @Override
    public void appendAbsentValue() {
        if (serializationVersion == PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION && !baseType.isFixedWidth()) {
            // A negated offset marks the slot as absent for the decoder.
            offsetPos.add(-byteStream.size());
            nulls++;
        }
        else {
            throw new UnsupportedOperationException("Cannot represent an absent element");
        }
    }

    public void appendValue(byte[] bytes) {
        appendValue(bytes, 0, bytes.length);
    }

    /**
     * Appends one element's bytes ({@code len == 0} means null for variable-width types).
     * The caller's array is never modified, even for DESC sort order.
     */
    @Override
    public void appendValue(byte[] bytes, int offset, int len) {
        try {
            // track the offset position here from the size of the byteStream
            if (!baseType.isFixedWidth()) {
                // Any variable length array would follow the below order
                // Every element would be seperated by a seperator byte '0'
                // Null elements are counted and once a first non null element appears we
                // write the count of the nulls prefixed with a seperator byte
                // Trailing nulls are not taken into account
                // The last non null element is followed by two seperator bytes
                // For eg
                // a, b, null, null, c, null would be
                // 65 0 66 0 0 2 67 0 0 0
                // a null null null b c null d would be
                // 65 0 0 3 66 0 67 0 0 1 68 0 0 0
                if (len == 0) {
                    offsetPos.add(byteStream.size());
                    nulls++;
                } else {
                    nulls = PArrayDataType.serializeNulls(oStream, nulls);
                    offsetPos.add(byteStream.size());
                    if (sortOrder == SortOrder.DESC) {
                        // BUG FIX: invert into a scratch copy instead of inverting the
                        // caller's array in place and then writing from position 0 —
                        // that wrote the wrong bytes and mutated the caller's buffer.
                        bytes = SortOrder.invert(bytes, offset, new byte[len], 0, len);
                        offset = 0;
                    }
                    oStream.write(bytes, offset, len);
                    if (serializationVersion == PArrayDataType.SORTABLE_SERIALIZATION_VERSION) {
                        oStream.write(PArrayDataType.getSeparatorByte(rowKeyOrderOptimizable, sortOrder));
                    }
                }
            } else {
                // No nulls for fixed length
                if (sortOrder == SortOrder.DESC) {
                    // Same in-place-inversion fix as the variable-width branch.
                    bytes = SortOrder.invert(bytes, offset, new byte[len], 0, len);
                    offset = 0;
                }
                oStream.write(bytes, offset, len);
            }
        } catch (IOException e) {
            // The streams are purely in-memory, so this is unreachable in practice;
            // surface it instead of silently dropping the element as before.
            throw new IllegalStateException("Failed to append array element", e);
        }
    }

    /**
     * Finalizes and returns the serialized array bytes. Closes the encoder's streams,
     * so this may be called only once.
     *
     * @return the serialized array
     * @throws IllegalStateException if the in-memory streams unexpectedly fail
     */
    @Override
    public byte[] encode() {
        try {
            if (!baseType.isFixedWidth()) {
                // NOTE(review): assumes at least one element was appended for
                // variable-width arrays (offsetPosArray[length - 1] below) — confirm
                // empty arrays are handled by callers before reaching here.
                int noOfElements = offsetPos.size();
                int[] offsetPosArray = new int[noOfElements];
                int index = 0;
                for (Integer i : offsetPos) {
                    offsetPosArray[index] = i;
                    ++index;
                }
                if (serializationVersion == PArrayDataType.SORTABLE_SERIALIZATION_VERSION) {
                    // Double seperator byte to show end of the non null array
                    PArrayDataType.writeEndSeperatorForVarLengthArray(oStream, sortOrder, rowKeyOrderOptimizable);
                }
                noOfElements = PArrayDataType.serializeOffsetArrayIntoStream(oStream, byteStream, noOfElements,
                        offsetPosArray[offsetPosArray.length - 1], offsetPosArray, serializationVersion);
                PArrayDataType.serializeHeaderInfoIntoStream(oStream, noOfElements, serializationVersion);
            }
            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
            ptr.set(byteStream.getBuffer(), 0, byteStream.size());
            return ByteUtil.copyKeyBytesIfNecessary(ptr);
        } catch (IOException e) {
            // Previously swallowed, leaving encode() to return null silently.
            throw new IllegalStateException("Failed to serialize array", e);
        } finally {
            close();
        }
    }

}
