virajjasani commented on a change in pull request #1366:
URL: https://github.com/apache/phoenix/pull/1366#discussion_r773012137
##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
##########
@@ -157,64 +307,70 @@ public static void addTransform(
}
-    public static SystemTransformRecord getTransformRecord(
-            String schema, String logicalTableName, String logicalParentName, String tenantId, PhoenixConnection connection) throws SQLException {
-        try (ResultSet resultSet = connection.prepareStatement("SELECT " +
-                PhoenixDatabaseMetaData.TENANT_ID + ", " +
-                PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
-                PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + ", " +
-                PhoenixDatabaseMetaData.NEW_PHYS_TABLE_NAME + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_TYPE + ", " +
-                PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_STATUS + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_JOB_ID + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_START_TS + ", " +
-                PhoenixDatabaseMetaData.TRANSFORM_END_TS + ", " +
-                PhoenixDatabaseMetaData.OLD_METADATA + " , " +
-                PhoenixDatabaseMetaData.NEW_METADATA + " , " +
-                PhoenixDatabaseMetaData.TRANSFORM_FUNCTION +
-                " FROM " + PhoenixDatabaseMetaData.SYSTEM_TRANSFORM_NAME + " WHERE " +
-                (Strings.isNullOrEmpty(tenantId) ? "" : (PhoenixDatabaseMetaData.TENANT_ID + " ='" + tenantId + "' AND ")) +
-                (Strings.isNullOrEmpty(schema) ? "" : (PhoenixDatabaseMetaData.TABLE_SCHEM + " ='" + schema + "' AND ")) +
-                PhoenixDatabaseMetaData.LOGICAL_TABLE_NAME + " ='" + logicalTableName + "'" +
-                (Strings.isNullOrEmpty(logicalParentName) ? "": (" AND " + PhoenixDatabaseMetaData.LOGICAL_PARENT_NAME + "='" + logicalParentName + "'" ))
-        ).executeQuery()) {
-            if (resultSet.next()) {
-                return SystemTransformRecord.SystemTransformBuilder.build(resultSet);
+    public static void completeTransform(Connection connection, Configuration configuration) throws Exception{
+        // Will be called from Reducer
+        long timestmp= EnvironmentEdgeManager.currentTimeMillis();
+        String tenantId = configuration.get(MAPREDUCE_TENANT_ID, null);
+        String fullOldTableName = PhoenixConfigurationUtil.getInputTableName(configuration);
+        String schemaName = SchemaUtil.getSchemaNameFromFullName(fullOldTableName);
+        String oldTableLogicalName = SchemaUtil.getTableNameFromFullName(fullOldTableName);
+        String indexTableName = SchemaUtil.getTableNameFromFullName(PhoenixConfigurationUtil.getIndexToolIndexTableName(configuration));
+        String logicaTableName = oldTableLogicalName;
+        String logicalParentName = null;
+        if (PhoenixConfigurationUtil.getTransformingTableType(configuration) == IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE)
+            if (!Strings.isNullOrEmpty(indexTableName)) {
+                logicaTableName = indexTableName;
+                logicalParentName = SchemaUtil.getTableName(schemaName, oldTableLogicalName);
             }
-            return null;
-        }
-    }
-
-    public static boolean checkIsTransformNeeded(MetaDataClient.MetaProperties metaProperties, String schemaName,
-                                                 PTable table, String logicalTableName, String parentTableName,
-                                                 String tenantId, PhoenixConnection connection) throws SQLException {
-        boolean isTransformNeeded = isTransformNeeded(metaProperties, table);
-        if (isTransformNeeded) {
-            SystemTransformRecord existingTransform = Transform.getTransformRecord(schemaName, logicalTableName, parentTableName, tenantId,connection);
-            if (existingTransform != null && existingTransform.isActive()) {
-                throw new SQLExceptionInfo.Builder(
-                        SQLExceptionCode.CANNOT_TRANSFORM_ALREADY_TRANSFORMING_TABLE)
-                        .setMessage(" Only one transform at a time is allowed ")
-                        .setSchemaName(schemaName).setTableName(logicalTableName).build().buildException();
+        boolean isPartial = PhoenixConfigurationUtil.getIsPartialTransform(configuration);
+        SystemTransformRecord transformRecord = getTransformRecord(schemaName, logicaTableName, logicalParentName,
+                tenantId, connection.unwrap(PhoenixConnection.class));
+        if (!isPartial) {
+            String newTableName = SchemaUtil.getTableNameFromFullName(transformRecord.getNewPhysicalTableName());
+            PTable pNewTable = PhoenixRuntime.getTable(connection, transformRecord.getNewPhysicalTableName());
+            PTable pOldTable = PhoenixRuntime.getTable(connection, SchemaUtil.getTableName(schemaName,logicaTableName));
+            if (pOldTable.getImmutableStorageScheme() != pNewTable.getImmutableStorageScheme() ||
+                    pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
+                MetaDataClient.mutateTransformProperties(connection, tenantId, schemaName, logicaTableName, newTableName,
+                        pNewTable.getImmutableStorageScheme(), pNewTable.getEncodingScheme());
+                // We need to update the columns's qualifiers as well
+                if (pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
+                    Short nextKeySeq = 0;
Review comment:
nit: primitive `short`?
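A minimal sketch of the suggestion (the increment below is hypothetical; only the declaration comes from the diff above):

    // Boxed: each increment auto-unboxes and re-boxes a Short object,
    // and an accidental null would throw NullPointerException on unboxing.
    Short boxedSeq = 0;
    boxedSeq++;
    // Primitive: plain arithmetic, no allocation.
    short nextKeySeq = 0;
    nextKeySeq++;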
##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
##########
@@ -0,0 +1,762 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.transform;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.compile.PostIndexDDLCompiler;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
+import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexDBWritable;
+import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.PosixParser;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
+import static org.apache.phoenix.mapreduce.index.IndexTool.isTimeRangeSet;
+import static org.apache.phoenix.mapreduce.index.IndexTool.validateTimeRange;
+import static org.apache.phoenix.util.QueryUtil.getConnection;
+
+public class TransformTool extends Configured implements Tool {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TransformTool.class);
+
+ public static enum MR_COUNTER_METRICS {
Review comment:
nit: static is redundant
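Nested enum types are implicitly static (JLS 8.9), so the modifier changes nothing; a sketch (enum names here are illustrative only):

    public class TransformTool {
        public static enum MR_COUNTER_METRICS_A { } // compiles, but `static` is redundant
        public enum MR_COUNTER_METRICS_B { }        // identical semantics
    }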
##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
##########
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.transform;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.generated.ServerCachingProtos;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+
+public class TransformMaintainer extends IndexMaintainer {
+ private boolean isMultiTenant;
+    // indexed expressions that are not present in the row key of the data table, the expression can also refer to a regular column
+ private List<Expression> newTableExpressions;
+ private Set<ColumnReference> newTableColumns;
+
+ private List<PDataType> newTableColumnTypes;
+ private int newTableColumnCount;
+ private byte[] newTableName;
+ private int nNewTableSaltBuckets;
+ private byte[] oldTableEmptyKeyValueCF;
+ private ImmutableBytesPtr emptyKeyValueCFPtr;
+ private int nOldTableCFs;
+ private boolean newTableWALDisabled;
+ private boolean newTableImmutableRows;
+ // Transient state
+ private final boolean isOldTableSalted;
+ private final RowKeySchema oldTableRowKeySchema;
+
+ private int estimatedNewTableRowKeyBytes;
+ private ColumnReference newTableEmptyKeyValueRef;
+ private ColumnReference oldTableEmptyKeyValueRef;
+ private boolean newTableRowKeyOrderOptimizable;
+
+ private PTable.QualifierEncodingScheme newTableEncodingScheme;
+ private PTable.ImmutableStorageScheme newTableImmutableStorageScheme;
+ private PTable.QualifierEncodingScheme oldTableEncodingScheme;
+ private PTable.ImmutableStorageScheme oldTableImmutableStorageScheme;
+    /*
+     * The first part of the pair is column family name
+     * and second part is the column name. The reason we need to track this state is because for certain storage schemes
+     * like ImmutableStorageScheme#SINGLE_CELL_ARRAY_WITH_OFFSETS, the column for which we need to generate an new
+     * table put/delete is different from the old columns in the phoenix schema.
+     */
+    private Set<Pair<String, String>> newTableColumnsInfo;
+    /*
+     * Map of covered columns where a key is column reference for a column in the data table
+     * and value is column reference for corresponding column in the new table.
+     */
+ private Map<ColumnReference, ColumnReference> coveredColumnsMap;
+
+ private String logicalNewTableName;
+
+    public static TransformMaintainer create(PTable oldTable, PTable newTable, PhoenixConnection connection) {
+        if (oldTable.getType() == PTableType.INDEX) {
+            throw new IllegalArgumentException();
+        }
+        TransformMaintainer maintainer = new TransformMaintainer(oldTable, newTable, connection);
+        return maintainer;
+    }
+
+    private TransformMaintainer(RowKeySchema oldRowKeySchema, boolean isOldTableSalted) {
+ super(oldRowKeySchema, isOldTableSalted);
+ this.oldTableRowKeySchema = oldRowKeySchema;
+ this.isOldTableSalted = isOldTableSalted;
+ }
+
+    private TransformMaintainer(final PTable oldTable, final PTable newTable, PhoenixConnection connection) {
+        this(oldTable.getRowKeySchema(), oldTable.getBucketNum() != null);
+        this.newTableRowKeyOrderOptimizable = newTable.rowKeyOrderOptimizable();
+        this.isMultiTenant = oldTable.isMultiTenant();
+
+        this.newTableEncodingScheme = newTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : newTable.getEncodingScheme();
+        this.newTableImmutableStorageScheme = newTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : newTable.getImmutableStorageScheme();
+        this.oldTableEncodingScheme = oldTable.getEncodingScheme() == null ? PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : oldTable.getEncodingScheme();
+        this.oldTableImmutableStorageScheme = oldTable.getImmutableStorageScheme() == null ? PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : oldTable.getImmutableStorageScheme();
+
+ this.newTableName = newTable.getPhysicalName().getBytes();
+ boolean newTableWALDisabled = newTable.isWALDisabled();
+ int nNewTableColumns = newTable.getColumns().size();
+ int nNewTablePKColumns = newTable.getPKColumns().size();
+
+ List<PColumn> oldTablePKColumns = oldTable.getPKColumns();
+
+ this.newTableColumnCount = oldTablePKColumns.size();
+
+        this.newTableColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nNewTablePKColumns);
+        this.newTableExpressions = Lists.newArrayListWithExpectedSize(nNewTableColumns);
+        this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nNewTableColumns - nNewTablePKColumns);
+        this.nNewTableSaltBuckets = newTable.getBucketNum() == null ? 0 : newTable.getBucketNum();
+        this.oldTableEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(oldTable);
+        this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(newTable);
+        this.nOldTableCFs = oldTable.getColumnFamilies().size();
+        this.newTableWALDisabled = newTableWALDisabled;
+        this.newTableImmutableRows = newTable.isImmutableRows();
+        this.newTableColumnsInfo = Sets.newHashSetWithExpectedSize(nNewTableColumns - nNewTablePKColumns);
+
+ for (int i = 0; i < newTable.getColumnFamilies().size(); i++) {
+ PColumnFamily family = newTable.getColumnFamilies().get(i);
+ for (PColumn newColumn : family.getColumns()) {
+                PColumn oldColumn = getColumnOrNull(oldTable, newColumn.getName().getString(), newColumn.getFamilyName().getString());
+                // This can happen during deletion where we don't need covered columns
+                if (oldColumn != null) {
+                    byte[] oldColumnCq = oldColumn.getColumnQualifierBytes();
+                    byte[] newColumnCq = newColumn.getColumnQualifierBytes();
+                    this.coveredColumnsMap.put(new ColumnReference(oldColumn.getFamilyName().getBytes(), oldColumnCq),
+                            new ColumnReference(newColumn.getFamilyName().getBytes(), newColumnCq));
+ }
+ }
+ }
+ this.logicalNewTableName = newTable.getName().getString();
+ initCachedState();
+ }
+
+    public static PColumn getColumnOrNull(PTable table, String columnName, String familyName) {
+ PColumnFamily family;
+ try {
+ family = table.getColumnFamily(familyName);
+ } catch (ColumnFamilyNotFoundException e) {
+ return null;
+ }
+ try {
+ return family.getPColumnForColumnName(columnName);
+ } catch (ColumnNotFoundException e) {
+ return null;
+ }
+ }
+
+ /**
+ * Init calculated state reading/creating
+ */
+ private void initCachedState() {
+        byte[] newTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(newTableEncodingScheme).getFirst();
+        byte[] oldTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(oldTableEncodingScheme).getFirst();
+        newTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, newTableEmptyKvQualifier);
+        oldTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, oldTableEmptyKvQualifier);
Review comment:
Planning to use the cached value of `oldTableEmptyKeyValueRef` in the future?
##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
##########
@@ -157,64 +307,70 @@ public static void addTransform(
}
+    public static void completeTransform(Connection connection, Configuration configuration) throws Exception{
+        // Will be called from Reducer
+        long timestmp= EnvironmentEdgeManager.currentTimeMillis();
Review comment:
nit: `:s/timestmp/timestamp/`
##########
File path: phoenix-core/src/main/protobuf/ServerCachingService.proto
##########
@@ -70,6 +70,31 @@ message IndexMaintainer {
optional int32 dataImmutableStorageScheme = 27;
}
+message TransformMaintainer {
+ required int32 saltBuckets = 1;
+ required bool isMultiTenant = 2;
+ repeated ColumnReference newTableColumns = 3;
+ repeated ColumnReference oldTableColRefForCoveredColumns = 4;
+ repeated ColumnReference newTableColRefForCoveredColumns = 5;
+ required bytes newTableName = 6;
+ required bool newTableRowKeyOrderOptimizable = 8;
Review comment:
`7` seems skipped?
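Protobuf tolerates gaps in field numbers, so skipping `7` still compiles; but if the tag was never released it is cleaner to renumber, and if it was used and later removed, a `reserved` entry documents that and prevents accidental reuse. A sketch (hypothetical, assuming the tag is retired on purpose):

    message TransformMaintainer {
      reserved 7;
      required bool newTableRowKeyOrderOptimizable = 8;
    }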
##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
##########
@@ -0,0 +1,459 @@
+ int nNewTableColumns = newTable.getColumns().size();
+ int nNewTablePKColumns = newTable.getPKColumns().size();
+
+ List<PColumn> oldTablePKColumns = oldTable.getPKColumns();
+
+ this.newTableColumnCount = oldTablePKColumns.size();
+
+        this.newTableColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nNewTablePKColumns);
+        this.newTableExpressions = Lists.newArrayListWithExpectedSize(nNewTableColumns);
Review comment:
nit: remove type arg from `newTableColumnTypes` as well (similar to `newTableExpressions`)?
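The explicit type witness is only needed when the compiler cannot see a target type; in a field assignment the declared field type drives inference, so both factory calls can be bare. A sketch of the suggestion:

    // Element types are inferred from the assignment targets:
    this.newTableColumnTypes = Lists.newArrayListWithExpectedSize(nNewTablePKColumns);
    this.newTableExpressions = Lists.newArrayListWithExpectedSize(nNewTableColumns);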
##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
##########
@@ -0,0 +1,459 @@
+ /**
+ * Init calculated state reading/creating
+ */
+ private void initCachedState() {
+        byte[] newTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(newTableEncodingScheme).getFirst();
+        byte[] oldTableEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(oldTableEncodingScheme).getFirst();
+        newTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, newTableEmptyKvQualifier);
+        oldTableEmptyKeyValueRef = new ColumnReference(oldTableEmptyKeyValueCF, oldTableEmptyKvQualifier);
+        this.newTableColumns = Sets.newLinkedHashSetWithExpectedSize(this.newTableColumnCount);
+
+        for (ColumnReference colRef : coveredColumnsMap.keySet()) {
+            if (newTableImmutableStorageScheme == PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
+                newTableColumns.add(colRef);
+            } else {
+                newTableColumns.add(new ColumnReference(colRef.getFamily(), QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES));
+ }
+ }
+ }
+
+ /**
+ * For client-side to serialize TransformMaintainer for a given table
+ *
+ * @param oldTable old table
+ * @param ptr bytes pointer to hold returned serialized value
+ * @param newTable new table to serialize
+ */
+    public static void serialize(PTable oldTable, ImmutableBytesWritable ptr,
+                                 PTable newTable, PhoenixConnection connection) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        DataOutputStream output = new DataOutputStream(stream);
+        try {
+            // Encode data table salting
+            WritableUtils.writeVInt(output, oldTable.getBucketNum() == null ? 1 : -1);
+            // Write out data row key schema once, since it's the same
+            oldTable.getRowKeySchema().write(output);
+            org.apache.phoenix.coprocessor.generated.ServerCachingProtos.TransformMaintainer proto =
+                    TransformMaintainer.toProto(newTable.getTransformMaintainer(oldTable, connection));
+ byte[] protoBytes = proto.toByteArray();
+ WritableUtils.writeVInt(output, protoBytes.length);
+ output.write(protoBytes);
+
+ } catch (IOException e) {
+ throw new RuntimeException(e); // Impossible
+ }
+ ptr.set(stream.toByteArray(), 0, stream.size());
+ }
+
+    public static ServerCachingProtos.TransformMaintainer toProto(TransformMaintainer maintainer) throws IOException {
+        ServerCachingProtos.TransformMaintainer.Builder builder = ServerCachingProtos.TransformMaintainer.newBuilder();
+        builder.setSaltBuckets(maintainer.nNewTableSaltBuckets);
+        builder.setIsMultiTenant(maintainer.isMultiTenant);
+
+        for (ColumnReference colRef : maintainer.newTableColumns) {
+            ServerCachingProtos.ColumnReference.Builder cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
+            cRefBuilder.setFamily(ByteStringer.wrap(colRef.getFamily()));
+            cRefBuilder.setQualifier(ByteStringer.wrap(colRef.getQualifier()));
+            builder.addNewTableColumns(cRefBuilder.build());
+        }
+
+        for (Map.Entry<ColumnReference, ColumnReference> e : maintainer.coveredColumnsMap.entrySet()) {
+            ServerCachingProtos.ColumnReference.Builder cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
+            ColumnReference dataTableColRef = e.getKey();
+            cRefBuilder.setFamily(ByteStringer.wrap(dataTableColRef.getFamily()));
+            cRefBuilder.setQualifier(ByteStringer.wrap(dataTableColRef.getQualifier()));
+            builder.addOldTableColRefForCoveredColumns(cRefBuilder.build());
+            if (maintainer.newTableEncodingScheme != NON_ENCODED_QUALIFIERS) {
+                // We need to serialize the colRefs of new tables only in case of encoded column names.
+                ColumnReference newTableColRef = e.getValue();
+                cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
+                cRefBuilder.setFamily(ByteStringer.wrap(newTableColRef.getFamily()));
+                cRefBuilder.setQualifier(ByteStringer.wrap(newTableColRef.getQualifier()));
+                builder.addNewTableColRefForCoveredColumns(cRefBuilder.build());
+            }
+        }
+
+        builder.setNewTableColumnCount(maintainer.newTableColumnCount);
+        builder.setNewTableName(ByteStringer.wrap(maintainer.newTableName));
+        builder.setNewTableRowKeyOrderOptimizable(maintainer.newTableRowKeyOrderOptimizable);
+        builder.setOldTableEmptyKeyValueColFamily(ByteStringer.wrap(maintainer.oldTableEmptyKeyValueCF));
+        ServerCachingProtos.ImmutableBytesWritable.Builder ibwBuilder = ServerCachingProtos.ImmutableBytesWritable.newBuilder();
+        ibwBuilder.setByteArray(ByteStringer.wrap(maintainer.emptyKeyValueCFPtr.get()));
+        ibwBuilder.setLength(maintainer.emptyKeyValueCFPtr.getLength());
+        ibwBuilder.setOffset(maintainer.emptyKeyValueCFPtr.getOffset());
+        builder.setEmptyKeyValueColFamily(ibwBuilder.build());
+        try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+            DataOutput output = new DataOutputStream(stream);
+            for (Expression expression : maintainer.newTableExpressions) {
+                WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
+                expression.write(output);
+            }
+            builder.setNewTableExpressions(ByteStringer.wrap(stream.toByteArray()));
+        }
+
+        builder.setNumDataTableColFamilies(maintainer.nOldTableCFs);
+        builder.setNewTableWalDisabled(maintainer.newTableWALDisabled);
+        builder.setNewTableRowKeyByteSize(maintainer.estimatedNewTableRowKeyBytes);
+        builder.setNewTableImmutable(maintainer.newTableImmutableRows);
+        for (Pair<String, String> p : maintainer.newTableColumnsInfo) {
+            ServerCachingProtos.ColumnInfo.Builder ciBuilder = ServerCachingProtos.ColumnInfo.newBuilder();
+            if (p.getFirst() != null) {
+                ciBuilder.setFamilyName(p.getFirst());
+            }
+            ciBuilder.setColumnName(p.getSecond());
+            builder.addNewTableColumnInfo(ciBuilder.build());
+        }
+        builder.setNewTableEncodingScheme(maintainer.newTableEncodingScheme.getSerializedMetadataValue());
+        builder.setNewTableImmutableStorageScheme(maintainer.newTableImmutableStorageScheme.getSerializedMetadataValue());
+        builder.setLogicalNewTableName(maintainer.logicalNewTableName);
+        builder.setOldTableEncodingScheme(maintainer.oldTableEncodingScheme.getSerializedMetadataValue());
+        builder.setOldTableImmutableStorageScheme(maintainer.oldTableImmutableStorageScheme.getSerializedMetadataValue());
+        return builder.build();
+    }
+
+    public static TransformMaintainer fromProto(ServerCachingProtos.TransformMaintainer proto, RowKeySchema dataTableRowKeySchema, boolean isDataTableSalted) throws IOException {
+        TransformMaintainer maintainer = new TransformMaintainer(dataTableRowKeySchema, isDataTableSalted);
+        maintainer.nNewTableSaltBuckets = proto.getSaltBuckets();
+        maintainer.isMultiTenant = proto.getIsMultiTenant();
+        List<ServerCachingProtos.ColumnReference> newTableColList = proto.getNewTableColumnsList();
+        maintainer.newTableColumns = new HashSet<ColumnReference>(newTableColList.size());
+        for (ServerCachingProtos.ColumnReference colRefFromProto : newTableColList) {
+            maintainer.newTableColumns.add(new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier().toByteArray()));
+        }
+
+        maintainer.newTableName = proto.getNewTableName().toByteArray();
+        if (proto.getNewTableColumnCount() != -1) {
+            maintainer.newTableColumnCount = proto.getNewTableColumnCount();
+        }
+
+        maintainer.newTableRowKeyOrderOptimizable = proto.getNewTableRowKeyOrderOptimizable();
+        maintainer.oldTableEmptyKeyValueCF = proto.getOldTableEmptyKeyValueColFamily().toByteArray();
+        ServerCachingProtos.ImmutableBytesWritable emptyKeyValueColFamily = proto.getEmptyKeyValueColFamily();
+        maintainer.emptyKeyValueCFPtr = new ImmutableBytesPtr(emptyKeyValueColFamily.getByteArray().toByteArray(), emptyKeyValueColFamily.getOffset(), emptyKeyValueColFamily.getLength());
+
+        maintainer.nOldTableCFs = proto.getNumDataTableColFamilies();
+        maintainer.newTableWALDisabled = proto.getNewTableWalDisabled();
+        maintainer.estimatedNewTableRowKeyBytes = proto.getNewTableRowKeyByteSize();
+        maintainer.newTableImmutableRows = proto.getNewTableImmutable();
+        List<ServerCachingProtos.ColumnInfo> newTblColumnInfoList = proto.getNewTableColumnInfoList();
+        maintainer.newTableColumnsInfo = Sets.newHashSet();
+        for (ServerCachingProtos.ColumnInfo info : newTblColumnInfoList) {
+            maintainer.newTableColumnsInfo.add(new Pair<>(info.getFamilyName(), info.getColumnName()));
+        }
+        // proto doesn't support single byte so need an explicit cast here
+        maintainer.newTableEncodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte) proto.getNewTableEncodingScheme());
+        maintainer.newTableImmutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte) proto.getNewTableImmutableStorageScheme());
+        maintainer.oldTableEncodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte) proto.getOldTableEncodingScheme());
+        maintainer.oldTableImmutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte) proto.getOldTableImmutableStorageScheme());
+
+        List<ServerCachingProtos.ColumnReference> oldTableColRefsForCoveredColumnsList = proto.getOldTableColRefForCoveredColumnsList();
+        List<ServerCachingProtos.ColumnReference> newTableColRefsForCoveredColumnsList = proto.getNewTableColRefForCoveredColumnsList();
+        maintainer.coveredColumnsMap = Maps.newHashMapWithExpectedSize(oldTableColRefsForCoveredColumnsList.size());
+        boolean encodedColumnNames = maintainer.newTableEncodingScheme != NON_ENCODED_QUALIFIERS;
+        Iterator<ServerCachingProtos.ColumnReference> newTableColRefItr = newTableColRefsForCoveredColumnsList.iterator();
+        for (ServerCachingProtos.ColumnReference colRefFromProto : oldTableColRefsForCoveredColumnsList) {
+            ColumnReference oldTableColRef = new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier().toByteArray());
+            ColumnReference newTableColRef;
+            if (encodedColumnNames) {
+                ServerCachingProtos.ColumnReference fromProto = newTableColRefItr.next();
+                newTableColRef = new ColumnReference(fromProto.getFamily().toByteArray(), fromProto.getQualifier().toByteArray());
+            } else {
+                byte[] cq = oldTableColRef.getQualifier();
+                byte[] cf = oldTableColRef.getFamily();
+                newTableColRef = new ColumnReference(cf, cq);
+            }
+            maintainer.coveredColumnsMap.put(oldTableColRef, newTableColRef);
+        }
+        maintainer.logicalNewTableName = proto.getLogicalNewTableName();
+        maintainer.initCachedState();
+        return maintainer;
+    }
+
+
+ public static List<IndexMaintainer> deserialize(byte[] buf) {
+ return deserialize(buf, 0, buf.length);
+ }
+
+    private static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length) {
+        List<IndexMaintainer> maintainers = Collections.emptyList();
+        if (length > 0) {
+            ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length);
+            DataInput input = new DataInputStream(stream);
+            try {
+                int size = WritableUtils.readVInt(input);
+                boolean isDataTableSalted = size < 0;
+                size = Math.abs(size);
+                RowKeySchema rowKeySchema = new RowKeySchema();
+                rowKeySchema.readFields(input);
+                maintainers = Lists.newArrayListWithExpectedSize(size);
+                for (int i = 0; i < size; i++) {
+                    int protoSize = WritableUtils.readVInt(input);
+                    byte[] b = new byte[protoSize];
+                    input.readFully(b);
+                    ServerCachingProtos.TransformMaintainer proto = ServerCachingProtos.TransformMaintainer.parseFrom(b);
+                    maintainers.add(TransformMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted));
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e); // Impossible
+            }
+        }
+        return maintainers;
+    }
+
+ // Return new table's name
+ public byte[] getIndexTableName() {
+ return newTableName;
+ }
+
+    // Builds new table's rowkey using the old table's rowkey.
+    // This method will change when we support rowkey related transforms
+    public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey, long ts) {
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        boolean isNewTableSalted = nNewTableSaltBuckets > 0;
+        TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(estimatedNewTableRowKeyBytes);
+        DataOutput output = new DataOutputStream(stream);
+
+        try {
+            if (isNewTableSalted) {
+                output.write(0); // will be set at end to new table salt byte
+            }
+            // The oldTableRowKeySchema includes the salt byte field,
+            // so we must adjust for that here.
+            int dataPosOffset = isOldTableSalted ? 1 : 0 ;
+            //BitSet viewConstantColumnBitSet = this.rowKeyMetaData.getViewConstantColumnBitSet();
+            // Skip data table salt byte
+            int maxRowKeyOffset = rowKeyPtr.getOffset() + rowKeyPtr.getLength();
+            oldTableRowKeySchema.iterator(rowKeyPtr, ptr, dataPosOffset);
+
+            // Write new table row key
+            while (oldTableRowKeySchema.next(ptr, dataPosOffset, maxRowKeyOffset) != null) {
+                output.write(ptr.get(), ptr.getOffset(), ptr.getLength());
+                if (!oldTableRowKeySchema.getField(dataPosOffset).getDataType().isFixedWidth()) {
+                    output.writeByte(SchemaUtil.getSeparatorByte(newTableRowKeyOrderOptimizable, ptr.getLength()==0
+                            , oldTableRowKeySchema.getField(dataPosOffset)));
+                }
+                dataPosOffset++;
+            }
+
+            byte[] newTableRowKey = stream.getBuffer();
+            // Remove trailing nulls
+            int length = stream.size();
+            if (isNewTableSalted) {
+                // Set salt byte
+                byte saltByte = SaltingUtil.getSaltingByte(newTableRowKey, SaltingUtil.NUM_SALTING_BYTES, length-SaltingUtil.NUM_SALTING_BYTES, nNewTableSaltBuckets);
+                newTableRowKey[0] = saltByte;
+            }
+            return newTableRowKey.length == length ? newTableRowKey : Arrays.copyOf(newTableRowKey, length);
+        } catch (IOException e) {
+            throw new RuntimeException(e); // Impossible
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e); // Impossible
Review comment:
Good to wrap `stream` in try-with-resources to avoid having to handle such possibilities here.
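A sketch of the suggestion, assuming `TrustedByteArrayOutputStream` is `Closeable` (it must be, since the existing `finally` block calls `close()`):

    // try-with-resources closes the stream automatically, so the nested
    // try/catch inside `finally` disappears.
    try (TrustedByteArrayOutputStream stream =
            new TrustedByteArrayOutputStream(estimatedNewTableRowKeyBytes)) {
        DataOutput output = new DataOutputStream(stream);
        // ... write salt byte and row key fields exactly as above ...
        byte[] newTableRowKey = stream.getBuffer();
        int length = stream.size();
        return newTableRowKey.length == length ? newTableRowKey : Arrays.copyOf(newTableRowKey, length);
    } catch (IOException e) {
        throw new RuntimeException(e); // Impossible, as the existing comment notes
    }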
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]