gjacoby126 commented on a change in pull request #1366:
URL: https://github.com/apache/phoenix/pull/1366#discussion_r770051369
##########
File path:
phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
##########
@@ -456,7 +456,7 @@ public static void dumpTable(String tableName) throws
Exception {
hTable =
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableName.getBytes());
Scan scan = new Scan();
scan.setRaw(true);
- LOGGER.info("***** Table Name : " + tableName);
+ System.out.println("***** Table Name : " + tableName);
Review comment:
Please remove the System.out.println calls and keep using LOGGER.info here.
##########
File path:
phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameBaseIT.java
##########
@@ -529,15 +530,14 @@ protected void validateIndex(Connection connection,
String tableName, boolean is
public static void renameAndDropPhysicalTable(Connection conn, String
tenantId, String schema, String tableName, String physicalName, boolean
isNamespaceEnabled) throws Exception {
String
- changeName =
- String.format(
- "UPSERT INTO SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM,
TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, PHYSICAL_TABLE_NAME) VALUES (%s, '%s',
'%s', NULL, NULL, '%s')",
- tenantId, schema, tableName, physicalName);
+ changeName = String.format(
+ "UPSERT INTO SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM,
TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, PHYSICAL_TABLE_NAME) VALUES (%s, %s,
'%s', NULL, NULL, '%s')",
+ tenantId, schema==null ? null : ("'" + schema + "'"),
tableName, physicalName);
conn.createStatement().execute(changeName);
conn.commit();
String fullTableName = SchemaUtil.getTableName(schema, tableName);
- if (isNamespaceEnabled) {
+ if (isNamespaceEnabled && !(Strings.isNullOrEmpty(schema) ||
"NULL".equals(schema))) {
Review comment:
"NULL" should probably be a constant
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformReducer.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.transform;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.schema.transform.Transform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Reducer class that does only one task and that is to update the index state
of the table.
Review comment:
Comment seems incorrect (probably from copy/paste)
##########
File path:
phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
##########
@@ -465,7 +465,7 @@ public static void dumpTable(String tableName) throws
Exception {
.entrySet()) {
byte[] family = entryF.getKey();
}
- LOGGER.info(cellString + " ****** value : " +
Bytes.toStringBinary(CellUtil.cloneValue(cell)));
+ System.out.println(cellString + " ****** value : " +
Bytes.toStringBinary(CellUtil.cloneValue(cell)));
Review comment:
Ditto: please remove this System.out.println as well and keep the LOGGER.info call.
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
##########
@@ -71,14 +87,150 @@ public static void addTransform(PhoenixConnection
connection, String tenantId, P
newPhysicalTableName = generateNewTableName(schema,
logicalTableName, sequenceNum);
}
transformBuilder.setNewPhysicalTableName(newPhysicalTableName);
- Transform.addTransform(transformBuilder.build(), connection);
+ Transform.addTransform(table, changingProperties,
transformBuilder.build(), connection);
} catch (JsonProcessingException ex) {
LOGGER.error("addTransform failed", ex);
throw new SQLException("Adding transform failed with
JsonProcessingException");
+ } catch (SQLException ex) {
+ throw ex;
+ } catch(Exception ex) {
+ throw new
SQLExceptionInfo.Builder(SQLExceptionCode.valueOf("CANNOT_MUTATE_TABLE"))
+ .setSchemaName((table.getSchemaName() == null? null:
table.getSchemaName().getString()))
+ .setRootCause(ex)
+
.setTableName(table.getName().getString()).build().buildException();
}
}
- public static void addTransform(
+ protected static void addTransform(
+ PTable table, MetaDataClient.MetaProperties changedProps,
SystemTransformRecord systemTransformParams, PhoenixConnection connection)
throws Exception {
+ PName newTableName =
PNameFactory.newName(systemTransformParams.getNewPhysicalTableName());
+ PName newTableNameWithoutSchema =
PNameFactory.newName(SchemaUtil.getTableNameFromFullName(systemTransformParams.getNewPhysicalTableName()));
+ PTable newTable = new PTableImpl.Builder()
+ .setTableName(newTableNameWithoutSchema)
+ .setParentTableName(table.getParentTableName())
+ .setBaseTableLogicalName(table.getBaseTableLogicalName())
+ .setPhysicalTableName(newTableNameWithoutSchema)
+ .setAllColumns(table.getColumns())
+ .setAppendOnlySchema(table.isAppendOnlySchema())
+ .setAutoPartitionSeqName(table.getAutoPartitionSeqName())
+ .setBaseColumnCount(table.getBaseColumnCount())
+ .setBucketNum(table.getBucketNum())
+ .setDefaultFamilyName(table.getDefaultFamilyName())
+ .setDisableWAL(table.isWALDisabled())
+ .setEstimatedSize(table.getEstimatedSize())
+ .setFamilies(table.getColumnFamilies())
+ .setImmutableRows(table.isImmutableRows())
+ .setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
+ .setIndexType(table.getIndexType())
+ .setName(newTableName)
+ .setMultiTenant(table.isMultiTenant())
+ .setParentName(table.getParentName())
+ .setParentSchemaName(table.getParentSchemaName())
+ .setPhoenixTTL(table.getPhoenixTTL())
+ .setNamespaceMapped(table.isNamespaceMapped())
+ .setSchemaName(table.getSchemaName())
+ .setPkColumns(table.getPKColumns())
+ .setPkName(table.getPKName())
+ .setPhoenixTTLHighWaterMark(table.getPhoenixTTLHighWaterMark())
+ .setRowKeySchema(table.getRowKeySchema())
+ .setStoreNulls(table.getStoreNulls())
+ .setTenantId(table.getTenantId())
+ .setType(table.getType())
+ // SchemaExtractor uses physical name to get the table
descriptor from. So we use the existing table here
+
.setPhysicalNames(ImmutableList.copyOf(table.getPhysicalNames()))
+ .setUpdateCacheFrequency(table.getUpdateCacheFrequency())
+ .setTransactionProvider(table.getTransactionProvider())
+
.setUseStatsForParallelization(table.useStatsForParallelization())
+ // TODO SET SCHEMAVERSION
Review comment:
Need to set schema version and change detection enabled now that they're
checked in, right?
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
##########
@@ -71,14 +87,150 @@ public static void addTransform(PhoenixConnection
connection, String tenantId, P
newPhysicalTableName = generateNewTableName(schema,
logicalTableName, sequenceNum);
}
transformBuilder.setNewPhysicalTableName(newPhysicalTableName);
- Transform.addTransform(transformBuilder.build(), connection);
+ Transform.addTransform(table, changingProperties,
transformBuilder.build(), connection);
} catch (JsonProcessingException ex) {
LOGGER.error("addTransform failed", ex);
throw new SQLException("Adding transform failed with
JsonProcessingException");
+ } catch (SQLException ex) {
+ throw ex;
+ } catch(Exception ex) {
+ throw new
SQLExceptionInfo.Builder(SQLExceptionCode.valueOf("CANNOT_MUTATE_TABLE"))
+ .setSchemaName((table.getSchemaName() == null? null:
table.getSchemaName().getString()))
+ .setRootCause(ex)
+
.setTableName(table.getName().getString()).build().buildException();
}
}
- public static void addTransform(
+ protected static void addTransform(
+ PTable table, MetaDataClient.MetaProperties changedProps,
SystemTransformRecord systemTransformParams, PhoenixConnection connection)
throws Exception {
+ PName newTableName =
PNameFactory.newName(systemTransformParams.getNewPhysicalTableName());
+ PName newTableNameWithoutSchema =
PNameFactory.newName(SchemaUtil.getTableNameFromFullName(systemTransformParams.getNewPhysicalTableName()));
+ PTable newTable = new PTableImpl.Builder()
+ .setTableName(newTableNameWithoutSchema)
+ .setParentTableName(table.getParentTableName())
+ .setBaseTableLogicalName(table.getBaseTableLogicalName())
+ .setPhysicalTableName(newTableNameWithoutSchema)
+ .setAllColumns(table.getColumns())
+ .setAppendOnlySchema(table.isAppendOnlySchema())
+ .setAutoPartitionSeqName(table.getAutoPartitionSeqName())
+ .setBaseColumnCount(table.getBaseColumnCount())
+ .setBucketNum(table.getBucketNum())
+ .setDefaultFamilyName(table.getDefaultFamilyName())
+ .setDisableWAL(table.isWALDisabled())
+ .setEstimatedSize(table.getEstimatedSize())
+ .setFamilies(table.getColumnFamilies())
+ .setImmutableRows(table.isImmutableRows())
+ .setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
+ .setIndexType(table.getIndexType())
+ .setName(newTableName)
+ .setMultiTenant(table.isMultiTenant())
+ .setParentName(table.getParentName())
+ .setParentSchemaName(table.getParentSchemaName())
+ .setPhoenixTTL(table.getPhoenixTTL())
+ .setNamespaceMapped(table.isNamespaceMapped())
+ .setSchemaName(table.getSchemaName())
+ .setPkColumns(table.getPKColumns())
+ .setPkName(table.getPKName())
+ .setPhoenixTTLHighWaterMark(table.getPhoenixTTLHighWaterMark())
+ .setRowKeySchema(table.getRowKeySchema())
+ .setStoreNulls(table.getStoreNulls())
+ .setTenantId(table.getTenantId())
+ .setType(table.getType())
+ // SchemaExtractor uses physical name to get the table
descriptor from. So we use the existing table here
+
.setPhysicalNames(ImmutableList.copyOf(table.getPhysicalNames()))
+ .setUpdateCacheFrequency(table.getUpdateCacheFrequency())
+ .setTransactionProvider(table.getTransactionProvider())
+
.setUseStatsForParallelization(table.useStatsForParallelization())
+ // TODO SET SCHEMAVERSION
+ // Transformables
+ .setImmutableStorageScheme(
+ (changedProps.getImmutableStorageSchemeProp() != null?
changedProps.getImmutableStorageSchemeProp():table.getImmutableStorageScheme()))
+ .setQualifierEncodingScheme(
+ (changedProps.getColumnEncodedBytesProp() != null?
changedProps.getColumnEncodedBytesProp() : table.getEncodingScheme()))
+ .build();
Review comment:
One thing to handle in a future JIRA is how this will co-exist with
schema registry -- when we transform we probably need to create a new schema
entry in the external schema registry because change detection will need to
know the column encoding to be able to parse WAL edits.
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.transform;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.generated.ServerCachingProtos;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+
+public class TransformMaintainer extends IndexMaintainer {
+ private boolean isMultiTenant;
+ // indexed expressions that are not present in the row key of the data
table, the expression can also refer to a regular column
+ private List<Expression> newTableExpressions;
+ private Set<ColumnReference> newTableColumns;
+
+ private List<PDataType> newTableColumnTypes;
+ private int newTableColumnCount;
+ private byte[] newTableName;
+ private int nNewTableSaltBuckets;
+ private byte[] oldTableEmptyKeyValueCF;
+ private ImmutableBytesPtr emptyKeyValueCFPtr;
+ private int nOldTableCFs;
+ private boolean newTableWALDisabled;
+ private boolean newTableImmutableRows;
+ // Transient state
+ private final boolean isOldTableSalted;
+ private final RowKeySchema oldTableRowKeySchema;
+
+ private int estimatedNewTableRowKeyBytes;
+ private ColumnReference newTableEmptyKeyValueRef;
+ private ColumnReference oldTableEmptyKeyValueRef;
+ private boolean newTableRowKeyOrderOptimizable;
+
+ private PTable.QualifierEncodingScheme newTableEncodingScheme;
+ private PTable.ImmutableStorageScheme newTableImmutableStorageScheme;
+ private PTable.QualifierEncodingScheme oldTableEncodingScheme;
+ private PTable.ImmutableStorageScheme oldTableImmutableStorageScheme;
+ /*
+ * The first part of the pair is column family name
+ * and second part is the column name. The reason we need to track this
state is because for certain storage schemes
+ * like ImmutableStorageScheme#SINGLE_CELL_ARRAY_WITH_OFFSETS, the column
for which we need to generate a new
+ * table put/delete is different from the old columns in the phoenix
schema.
+ */
+ private Set<Pair<String, String>> newTableColumnsInfo;
+ /*
+ * Map of covered columns where a key is column reference for a column in
the data table
+ * and value is column reference for corresponding column in the new table.
+ */
+ private Map<ColumnReference, ColumnReference> coveredColumnsMap;
+
+ private String logicalNewTableName;
+
+ public static TransformMaintainer create(PTable oldTable, PTable newTable,
PhoenixConnection connection) {
+ if (oldTable.getType() == PTableType.INDEX) {
+ throw new IllegalArgumentException();
+ }
+ TransformMaintainer maintainer = new TransformMaintainer(oldTable,
newTable, connection);
+ return maintainer;
+ }
+
+ private TransformMaintainer(RowKeySchema oldRowKeySchema, boolean
isOldTableSalted) {
+ super(oldRowKeySchema, isOldTableSalted);
+ this.oldTableRowKeySchema = oldRowKeySchema;
+ this.isOldTableSalted = isOldTableSalted;
+ }
+
+ private TransformMaintainer(final PTable oldTable, final PTable newTable,
PhoenixConnection connection) {
+ this(oldTable.getRowKeySchema(), oldTable.getBucketNum() != null);
+ this.newTableRowKeyOrderOptimizable =
newTable.rowKeyOrderOptimizable();
+ this.isMultiTenant = oldTable.isMultiTenant();
+
+ this.newTableEncodingScheme = newTable.getEncodingScheme() == null ?
PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS :
newTable.getEncodingScheme();
+ this.newTableImmutableStorageScheme =
newTable.getImmutableStorageScheme() == null ?
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN :
newTable.getImmutableStorageScheme();
+ this.oldTableEncodingScheme = oldTable.getEncodingScheme() == null ?
PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS :
oldTable.getEncodingScheme();
+ this.oldTableImmutableStorageScheme =
oldTable.getImmutableStorageScheme() == null ?
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN :
oldTable.getImmutableStorageScheme();
+
+ this.newTableName = newTable.getPhysicalName().getBytes();
+ boolean newTableWALDisabled = newTable.isWALDisabled();
+ int nNewTableColumns = newTable.getColumns().size();
+ int nNewTablePKColumns = newTable.getPKColumns().size();
+
+ List<PColumn> oldTablePKColumns = oldTable.getPKColumns();
+
+ this.newTableColumnCount = oldTablePKColumns.size();
+
+ this.newTableColumnTypes =
Lists.<PDataType>newArrayListWithExpectedSize(nNewTablePKColumns);
+ this.newTableExpressions =
Lists.newArrayListWithExpectedSize(nNewTableColumns);
+ this.coveredColumnsMap =
Maps.newHashMapWithExpectedSize(nNewTableColumns - nNewTablePKColumns);
+ this.nNewTableSaltBuckets = newTable.getBucketNum() == null ? 0 :
newTable.getBucketNum();
+ this.oldTableEmptyKeyValueCF =
SchemaUtil.getEmptyColumnFamily(oldTable);
+ this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(newTable);
+ this.nOldTableCFs = oldTable.getColumnFamilies().size();
+ this.newTableWALDisabled = newTableWALDisabled;
+ this.newTableImmutableRows = newTable.isImmutableRows();
+ this.newTableColumnsInfo =
Sets.newHashSetWithExpectedSize(nNewTableColumns - nNewTablePKColumns);
+
+ for (int i = 0; i < newTable.getColumnFamilies().size(); i++) {
+ PColumnFamily family = newTable.getColumnFamilies().get(i);
+ for (PColumn newColumn : family.getColumns()) {
+ PColumn oldColumn = getColumnOrNull(oldTable,
newColumn.getName().getString(), newColumn.getFamilyName().getString());
+ // This can happen during deletion where we don't need covered
columns
+ if (oldColumn != null) {
+ byte[] oldColumnCq = oldColumn.getColumnQualifierBytes();
+ byte[] newColumnCq = newColumn.getColumnQualifierBytes();
+ this.coveredColumnsMap.put(new
ColumnReference(oldColumn.getFamilyName().getBytes(), oldColumnCq),
+ new
ColumnReference(newColumn.getFamilyName().getBytes(), newColumnCq));
+ }
+ }
+ }
+ this.logicalNewTableName = newTable.getName().getString();
+ initCachedState();
+ }
+
+ public static PColumn getColumnOrNull(PTable table, String columnName,
String familyName) {
+ PColumnFamily family;
+ try {
+ family = table.getColumnFamily(familyName);
+ } catch (ColumnFamilyNotFoundException e) {
+ return null;
+ }
+ try {
+ return family.getPColumnForColumnName(columnName);
+ } catch (ColumnNotFoundException e) {
+ return null;
+ }
+ }
+
+ /**
+ * Init calculated state reading/creating
+ */
+ private void initCachedState() {
+ byte[] newTableEmptyKvQualifier =
EncodedColumnsUtil.getEmptyKeyValueInfo(newTableEncodingScheme).getFirst();
+ byte[] oldTableEmptyKvQualifier =
EncodedColumnsUtil.getEmptyKeyValueInfo(oldTableEncodingScheme).getFirst();
+ newTableEmptyKeyValueRef = new
ColumnReference(oldTableEmptyKeyValueCF, newTableEmptyKvQualifier);
+ oldTableEmptyKeyValueRef = new
ColumnReference(oldTableEmptyKeyValueCF, oldTableEmptyKvQualifier);
+ this.newTableColumns =
Sets.newLinkedHashSetWithExpectedSize(this.newTableColumnCount);
+
+ for (ColumnReference colRef : coveredColumnsMap.keySet()) {
+ if (newTableImmutableStorageScheme ==
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
+ newTableColumns.add(colRef);
+ } else {
+ newTableColumns.add(new ColumnReference(colRef.getFamily(),
QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES));
+ }
+ }
+ }
+
+ /**
+ * For client-side to serialize TransformMaintainer for a given table
+ *
+ * @param oldTable old table
+ * @param ptr bytes pointer to hold returned serialized value
+ * @param newTable new table to serialize
+ */
+ public static void serialize(PTable oldTable, ImmutableBytesWritable ptr,
+ PTable newTable, PhoenixConnection
connection) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ DataOutputStream output = new DataOutputStream(stream);
+ try {
+ // Encode data table salting
+ WritableUtils.writeVInt(output, oldTable.getBucketNum() == null ?
1 : -1);
+ // Write out data row key schema once, since it's the same
+ oldTable.getRowKeySchema().write(output);
+
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.TransformMaintainer
proto =
+
TransformMaintainer.toProto(newTable.getTransformMaintainer(oldTable,
connection));
+ byte[] protoBytes = proto.toByteArray();
+ WritableUtils.writeVInt(output, protoBytes.length);
+ output.write(protoBytes);
+
+ } catch (IOException e) {
+ throw new RuntimeException(e); // Impossible
+ }
+ ptr.set(stream.toByteArray(), 0, stream.size());
+ }
+
+ public static ServerCachingProtos.TransformMaintainer
toProto(TransformMaintainer maintainer) throws IOException {
+ ServerCachingProtos.TransformMaintainer.Builder builder =
ServerCachingProtos.TransformMaintainer.newBuilder();
+ builder.setSaltBuckets(maintainer.nNewTableSaltBuckets);
+ builder.setIsMultiTenant(maintainer.isMultiTenant);
+
+ for (ColumnReference colRef : maintainer.newTableColumns) {
+ ServerCachingProtos.ColumnReference.Builder cRefBuilder =
ServerCachingProtos.ColumnReference.newBuilder();
+ cRefBuilder.setFamily(ByteStringer.wrap(colRef.getFamily()));
+ cRefBuilder.setQualifier(ByteStringer.wrap(colRef.getQualifier()));
+ builder.addNewTableColumns(cRefBuilder.build());
+ }
+
+ for (Map.Entry<ColumnReference, ColumnReference> e :
maintainer.coveredColumnsMap.entrySet()) {
+ ServerCachingProtos.ColumnReference.Builder cRefBuilder =
ServerCachingProtos.ColumnReference.newBuilder();
+ ColumnReference dataTableColRef = e.getKey();
+
cRefBuilder.setFamily(ByteStringer.wrap(dataTableColRef.getFamily()));
+
cRefBuilder.setQualifier(ByteStringer.wrap(dataTableColRef.getQualifier()));
+ builder.addOldTableColRefForCoveredColumns(cRefBuilder.build());
+ if (maintainer.newTableEncodingScheme != NON_ENCODED_QUALIFIERS) {
+ // We need to serialize the colRefs of new tables only in case
of encoded column names.
+ ColumnReference newTableColRef = e.getValue();
+ cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder();
+
cRefBuilder.setFamily(ByteStringer.wrap(newTableColRef.getFamily()));
+
cRefBuilder.setQualifier(ByteStringer.wrap(newTableColRef.getQualifier()));
+
builder.addNewTableColRefForCoveredColumns(cRefBuilder.build());
+ }
+ }
+
+ builder.setNewTableColumnCount(maintainer.newTableColumnCount);
+ builder.setNewTableName(ByteStringer.wrap(maintainer.newTableName));
+
builder.setNewTableRowKeyOrderOptimizable(maintainer.newTableRowKeyOrderOptimizable);
+
builder.setOldTableEmptyKeyValueColFamily(ByteStringer.wrap(maintainer.oldTableEmptyKeyValueCF));
+ ServerCachingProtos.ImmutableBytesWritable.Builder ibwBuilder =
ServerCachingProtos.ImmutableBytesWritable.newBuilder();
+
ibwBuilder.setByteArray(ByteStringer.wrap(maintainer.emptyKeyValueCFPtr.get()));
+ ibwBuilder.setLength(maintainer.emptyKeyValueCFPtr.getLength());
+ ibwBuilder.setOffset(maintainer.emptyKeyValueCFPtr.getOffset());
+ builder.setEmptyKeyValueColFamily(ibwBuilder.build());
+ try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+ DataOutput output = new DataOutputStream(stream);
+ for (Expression expression : maintainer.newTableExpressions) {
+ WritableUtils.writeVInt(output,
ExpressionType.valueOf(expression).ordinal());
+ expression.write(output);
+ }
+
builder.setNewTableExpressions(ByteStringer.wrap(stream.toByteArray()));
+ }
+
+ builder.setNumDataTableColFamilies(maintainer.nOldTableCFs);
+ builder.setNewTableWalDisabled(maintainer.newTableWALDisabled);
+
builder.setNewTableRowKeyByteSize(maintainer.estimatedNewTableRowKeyBytes);
+ builder.setNewTableImmutable(maintainer.newTableImmutableRows);
+ for (Pair<String, String> p : maintainer.newTableColumnsInfo) {
+ ServerCachingProtos.ColumnInfo.Builder ciBuilder =
ServerCachingProtos.ColumnInfo.newBuilder();
+ if (p.getFirst() != null) {
+ ciBuilder.setFamilyName(p.getFirst());
+ }
+ ciBuilder.setColumnName(p.getSecond());
+ builder.addNewTableColumnInfo(ciBuilder.build());
+ }
+
builder.setNewTableEncodingScheme(maintainer.newTableEncodingScheme.getSerializedMetadataValue());
+
builder.setNewTableImmutableStorageScheme(maintainer.newTableImmutableStorageScheme.getSerializedMetadataValue());
+ builder.setLogicalNewTableName(maintainer.logicalNewTableName);
+
builder.setOldTableEncodingScheme(maintainer.oldTableEncodingScheme.getSerializedMetadataValue());
+
builder.setOldTableImmutableStorageScheme(maintainer.oldTableImmutableStorageScheme.getSerializedMetadataValue());
+ return builder.build();
+ }
+
+ public static TransformMaintainer
fromProto(ServerCachingProtos.TransformMaintainer proto, RowKeySchema
dataTableRowKeySchema, boolean isDataTableSalted) throws IOException {
+ TransformMaintainer maintainer = new
TransformMaintainer(dataTableRowKeySchema, isDataTableSalted);
+ maintainer.nNewTableSaltBuckets = proto.getSaltBuckets();
+ maintainer.isMultiTenant = proto.getIsMultiTenant();
+ List<ServerCachingProtos.ColumnReference> newTableColList =
proto.getNewTableColumnsList();
+ maintainer.newTableColumns = new
HashSet<ColumnReference>(newTableColList.size());
+ for (ServerCachingProtos.ColumnReference colRefFromProto :
newTableColList) {
+ maintainer.newTableColumns.add(new
ColumnReference(colRefFromProto.getFamily().toByteArray(),
colRefFromProto.getQualifier().toByteArray()));
+ }
+
+ maintainer.newTableName = proto.getNewTableName().toByteArray();
+ if (proto.getNewTableColumnCount() != -1) {
+ maintainer.newTableColumnCount = proto.getNewTableColumnCount();
+ }
+
+ maintainer.newTableRowKeyOrderOptimizable =
proto.getNewTableRowKeyOrderOptimizable();
+ maintainer.oldTableEmptyKeyValueCF =
proto.getOldTableEmptyKeyValueColFamily().toByteArray();
+ ServerCachingProtos.ImmutableBytesWritable emptyKeyValueColFamily =
proto.getEmptyKeyValueColFamily();
+ maintainer.emptyKeyValueCFPtr = new
ImmutableBytesPtr(emptyKeyValueColFamily.getByteArray().toByteArray(),
emptyKeyValueColFamily.getOffset(), emptyKeyValueColFamily.getLength());
+
+ maintainer.nOldTableCFs = proto.getNumDataTableColFamilies();
+ maintainer.newTableWALDisabled = proto.getNewTableWalDisabled();
+ maintainer.estimatedNewTableRowKeyBytes =
proto.getNewTableRowKeyByteSize();
+ maintainer.newTableImmutableRows = proto.getNewTableImmutable();
+ List<ServerCachingProtos.ColumnInfo> newTblColumnInfoList =
proto.getNewTableColumnInfoList();
+ maintainer.newTableColumnsInfo = Sets.newHashSet();
+ for (ServerCachingProtos.ColumnInfo info : newTblColumnInfoList) {
+ maintainer.newTableColumnsInfo.add(new
Pair<>(info.getFamilyName(), info.getColumnName()));
+ }
+ // proto doesn't support single byte so need an explicit cast here
+ maintainer.newTableEncodingScheme =
PTable.QualifierEncodingScheme.fromSerializedValue((byte)
proto.getNewTableEncodingScheme());
+ maintainer.newTableImmutableStorageScheme =
PTable.ImmutableStorageScheme.fromSerializedValue((byte)
proto.getNewTableImmutableStorageScheme());
+ maintainer.oldTableEncodingScheme =
PTable.QualifierEncodingScheme.fromSerializedValue((byte)
proto.getOldTableEncodingScheme());
+ maintainer.oldTableImmutableStorageScheme =
PTable.ImmutableStorageScheme.fromSerializedValue((byte)
proto.getOldTableImmutableStorageScheme());
+
+ List<ServerCachingProtos.ColumnReference>
oldTableColRefsForCoveredColumnsList =
proto.getOldTableColRefForCoveredColumnsList();
+ List<ServerCachingProtos.ColumnReference>
newTableColRefsForCoveredColumnsList =
proto.getNewTableColRefForCoveredColumnsList();
+ maintainer.coveredColumnsMap =
Maps.newHashMapWithExpectedSize(oldTableColRefsForCoveredColumnsList.size());
+ boolean encodedColumnNames = maintainer.newTableEncodingScheme !=
NON_ENCODED_QUALIFIERS;
+ Iterator<ServerCachingProtos.ColumnReference> newTableColRefItr =
newTableColRefsForCoveredColumnsList.iterator();
+ for (ServerCachingProtos.ColumnReference colRefFromProto :
oldTableColRefsForCoveredColumnsList) {
+ ColumnReference oldTableColRef = new
ColumnReference(colRefFromProto.getFamily().toByteArray(),
colRefFromProto.getQualifier().toByteArray());
+ ColumnReference newTableColRef;
+ if (encodedColumnNames) {
+ ServerCachingProtos.ColumnReference fromProto =
newTableColRefItr.next();
+ newTableColRef = new
ColumnReference(fromProto.getFamily().toByteArray(),
fromProto.getQualifier().toByteArray());
+ } else {
+ byte[] cq = oldTableColRef.getQualifier();
+ byte[] cf = oldTableColRef.getFamily();
+ newTableColRef = new ColumnReference(cf, cq);
+ }
+ maintainer.coveredColumnsMap.put(oldTableColRef, newTableColRef);
+ }
+ maintainer.logicalNewTableName = proto.getLogicalNewTableName();
+ maintainer.initCachedState();
+ return maintainer;
+ }
+
+
+ public static List<IndexMaintainer> deserialize(byte[] buf) {
+ return deserialize(buf, 0, buf.length);
+ }
+
+ private static List<IndexMaintainer> deserialize(byte[] buf, int offset,
int length) {
+ List<IndexMaintainer> maintainers = Collections.emptyList();
+ if (length > 0) {
+ ByteArrayInputStream stream = new ByteArrayInputStream(buf,
offset, length);
+ DataInput input = new DataInputStream(stream);
+ try {
+ int size = WritableUtils.readVInt(input);
+ boolean isDataTableSalted = size < 0;
+ size = Math.abs(size);
+ RowKeySchema rowKeySchema = new RowKeySchema();
+ rowKeySchema.readFields(input);
+ maintainers = Lists.newArrayListWithExpectedSize(size);
+ for (int i = 0; i < size; i++) {
+ int protoSize = WritableUtils.readVInt(input);
+ byte[] b = new byte[protoSize];
+ input.readFully(b);
+ ServerCachingProtos.TransformMaintainer proto =
ServerCachingProtos.TransformMaintainer.parseFrom(b);
+ maintainers.add(TransformMaintainer.fromProto(proto,
rowKeySchema, isDataTableSalted));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e); // Impossible
+ }
+ }
+ return maintainers;
+ }
+
+ // Return new table's name
+ public byte[] getIndexTableName() {
+ return newTableName;
+ }
+
+ public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable
rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey, long ts) {
Review comment:
nit: Javadoc would be really useful here. (Have to read the method to
figure out that we're building a new table key from the old table key, rather
than vice versa.)
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
##########
@@ -0,0 +1,761 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.transform;
+
+import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.compile.PostIndexDDLCompiler;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
+import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexDBWritable;
+import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.PosixParser;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
+import static org.apache.phoenix.mapreduce.index.IndexTool.isTimeRangeSet;
+import static org.apache.phoenix.mapreduce.index.IndexTool.validateTimeRange;
+import static org.apache.phoenix.util.QueryUtil.getConnection;
+
+public class TransformTool extends Configured implements Tool {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TransformTool.class);
+
+ public static enum MR_COUNTER_METRICS {
+ TRANSFORM_FAILED,
+ TRANSFORM_SUCCEED
+ }
+
+ private static final Option OUTPUT_PATH_OPTION = new Option("op",
"output-path", true,
+ "Output path where the files are written");
+ private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema",
true,
+ "Phoenix schema name (optional)");
+ private static final Option DATA_TABLE_OPTION = new Option("dt",
"data-table", true,
+ "Data table name (mandatory)");
+ private static final Option INDEX_TABLE_OPTION = new Option("it",
"index-table", true,
+ "Index table name(not required in case of partial rebuilding)");
+
+ private static final Option PARTIAL_TRANSFORM_OPTION = new Option("pt",
"partial-transform", false,
+ "To transform a data table from a start timestamp");
+
+ private static final Option ABORT_TRANSFORM_OPTION = new Option("abort",
"abort", false,
+ "Aborts the ongoing transform");
+
+ private static final Option PAUSE_TRANSFORM_OPTION = new Option("pause",
"pause", false,
+ "Pauses the ongoing transform. If the ongoing transform fails, it
will not be retried");
+
+ private static final Option RESUME_TRANSFORM_OPTION = new Option("resume",
"resume", false,
+ "Resumes the ongoing transform");
+
+ private static final Option JOB_PRIORITY_OPTION = new Option("p",
"job-priority", true,
+ "Define job priority from 0(highest) to 4. Default is 2(normal)");
+
+ private static final int DEFAULT_AUTOSPLIT_NUM_REGIONS = 20;
+
+ private static final Option AUTO_SPLIT_OPTION =
+ new Option("spa", "autosplit", true,
+ "Automatically split the new table if the # of data table
regions is greater than N. "
+ + "Takes an optional argument specifying N,
otherwise defaults to " + DEFAULT_AUTOSPLIT_NUM_REGIONS
+ );
+
+ private static final Option RUN_FOREGROUND_OPTION =
+ new Option(
+ "runfg",
+ "run-foreground",
+ false,
+ "If specified, runs transform in Foreground. Default -
Runs the transform in background.");
+
+ private static final Option TENANT_ID_OPTION = new Option("tenant",
"tenant-id", true,
+ "If specified, uses Tenant connection for tenant index transform
(optional)");
+
+ private static final Option HELP_OPTION = new Option("h", "help", false,
"Help");
+ private static final Option START_TIME_OPTION = new Option("st",
"start-time",
+ true, "Start time for transform");
+
+ private static final Option END_TIME_OPTION = new Option("et", "end-time",
+ true, "End time for transform");
+
+ public static final String TRANSFORM_JOB_NAME_TEMPLATE =
"PHOENIX_TRANS_%s.%s";
+
+ public static final String PARTIAL_TRANSFORM_NOT_APPLICABLE = "Partial
transform accepts "
+ + "non-zero ts set in the past as start-time(st) option and that
ts must be present in SYSTEM.TRANSFORM table";
+
+ public static final String TRANSFORM_NOT_APPLICABLE = "Transform is not
applicable for local indexes or views or transactional tables";
+
+ public static final String PARTIAL_TRANSFORM_NOT_COMPATIBLE = "Can't
abort/pause/resume/split during partial transform";
+
+ private Configuration configuration;
+ private Connection connection;
+ private String tenantId;
+ private String dataTable;
+ private String logicalParentName;
+ private String basePath;
+ // logicalTableName is index table and logicalParentName is the data table
if this is an index transform
+ // If this is a data table transform, logicalParentName is null and
logicalTableName is dataTable
+ private String logicalTableName;
+ private String schemaName;
+ private String indexTable;
+ private String qDataTable; //normalized with schema
+ private PTable pIndexTable = null;
+ private PTable pDataTable;
+ private PTable pOldTable;
+ private PTable pNewTable;
+
+ private String oldTableWithSchema;
+ private String newTableWithSchema;
+ private JobPriority jobPriority;
+ private String jobName;
+ private boolean isForeground;
+ private Long startTime, endTime, lastTransformTime;
+ private boolean isPartialTransform;
+ private Job job;
+
+ public Long getStartTime() {
+ return startTime;
+ }
+
+ public Long getEndTime() { return endTime; }
+
+ public CommandLine parseOptions(String[] args) {
+ final Options options = getOptions();
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmdLine = null;
+ try {
+ cmdLine = parser.parse(options, args);
+ } catch (ParseException e) {
+ printHelpAndExit("Error parsing command line options: " +
e.getMessage(),
+ options);
+ }
+
+ if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+ printHelpAndExit(options, 0);
+ }
+
+ this.jobPriority = getJobPriority(cmdLine);
+
+ boolean dataTableProvided =
(cmdLine.hasOption(DATA_TABLE_OPTION.getOpt()));
+ if (!dataTableProvided) {
+ throw new IllegalStateException(DATA_TABLE_OPTION.getLongOpt() + "
is a mandatory parameter");
+ }
+
+ return cmdLine;
+ }
+
+ private Options getOptions() {
+ final Options options = new Options();
+ options.addOption(OUTPUT_PATH_OPTION);
+ options.addOption(SCHEMA_NAME_OPTION);
+ options.addOption(DATA_TABLE_OPTION);
+ options.addOption(INDEX_TABLE_OPTION);
+ options.addOption(TENANT_ID_OPTION);
+ options.addOption(HELP_OPTION);
+ options.addOption(JOB_PRIORITY_OPTION);
+ options.addOption(RUN_FOREGROUND_OPTION);
+ options.addOption(PARTIAL_TRANSFORM_OPTION);
+ options.addOption(START_TIME_OPTION);
+ options.addOption(END_TIME_OPTION);
+ options.addOption(AUTO_SPLIT_OPTION);
+ options.addOption(ABORT_TRANSFORM_OPTION);
+ options.addOption(PAUSE_TRANSFORM_OPTION);
+ options.addOption(RESUME_TRANSFORM_OPTION);
+ START_TIME_OPTION.setOptionalArg(true);
+ END_TIME_OPTION.setOptionalArg(true);
+ return options;
+ }
+
+ private void printHelpAndExit(String errorMessage, Options options) {
+ System.err.println(errorMessage);
+ LOGGER.error(errorMessage);
+ printHelpAndExit(options, 1);
+ }
+
+ private void printHelpAndExit(Options options, int exitCode) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("help", options);
+ System.exit(exitCode);
+ }
+
+ public CommandLine parseArgs(String[] args) throws Exception {
+ CommandLine cmdLine;
+ try {
+ cmdLine = parseOptions(args);
+ } catch (IllegalStateException e) {
+ printHelpAndExit(e.getMessage(), getOptions());
+ throw e;
+ }
+
+ if (getConf() == null) {
+ setConf(HBaseConfiguration.create());
+ }
+
+ return cmdLine;
+ }
+
+ @VisibleForTesting
+ public int populateTransformToolAttributesAndValidate(CommandLine cmdLine)
throws Exception {
+ boolean useStartTime = cmdLine.hasOption(START_TIME_OPTION.getOpt());
+ boolean useEndTime = cmdLine.hasOption(END_TIME_OPTION.getOpt());
+ basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
+ isPartialTransform =
cmdLine.hasOption(PARTIAL_TRANSFORM_OPTION.getOpt());
+ if (useStartTime) {
+ startTime = new
Long(cmdLine.getOptionValue(START_TIME_OPTION.getOpt()));
+ }
+
+ if (useEndTime) {
+ endTime = new
Long(cmdLine.getOptionValue(END_TIME_OPTION.getOpt()));
+ }
+
+ if (isTimeRangeSet(startTime, endTime)) {
+ validateTimeRange(startTime, endTime);
+ }
+
+ if (isPartialTransform &&
+ (cmdLine.hasOption(AUTO_SPLIT_OPTION.getOpt()))) {
+ throw new
IllegalArgumentException(PARTIAL_TRANSFORM_NOT_COMPATIBLE);
+ }
+ if (isPartialTransform &&
+ (cmdLine.hasOption(ABORT_TRANSFORM_OPTION.getOpt()) ||
cmdLine.hasOption(PAUSE_TRANSFORM_OPTION.getOpt())
+ ||
cmdLine.hasOption(RESUME_TRANSFORM_OPTION.getOpt()))) {
+ throw new
IllegalArgumentException(PARTIAL_TRANSFORM_NOT_COMPATIBLE);
+ }
+
+ if (isPartialTransform) {
+ if (!cmdLine.hasOption(START_TIME_OPTION.getOpt())) {
+ throw new
IllegalArgumentException(PARTIAL_TRANSFORM_NOT_APPLICABLE);
+ }
+ lastTransformTime = new
Long(cmdLine.getOptionValue(START_TIME_OPTION.getOpt()));
+ SystemTransformRecord transformRecord = getTransformRecord(null);
+ if (transformRecord == null) {
+ throw new
IllegalArgumentException(PARTIAL_TRANSFORM_NOT_APPLICABLE);
+ }
+ if (lastTransformTime == null) {
+ lastTransformTime =
transformRecord.getTransformEndTs().getTime();
+ } else {
+ validateLastTransformTime();
+ }
+ }
+
+ schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+ dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
+ indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+ qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
+ isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+ logicalTableName = dataTable;
+ logicalParentName = null;
+ if (!Strings.isNullOrEmpty(indexTable)) {
+ logicalTableName = indexTable;
+ logicalParentName = SchemaUtil.getTableName(schemaName, dataTable);
+ }
+
+ pDataTable = PhoenixRuntime.getTable(
+ connection, SchemaUtil.getQualifiedTableName(schemaName,
dataTable));
+ if (indexTable != null) {
+ pIndexTable = PhoenixRuntime.getTable(
+ connection, SchemaUtil.getQualifiedTableName(schemaName,
indexTable));
+ pOldTable = pIndexTable;
+ } else {
+ pOldTable = pDataTable;
+ }
+
+ SystemTransformRecord transformRecord =
getTransformRecord(connection.unwrap(PhoenixConnection.class));
+
+ validateTransform(pDataTable, pIndexTable, transformRecord);
+ String newTableName =
SchemaUtil.getTableNameFromFullName(transformRecord.getNewPhysicalTableName());
+ pNewTable = PhoenixRuntime.getTable(
Review comment:
Should this be getTableNoCache? Does it matter if our copy of PTable is
slightly stale?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]