gokceni commented on a change in pull request #1366: URL: https://github.com/apache/phoenix/pull/1366#discussion_r773376594
########## File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java ########## @@ -0,0 +1,459 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema.transform; + +import org.apache.phoenix.thirdparty.com.google.common.collect.Lists; +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; +import org.apache.phoenix.thirdparty.com.google.common.collect.Sets; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.ByteStringer; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.coprocessor.generated.ServerCachingProtos; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.ExpressionType; + +import org.apache.phoenix.hbase.index.ValueGetter; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.hbase.index.util.KeyValueBuilder; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.jdbc.PhoenixConnection; +import 
org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.ColumnFamilyNotFoundException; +import org.apache.phoenix.schema.ColumnNotFoundException; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnFamily; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.RowKeySchema; +import org.apache.phoenix.schema.SaltingUtil; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.util.EncodedColumnsUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TrustedByteArrayOutputStream; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; + +public class TransformMaintainer extends IndexMaintainer { + private boolean isMultiTenant; + // indexed expressions that are not present in the row key of the data table, the expression can also refer to a regular column + private List<Expression> newTableExpressions; + private Set<ColumnReference> newTableColumns; + + private List<PDataType> newTableColumnTypes; + private int newTableColumnCount; + private byte[] newTableName; + private int nNewTableSaltBuckets; + private byte[] oldTableEmptyKeyValueCF; + private ImmutableBytesPtr emptyKeyValueCFPtr; + private int nOldTableCFs; + private boolean newTableWALDisabled; + private boolean newTableImmutableRows; + // Transient state + private final boolean isOldTableSalted; + private final RowKeySchema oldTableRowKeySchema; + + private int estimatedNewTableRowKeyBytes; + private 
ColumnReference newTableEmptyKeyValueRef; + private ColumnReference oldTableEmptyKeyValueRef; + private boolean newTableRowKeyOrderOptimizable; + + private PTable.QualifierEncodingScheme newTableEncodingScheme; + private PTable.ImmutableStorageScheme newTableImmutableStorageScheme; + private PTable.QualifierEncodingScheme oldTableEncodingScheme; + private PTable.ImmutableStorageScheme oldTableImmutableStorageScheme; + /* + * The first part of the pair is the column family name + * and the second part is the column name. The reason we need to track this state is because for certain storage schemes + * like ImmutableStorageScheme#SINGLE_CELL_ARRAY_WITH_OFFSETS, the column for which we need to generate a new + * table put/delete is different from the old columns in the phoenix schema. + */ + private Set<Pair<String, String>> newTableColumnsInfo; + /* + * Map of covered columns where a key is column reference for a column in the data table + * and value is column reference for corresponding column in the new table. 
+ */ + private Map<ColumnReference, ColumnReference> coveredColumnsMap; + + private String logicalNewTableName; + + public static TransformMaintainer create(PTable oldTable, PTable newTable, PhoenixConnection connection) { + if (oldTable.getType() == PTableType.INDEX) { + throw new IllegalArgumentException(); + } + TransformMaintainer maintainer = new TransformMaintainer(oldTable, newTable, connection); + return maintainer; + } + + private TransformMaintainer(RowKeySchema oldRowKeySchema, boolean isOldTableSalted) { + super(oldRowKeySchema, isOldTableSalted); + this.oldTableRowKeySchema = oldRowKeySchema; + this.isOldTableSalted = isOldTableSalted; + } + + private TransformMaintainer(final PTable oldTable, final PTable newTable, PhoenixConnection connection) { + this(oldTable.getRowKeySchema(), oldTable.getBucketNum() != null); + this.newTableRowKeyOrderOptimizable = newTable.rowKeyOrderOptimizable(); + this.isMultiTenant = oldTable.isMultiTenant(); + + this.newTableEncodingScheme = newTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : newTable.getEncodingScheme(); + this.newTableImmutableStorageScheme = newTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : newTable.getImmutableStorageScheme(); + this.oldTableEncodingScheme = oldTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : oldTable.getEncodingScheme(); + this.oldTableImmutableStorageScheme = oldTable.getImmutableStorageScheme() == null ? 
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : oldTable.getImmutableStorageScheme(); + + this.newTableName = newTable.getPhysicalName().getBytes(); + boolean newTableWALDisabled = newTable.isWALDisabled(); + int nNewTableColumns = newTable.getColumns().size(); + int nNewTablePKColumns = newTable.getPKColumns().size(); + + List<PColumn> oldTablePKColumns = oldTable.getPKColumns(); + + this.newTableColumnCount = oldTablePKColumns.size(); + + this.newTableColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nNewTablePKColumns); + this.newTableExpressions = Lists.newArrayListWithExpectedSize(nNewTableColumns); + this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nNewTableColumns - nNewTablePKColumns); + this.nNewTableSaltBuckets = newTable.getBucketNum() == null ? 0 : newTable.getBucketNum(); + this.oldTableEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(oldTable); + this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(newTable); + this.nOldTableCFs = oldTable.getColumnFamilies().size(); + this.newTableWALDisabled = newTableWALDisabled; + this.newTableImmutableRows = newTable.isImmutableRows(); + this.newTableColumnsInfo = Sets.newHashSetWithExpectedSize(nNewTableColumns - nNewTablePKColumns); + + for (int i = 0; i < newTable.getColumnFamilies().size(); i++) { + PColumnFamily family = newTable.getColumnFamilies().get(i); + for (PColumn newColumn : family.getColumns()) { + PColumn oldColumn = getColumnOrNull(oldTable, newColumn.getName().getString(), newColumn.getFamilyName().getString()); + // This can happen during deletion where we don't need covered columns + if (oldColumn != null) { + byte[] oldColumnCq = oldColumn.getColumnQualifierBytes(); + byte[] newColumnCq = newColumn.getColumnQualifierBytes(); + this.coveredColumnsMap.put(new ColumnReference(oldColumn.getFamilyName().getBytes(), oldColumnCq), + new ColumnReference(newColumn.getFamilyName().getBytes(), newColumnCq)); + } + } + } + this.logicalNewTableName = 
newTable.getName().getString(); + initCachedState(); + } + + public static PColumn getColumnOrNull(PTable table, String columnName, String familyName) { + PColumnFamily family; + try { + family = table.getColumnFamily(familyName); + } catch (ColumnFamilyNotFoundException e) { + return null; + } + try { + return family.getPColumnForColumnName(columnName); + } catch (ColumnNotFoundException e) { + return null; + } + } + + /** + * Init calculated state reading/creating + */ + private void initCachedState() { + byte[] newTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(newTableEncodingScheme).getFirst(); + byte[] oldTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(oldTableEncodingScheme).getFirst(); + newTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, newTableEmptyKvQualifier); + oldTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, oldTableEmptyKvQualifier); Review comment: Yes. We might need it with the new transform types. If you prefer, I can remove now and add when we need it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
