http://git-wip-us.apache.org/repos/asf/phoenix/blob/56c17679/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 9a7b9e3..93a87ea 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -27,6 +27,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME_INDEX; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE_BYTES; @@ -34,6 +35,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYT import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODED_COLUMN_QUALIFIER_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FAMILY_NAME_INDEX; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES; @@ -57,6 +59,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME_INDEX; @@ -74,11 +77,11 @@ import static org.apache.phoenix.query.QueryConstants.DIVERGED_VIEW_BASE_COLUMN_ import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE_ARRAY; import static org.apache.phoenix.schema.PTableType.INDEX; import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY; +import static org.apache.phoenix.util.EncodedColumnsUtil.getEncodedColumnQualifier; import static org.apache.phoenix.util.SchemaUtil.getVarCharLength; import static org.apache.phoenix.util.SchemaUtil.getVarChars; import java.io.IOException; -import java.sql.DriverManager; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; @@ -150,14 +153,12 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest; -import org.apache.phoenix.expression.ColumnExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.ProjectedColumnExpression; import org.apache.phoenix.expression.RowKeyColumnExpression; import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor; -import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; @@ -190,8 +191,10 @@ import org.apache.phoenix.schema.PMetaDataEntity; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.EncodedCQCounter; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTable.LinkType; +import org.apache.phoenix.schema.PTable.StorageScheme; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; @@ -209,10 +212,12 @@ import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.types.PSmallint; +import org.apache.phoenix.schema.types.PTinyint; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; @@ -282,6 +287,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso TABLE_FAMILY_BYTES, IS_NAMESPACE_MAPPED_BYTES); private static final KeyValue AUTO_PARTITION_SEQ_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, AUTO_PARTITION_SEQ_BYTES); private static final KeyValue APPEND_ONLY_SCHEMA_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, APPEND_ONLY_SCHEMA_BYTES); + private static final KeyValue STORAGE_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORAGE_SCHEME_BYTES); private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList( EMPTY_KEYVALUE_KV, @@ -308,7 +314,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso UPDATE_CACHE_FREQUENCY_KV, IS_NAMESPACE_MAPPED_KV, AUTO_PARTITION_SEQ_KV, - APPEND_ONLY_SCHEMA_KV + APPEND_ONLY_SCHEMA_KV, + STORAGE_SCHEME_KV ); static { Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR); @@ -338,6 +345,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private static final int IS_NAMESPACE_MAPPED_INDEX = TABLE_KV_COLUMNS.indexOf(IS_NAMESPACE_MAPPED_KV); private static final int AUTO_PARTITION_SEQ_INDEX = TABLE_KV_COLUMNS.indexOf(AUTO_PARTITION_SEQ_KV); private static final int APPEND_ONLY_SCHEMA_INDEX = TABLE_KV_COLUMNS.indexOf(APPEND_ONLY_SCHEMA_KV); + private static final int STORAGE_SCHEME_INDEX = TABLE_KV_COLUMNS.indexOf(STORAGE_SCHEME_KV); // KeyValues for Column private static final KeyValue DECIMAL_DIGITS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES); @@ -351,6 +359,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private static final KeyValue IS_VIEW_REFERENCED_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_VIEW_REFERENCED_BYTES); private static final KeyValue COLUMN_DEF_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_DEF_BYTES); private static final KeyValue IS_ROW_TIMESTAMP_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_ROW_TIMESTAMP_BYTES); + private static final KeyValue ENCODED_COLUMN_QUALIFIER_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ENCODED_COLUMN_QUALIFIER_BYTES); private static final List<KeyValue> COLUMN_KV_COLUMNS = Arrays.<KeyValue>asList( DECIMAL_DIGITS_KV, COLUMN_SIZE_KV, @@ -363,11 +372,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso VIEW_CONSTANT_KV, IS_VIEW_REFERENCED_KV, COLUMN_DEF_KV, - IS_ROW_TIMESTAMP_KV + IS_ROW_TIMESTAMP_KV, + ENCODED_COLUMN_QUALIFIER_KV ); static { Collections.sort(COLUMN_KV_COLUMNS, KeyValue.COMPARATOR); } + private static final KeyValue QUALIFIER_COUNTER_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_QUALIFIER_COUNTER_BYTES); private static final int DECIMAL_DIGITS_INDEX = COLUMN_KV_COLUMNS.indexOf(DECIMAL_DIGITS_KV); private static final int COLUMN_SIZE_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_SIZE_KV); private static final int NULLABLE_INDEX = COLUMN_KV_COLUMNS.indexOf(NULLABLE_KV); @@ -379,9 +390,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private static final int IS_VIEW_REFERENCED_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_VIEW_REFERENCED_KV); private static final int COLUMN_DEF_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_DEF_KV); private static final int IS_ROW_TIMESTAMP_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_ROW_TIMESTAMP_KV); + private static final int ENCODED_COLUMN_QUALIFIER_INDEX = COLUMN_KV_COLUMNS.indexOf(ENCODED_COLUMN_QUALIFIER_KV); private static final int LINK_TYPE_INDEX = 0; - + private static final KeyValue CLASS_NAME_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, CLASS_NAME_BYTES); private static final KeyValue JAR_PATH_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, JAR_PATH_BYTES); private static final KeyValue RETURN_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, RETURN_TYPE_BYTES); @@ -717,8 +729,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso isRowTimestampKV == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject( isRowTimestampKV.getValueArray(), isRowTimestampKV.getValueOffset(), isRowTimestampKV.getValueLength())); - - PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false); + Cell columnQualifierKV = colKeyValues[ENCODED_COLUMN_QUALIFIER_INDEX]; + Integer columnQualifier = + columnQualifierKV == null ? null : getEncodedColumnQualifier( + columnQualifierKV.getValueArray(), columnQualifierKV.getValueOffset(), + columnQualifierKV.getValueLength()); + PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false, columnQualifier); columns.add(column); } @@ -926,37 +942,49 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso boolean isAppendOnlySchema = isAppendOnlySchemaKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isAppendOnlySchemaKv.getValueArray(), isAppendOnlySchemaKv.getValueOffset(), isAppendOnlySchemaKv.getValueLength())); - - + Cell storageSchemeKv = tableKeyValues[STORAGE_SCHEME_INDEX]; + //TODO: change this once we start having other values for storage schemes + StorageScheme storageScheme = storageSchemeKv == null ? StorageScheme.NON_ENCODED_COLUMN_NAMES : StorageScheme + .fromSerializedValue((byte)PTinyint.INSTANCE.toObject(storageSchemeKv.getValueArray(), + storageSchemeKv.getValueOffset(), storageSchemeKv.getValueLength())); + List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount); List<PTable> indexes = Lists.newArrayList(); List<PName> physicalTables = Lists.newArrayList(); PName parentTableName = tableType == INDEX ? dataTableName : null; PName parentSchemaName = tableType == INDEX ? schemaName : null; + EncodedCQCounter cqCounter = (storageScheme == StorageScheme.NON_ENCODED_COLUMN_NAMES || tableType == PTableType.VIEW) ? PTable.EncodedCQCounter.NULL_COUNTER : new EncodedCQCounter(); while (true) { - results.clear(); - scanner.next(results); - if (results.isEmpty()) { - break; - } - Cell colKv = results.get(LINK_TYPE_INDEX); - int colKeyLength = colKv.getRowLength(); - PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset); - int colKeyOffset = offset + colName.getBytes().length + 1; - PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset); - if (colName.getString().isEmpty() && famName != null) { - LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]); - if (linkType == LinkType.INDEX_TABLE) { - addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes); - } else if (linkType == LinkType.PHYSICAL_TABLE) { - physicalTables.add(famName); - } else if (linkType == LinkType.PARENT_TABLE) { - parentTableName = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(famName.getBytes())); - parentSchemaName = PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName(famName.getBytes())); - } - } else { - addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null); - } + results.clear(); + scanner.next(results); + if (results.isEmpty()) { + break; + } + Cell colKv = results.get(LINK_TYPE_INDEX); + if (colKv != null) { + int colKeyLength = colKv.getRowLength(); + PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset); + int colKeyOffset = offset + colName.getBytes().length + 1; + PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset); + if (isQualifierCounterKV(colKv)) { + Integer value = PInteger.INSTANCE.getCodec().decodeInt(colKv.getValueArray(), colKv.getValueOffset(), SortOrder.ASC); + cqCounter.setValue(famName.getString(), value); + } else { + if (colName.getString().isEmpty() && famName != null) { + LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]); + if (linkType == LinkType.INDEX_TABLE) { + addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes); + } else if (linkType == LinkType.PHYSICAL_TABLE) { + physicalTables.add(famName); + } else if (linkType == LinkType.PARENT_TABLE) { + parentTableName = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(famName.getBytes())); + parentSchemaName = PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName(famName.getBytes())); + } + } else { + addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null); + } + } + } } // Avoid querying the stats table because we're holding the rowLock here. Issuing an RPC to a remote // server while holding this lock is a bad idea and likely to cause contention. @@ -964,9 +992,17 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso pkName, saltBucketNum, columns, parentSchemaName, parentTableName, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, baseColumnCount, - indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema); + indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, storageScheme, cqCounter); } - + + private boolean isQualifierCounterKV(Cell kv) { + int cmp = + Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), + kv.getQualifierLength(), QUALIFIER_COUNTER_KV.getQualifierArray(), + QUALIFIER_COUNTER_KV.getQualifierOffset(), QUALIFIER_COUNTER_KV.getQualifierLength()); + return cmp == 0; + } + private PSchema getSchema(RegionScanner scanner, long clientTimeStamp) throws IOException, SQLException { List<Cell> results = Lists.newArrayList(); scanner.next(results); @@ -1486,46 +1522,46 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso Short indexId = null; if (request.hasAllocateIndexId() && request.getAllocateIndexId()) { String tenantIdStr = tenantIdBytes.length == 0 ? null : Bytes.toString(tenantIdBytes); - try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class)){ - PName physicalName = parentTable.getPhysicalName(); - int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets(); - SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName, + try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class)) { + PName physicalName = parentTable.getPhysicalName(); + int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets(); + SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName, nSequenceSaltBuckets, parentTable.isNamespaceMapped() ); // TODO Review Earlier sequence was created at (SCN-1/LATEST_TIMESTAMP) and incremented at the client max(SCN,dataTable.getTimestamp), but it seems we should // use always LATEST_TIMESTAMP to avoid seeing wrong sequence values by different connection having SCN // or not. - long sequenceTimestamp = HConstants.LATEST_TIMESTAMP; - try { - connection.getQueryServices().createSequence(key.getTenantId(), key.getSchemaName(), key.getSequenceName(), + long sequenceTimestamp = HConstants.LATEST_TIMESTAMP; + try { + connection.getQueryServices().createSequence(key.getTenantId(), key.getSchemaName(), key.getSequenceName(), Short.MIN_VALUE, 1, 1, Long.MIN_VALUE, Long.MAX_VALUE, false, sequenceTimestamp); - } catch (SequenceAlreadyExistsException e) { - } - long[] seqValues = new long[1]; - SQLException[] sqlExceptions = new SQLException[1]; - connection.getQueryServices().incrementSequences(Collections.singletonList(new SequenceAllocation(key, 1)), + } catch (SequenceAlreadyExistsException e) { + } + long[] seqValues = new long[1]; + SQLException[] sqlExceptions = new SQLException[1]; + connection.getQueryServices().incrementSequences(Collections.singletonList(new SequenceAllocation(key, 1)), HConstants.LATEST_TIMESTAMP, seqValues, sqlExceptions); - if (sqlExceptions[0] != null) { - throw sqlExceptions[0]; - } - long seqValue = seqValues[0]; - if (seqValue > Short.MAX_VALUE) { - builder.setReturnCode(MetaDataProtos.MutationCode.TOO_MANY_INDEXES); - builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); - done.run(builder.build()); - return; - } - Put tableHeaderPut = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata); - NavigableMap<byte[], List<Cell>> familyCellMap = tableHeaderPut.getFamilyCellMap(); - List<Cell> cells = familyCellMap.get(TABLE_FAMILY_BYTES); - Cell cell = cells.get(0); - PDataType dataType = MetaDataUtil.getViewIndexIdDataType(); - Object val = dataType.toObject(seqValue, PLong.INSTANCE); - byte[] bytes = new byte [dataType.getByteSize() + 1]; - dataType.toBytes(val, bytes, 0); - Cell indexIdCell = new KeyValue(cell.getRow(), cell.getFamily(), VIEW_INDEX_ID_BYTES, + if (sqlExceptions[0] != null) { + throw sqlExceptions[0]; + } + long seqValue = seqValues[0]; + if (seqValue > Short.MAX_VALUE) { + builder.setReturnCode(MetaDataProtos.MutationCode.TOO_MANY_INDEXES); + builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); + done.run(builder.build()); + return; + } + Put tableHeaderPut = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata); + NavigableMap<byte[], List<Cell>> familyCellMap = tableHeaderPut.getFamilyCellMap(); + List<Cell> cells = familyCellMap.get(TABLE_FAMILY_BYTES); + Cell cell = cells.get(0); + PDataType dataType = MetaDataUtil.getViewIndexIdDataType(); + Object val = dataType.toObject(seqValue, PLong.INSTANCE); + byte[] bytes = new byte [dataType.getByteSize() + 1]; + dataType.toBytes(val, bytes, 0); + Cell indexIdCell = new KeyValue(cell.getRow(), cell.getFamily(), VIEW_INDEX_ID_BYTES, cell.getTimestamp(), Type.codeToType(cell.getTypeByte()), bytes); - cells.add(indexIdCell); - indexId = (short) seqValue; + cells.add(indexIdCell); + indexId = (short) seqValue; } } @@ -1536,7 +1572,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // indexing on the system table. This is an issue because of the way we manage batch mutation // in the Indexer. region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet()); - + // Invalidate the cache - the next getTable call will add it // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); @@ -1988,7 +2024,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return result; } region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet()); - // Invalidate from cache + // Invalidate from cache. for (ImmutableBytesPtr invalidateKey : invalidateList) { metaDataCache.invalidate(invalidateKey); } @@ -2161,6 +2197,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[][] rkmd = new byte[5][]; int pkCount = getVarChars(m.getRow(), rkmd); if (pkCount > COLUMN_NAME_INDEX + && rkmd[COLUMN_NAME_INDEX] != null && rkmd[COLUMN_NAME_INDEX].length > 0 && Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0 && Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) { columnPutsForBaseTable.add(new PutWithOrdinalPosition((Put)m, getInteger((Put)m, TABLE_FAMILY_BYTES, ORDINAL_POSITION_BYTES))); @@ -2195,8 +2232,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso String columnName = Bytes.toString(rkmd[COLUMN_NAME_INDEX]); String columnFamily = rkmd[FAMILY_NAME_INDEX] == null ? null : Bytes.toString(rkmd[FAMILY_NAME_INDEX]); try { - existingViewColumn = columnFamily == null ? view.getColumn(columnName) : view.getColumnFamily( - columnFamily).getColumn(columnName); + existingViewColumn = columnFamily == null ? view.getPColumnForColumnName(columnName) : view.getColumnFamily( + columnFamily).getPColumnForColumnName(columnName); } catch (ColumnFamilyNotFoundException e) { // ignore since it means that the column family is not present for the column to be added. } catch (ColumnNotFoundException e) { @@ -2323,26 +2360,26 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso columnsAddedToBaseTable++; } } - /* - * Allow adding a pk columns to base table : 1. if all the view pk columns are exactly the same as the base - * table pk columns 2. if we are adding all the existing view pk columns to the base table - */ - if (addingExistingPkCol && !viewPkCols.equals(basePhysicalTable.getPKColumns())) { - return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable); - } - addViewIndexesHeaderRowMutations(mutationsForAddingColumnsToViews, invalidateList, clientTimeStamp, view, - deltaNumPkColsSoFar); - - /* - * Increment the sequence number by 1 if: - * 1) For a diverged view, there were columns (pk columns) added to the view. - * 2) For a non-diverged view if the base column count changed. - */ - boolean changeSequenceNumber = (isDivergedView(view) && columnsAddedToView > 0) - || (!isDivergedView(view) && columnsAddedToBaseTable > 0); - updateViewHeaderRow(basePhysicalTable, tableMetadata, mutationsForAddingColumnsToViews, - invalidateList, clientTimeStamp, columnsAddedToView, columnsAddedToBaseTable, - viewKey, view, ordinalPositionList, numCols, changeSequenceNumber); + /* + * Allow adding a pk columns to base table : 1. if all the view pk columns are exactly the same as the base + * table pk columns 2. if we are adding all the existing view pk columns to the base table + */ + if (addingExistingPkCol && !viewPkCols.equals(basePhysicalTable.getPKColumns())) { + return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable); + } + addViewIndexesHeaderRowMutations(mutationsForAddingColumnsToViews, invalidateList, clientTimeStamp, view, + deltaNumPkColsSoFar); + + /* + * Increment the sequence number by 1 if: + * 1) For a diverged view, there were columns (pk columns) added to the view. + * 2) For a non-diverged view if the base column count changed. + */ + boolean changeSequenceNumber = (isDivergedView(view) && columnsAddedToView > 0) + || (!isDivergedView(view) && columnsAddedToBaseTable > 0); + updateViewHeaderRow(basePhysicalTable, tableMetadata, mutationsForAddingColumnsToViews, + invalidateList, clientTimeStamp, columnsAddedToView, columnsAddedToBaseTable, + viewKey, view, ordinalPositionList, numCols, changeSequenceNumber); } return null; } @@ -2500,8 +2537,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[] columnKey = getColumnKey(viewKey, columnName, columnFamily); try { existingViewColumn = - columnFamily == null ? view.getColumn(columnName) : view - .getColumnFamily(columnFamily).getColumn(columnName); + columnFamily == null ? view.getPColumnForColumnName(columnName) : view + .getColumnFamily(columnFamily).getPColumnForColumnName(columnName); } catch (ColumnFamilyNotFoundException e) { // ignore since it means that the column family is not present for the column to // be added. @@ -2567,7 +2604,25 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private MetaDataMutationResult validateColumnForAddToBaseTable(PColumn existingViewColumn, Put columnToBeAdded, PTable basePhysicalTable, boolean isColumnToBeAddPkCol, PTable view) { if (existingViewColumn != null) { - + if (EncodedColumnsUtil.usesEncodedColumnNames(basePhysicalTable) && !SchemaUtil.isPKColumn(existingViewColumn)) { + /* + * If the column already exists in a view, then we cannot add the column to the base + * table. The reason is subtle and is as follows: consider the case where a table + * has two views where both the views have the same key value column KV. Now, we + * dole out encoded column qualifiers for key value columns in views by using the + * counters stored in the base physical table. So the KV column can have different + * column qualifiers for the two views. For example, 11 for VIEW1 and 12 for VIEW2. + * This naturally extends to rows being inserted using the two views having + * different column qualifiers for the column named KV. Now, when an attempt is made + * to add column KV to the base table, we cannot decide which column qualifier + * should that column be assigned. It cannot be a number different than 11 or 12 + * since a query like SELECT KV FROM BASETABLE would return null for KV which is + * incorrect since column KV is present in rows inserted from the two views. We + * cannot use 11 or 12 either because we will then incorrectly return value of KV + * column inserted using only one view. + */ + return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable); + } // Validate data type is same int baseColumnDataType = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES); if (baseColumnDataType != existingViewColumn.getDataType().getSqlType()) { @@ -2797,6 +2852,16 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return mutationResult; } } + } else if (type == PTableType.VIEW + && EncodedColumnsUtil.usesEncodedColumnNames(table)) { + /* + * When adding a column to a view that uses encoded column name scheme, we + * need to modify the CQ counters stored in the view's physical table. So to + * make sure clients get the latest PTable, we need to invalidate the cache + * entry. + */ + invalidateList.add(new ImmutableBytesPtr(MetaDataUtil + .getPhysicalTableRowForView(table))); } for (Mutation m : tableMetaData) { byte[] key = m.getRow(); @@ -2810,7 +2875,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) { PColumnFamily family = table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]); - family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]); + family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]); } else if (pkCount > COLUMN_NAME_INDEX && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) { addingPKColumn = true; @@ -3063,7 +3128,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso PColumnFamily family = table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]); columnToDelete = - family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]); + family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]); } else if (pkCount > COLUMN_NAME_INDEX && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) { deletePKColumn = true; @@ -3152,10 +3217,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[] indexKey = SchemaUtil.getTableKey(tenantId, index.getSchemaName().getBytes(), index .getTableName().getBytes()); + Pair<String, String> columnToDeleteInfo = new Pair<>(columnToDelete.getFamilyName().getString(), columnToDelete.getName().getString()); + boolean isColumnIndexed = indexMaintainer.getIndexedColumnInfo().contains(columnToDeleteInfo); + boolean isCoveredColumn = indexMaintainer.getCoveredColumnInfo().contains(columnToDeleteInfo); // If index requires this column for its pk, then drop it - if (indexMaintainer.getIndexedColumns().contains( - new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete - .getName().getBytes()))) { + if (isColumnIndexed) { // Since we're dropping the index, lock it to ensure // that a change in index state doesn't // occur while we're dropping it. @@ -3176,9 +3242,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso invalidateList.add(new ImmutableBytesPtr(indexKey)); } // If the dropped column is a covered index column, invalidate the index - else if (indexMaintainer.getCoveredColumns().contains( - new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete - .getName().getBytes()))) { + else if (isCoveredColumn){ invalidateList.add(new ImmutableBytesPtr(indexKey)); } }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/56c17679/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index 3cfe790..8a833ac 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -107,7 +107,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { } } - public static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) { + private static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) { byte[] topN = scan.getAttribute(BaseScannerRegionObserver.TOPN); if (topN == null) { return null; @@ -125,7 +125,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { orderByExpression.readFields(input); orderByExpressions.add(orderByExpression); } - ResultIterator inner = new RegionScannerResultIterator(s); + ResultIterator inner = new RegionScannerResultIterator(s, ScanUtil.getMinMaxQualifiersFromScan(scan)); return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize); } catch (IOException e) { @@ -218,21 +218,24 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); + //TODO: samarth make this a client side check by looking at order by and group by expressions. Then use that to set min max qualifiers. We can then make useQualifierListAsIndex + // a member variable of BaseScannerRegionObserver. + boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan)) && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null; innerScanner = getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan, dataColumns, tupleProjector, dataRegion, indexMaintainer, tx, - viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr); + viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr, useQualifierAsIndex); final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan); if (j != null) { - innerScanner = new HashJoinRegionScanner(innerScanner, p, j, tenantId, c.getEnvironment()); + innerScanner = new HashJoinRegionScanner(innerScanner, p, j, tenantId, c.getEnvironment(), useQualifierAsIndex); } if (scanOffset != null) { innerScanner = getOffsetScanner(c, innerScanner, - new OffsetResultIterator(new RegionScannerResultIterator(innerScanner), scanOffset), + new OffsetResultIterator(new RegionScannerResultIterator(innerScanner, ScanUtil.getMinMaxQualifiersFromScan(scan)), scanOffset), scan.getAttribute(QueryConstants.LAST_SCAN) != null); } - final OrderedResultIterator iterator = deserializeFromScan(scan,innerScanner); + final OrderedResultIterator iterator = deserializeFromScan(scan, innerScanner); if (iterator == null) { return innerScanner; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/56c17679/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 0d0f0c2..a313dd1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -24,6 +24,7 @@ import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT; import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT; +import static org.apache.phoenix.util.ScanUtil.getMinMaxQualifiersFromScan; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -96,7 +97,10 @@ import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker; import org.apache.phoenix.schema.stats.StatisticsCollector; import org.apache.phoenix.schema.stats.StatisticsCollectorFactory; +import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PBinary; import org.apache.phoenix.schema.types.PChar; import org.apache.phoenix.schema.types.PDataType; @@ -305,6 +309,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver byte[] deleteCQ = null; byte[] deleteCF = null; byte[] emptyCF = null; + byte[] emptyKVQualifier = null; ImmutableBytesWritable ptr = new ImmutableBytesWritable(); if (upsertSelectTable != null) { isUpsert = true; @@ -320,12 +325,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver deleteCQ = scan.getAttribute(BaseScannerRegionObserver.DELETE_CQ); } emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF); + emptyKVQualifier = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER); } TupleProjector tupleProjector = null; byte[][] viewConstants = null; ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan); final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); + boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan)) && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null; if ((localIndexScan && !isDelete && !isDescRowKeyOrderUpgrade) || (j == null && p != null)) { if (dataColumns != null) { tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); @@ -334,11 +341,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); theScanner = getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector, - c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr); + c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex); } if (j != null) { - theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env); + theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex); } int batchSize = 0; @@ -374,7 +381,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver Aggregator[] rowAggregators = aggregators.getAggregators(); boolean hasMore; boolean hasAny = false; - MultiKeyValueTuple result = new MultiKeyValueTuple(); + Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiersFromScan(scan); + Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan))); } @@ -392,7 +400,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver acquiredLock = true; synchronized (innerScanner) { do { - List<Cell> results = new ArrayList<Cell>(); + List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>(); // Results are potentially returned even when the return value of s.next is false // since this is an indication of whether or not there are more values after the // ones returned
