PHOENIX-4619 Process transactional updates to local index on server-side
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c1827f24 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c1827f24 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c1827f24 Branch: refs/heads/4.x-cdh5.13 Commit: c1827f2408fa118be269efc79eaadacd6de48bc9 Parents: 03fedf6 Author: James Taylor <jtay...@salesforce.com> Authored: Sat Mar 17 19:52:38 2018 +0000 Committer: Pedro Boado <pbo...@apache.org> Committed: Fri Mar 23 21:31:42 2018 +0000 ---------------------------------------------------------------------- .../phoenix/end2end/index/BaseIndexIT.java | 16 +- .../phoenix/end2end/index/ImmutableIndexIT.java | 3 +- .../end2end/index/MutableIndexFailureIT.java | 8 +- .../apache/phoenix/compile/DeleteCompiler.java | 36 +- .../PhoenixTransactionalProcessor.java | 2 +- .../apache/phoenix/execute/MutationState.java | 103 +++- .../PhoenixTxIndexMutationGenerator.java | 449 ++++++++++++++++ .../PhoenixTxnIndexMutationGenerator.java | 519 ------------------- .../org/apache/phoenix/hbase/index/Indexer.java | 1 - .../hbase/index/builder/BaseIndexBuilder.java | 4 +- .../hbase/index/builder/BaseIndexCodec.java | 7 - .../phoenix/hbase/index/covered/IndexCodec.java | 14 +- .../hbase/index/covered/LocalTableState.java | 10 +- .../hbase/index/covered/NonTxIndexBuilder.java | 2 +- .../phoenix/hbase/index/covered/TableState.java | 8 - .../apache/phoenix/index/IndexMaintainer.java | 23 +- .../phoenix/index/PhoenixIndexBuilder.java | 21 +- .../apache/phoenix/index/PhoenixIndexCodec.java | 34 +- .../phoenix/index/PhoenixIndexMetaData.java | 78 +-- .../index/PhoenixIndexMetaDataBuilder.java | 106 ++++ .../index/PhoenixTransactionalIndexer.java | 442 +--------------- .../query/ConnectionQueryServicesImpl.java | 8 + .../transaction/OmidTransactionContext.java | 2 +- .../transaction/PhoenixTransactionContext.java | 2 +- .../transaction/TephraTransactionContext.java | 2 +- 
.../index/covered/CoveredColumnIndexCodec.java | 6 +- .../covered/CoveredIndexCodecForTesting.java | 5 +- .../index/covered/LocalTableStateTest.java | 10 +- .../index/covered/NonTxIndexBuilderTest.java | 3 + .../covered/TestCoveredColumnIndexCodec.java | 6 +- 30 files changed, 785 insertions(+), 1145 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c1827f24/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java index 1483c58..f914256 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java @@ -239,15 +239,17 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { } private void assertNoClientSideIndexMutations(Connection conn) throws SQLException { - if (mutable) { - Iterator<Pair<byte[],List<KeyValue>>> iterator = PhoenixRuntime.getUncommittedDataIterator(conn); - if (iterator.hasNext()) { - byte[] tableName = iterator.next().getFirst(); // skip data table mutations - PTable table = PhoenixRuntime.getTable(conn, Bytes.toString(tableName)); + Iterator<Pair<byte[],List<KeyValue>>> iterator = PhoenixRuntime.getUncommittedDataIterator(conn); + if (iterator.hasNext()) { + byte[] tableName = iterator.next().getFirst(); // skip data table mutations + PTable table = PhoenixRuntime.getTable(conn, Bytes.toString(tableName)); + boolean clientSideUpdate = !localIndex && (!mutable || transactional); + if (!clientSideUpdate) { assertTrue(table.getType() == PTableType.TABLE); // should be data table - boolean hasIndexData = iterator.hasNext(); - assertFalse(hasIndexData && !transactional); // should have no index data 
} + boolean hasIndexData = iterator.hasNext(); + // global immutable and global transactional tables are processed client side + assertEquals(clientSideUpdate, hasIndexData); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c1827f24/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java index d520824..1db9787 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java @@ -252,8 +252,9 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT { Iterator<Pair<byte[], List<KeyValue>>> iterator = PhoenixRuntime.getUncommittedDataIterator(conn); assertTrue(iterator.hasNext()); iterator.next(); - assertEquals((!localIndex || transactional), iterator.hasNext()); + assertEquals(!localIndex, iterator.hasNext()); } + // This test is known to flap. We need PHOENIX-2582 to be fixed before enabling this back. 
@Ignore http://git-wip-us.apache.org/repos/asf/phoenix/blob/c1827f24/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index c2e0cb6..715e37f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -112,7 +112,7 @@ public class MutableIndexFailureIT extends BaseTest { public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped, Boolean disableIndexOnWriteFailure, boolean failRebuildTask, Boolean throwIndexWriteFailure) { this.transactional = transactional; this.localIndex = localIndex; - this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", TRANSACTIONAL=true " : "") + this.tableDDLOptions = " SALT_BUCKETS=2, COLUMN_ENCODED_BYTES=NONE" + (transactional ? ", TRANSACTIONAL=true " : "") + (disableIndexOnWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE + "=" + disableIndexOnWriteFailure)) + (throwIndexWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.THROW_INDEX_WRITE_FAILURE + "=" + throwIndexWriteFailure)); this.tableName = FailingRegionObserver.FAIL_TABLE_NAME; @@ -289,7 +289,6 @@ public class MutableIndexFailureIT extends BaseTest { assertEquals("z", rs.getString(2)); assertFalse(rs.next()); - FailingRegionObserver.FAIL_WRITE = true; updateTable(conn, true); // Verify the metadata for index is correct. 
rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), StringUtil.escapeLike(indexName), @@ -473,9 +472,12 @@ public class MutableIndexFailureIT extends BaseTest { stmt = conn.prepareStatement("DELETE FROM " + fullTableName + " WHERE k=?"); stmt.setString(1, "b"); stmt.execute(); + // Set to fail after the DELETE, since transactional tables will write + // uncommitted data when the DELETE is executed. + FailingRegionObserver.FAIL_WRITE = true; try { conn.commit(); - if (commitShouldFail && !localIndex && this.throwIndexWriteFailure) { + if (commitShouldFail && (!localIndex || transactional) && this.throwIndexWriteFailure) { fail(); } } catch (CommitException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/c1827f24/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 70043bb..7985314 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -226,7 +226,7 @@ public class DeleteCompiler { // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the // row key will already have its value. 
// Check for otherTableRefs being empty required when deleting directly from the index - if (otherTableRefs.isEmpty() || (table.getIndexType() != IndexType.LOCAL && table.isImmutableRows())) { + if (otherTableRefs.isEmpty() || isMaintainedOnClient(table)) { mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); } for (int i = 0; i < otherTableRefs.size(); i++) { @@ -311,12 +311,12 @@ public class DeleteCompiler { } } - private List<PTable> getNonDisabledGlobalImmutableIndexes(TableRef tableRef) { + private List<PTable> getClientSideMaintainedIndexes(TableRef tableRef) { PTable table = tableRef.getTable(); - if (table.isImmutableRows() && !table.getIndexes().isEmpty()) { + if (!table.getIndexes().isEmpty()) { List<PTable> nonDisabledIndexes = Lists.newArrayListWithExpectedSize(table.getIndexes().size()); for (PTable index : table.getIndexes()) { - if (index.getIndexState() != PIndexState.DISABLE && index.getIndexType() == IndexType.GLOBAL) { + if (index.getIndexState() != PIndexState.DISABLE && isMaintainedOnClient(index)) { nonDisabledIndexes.add(index); } } @@ -459,8 +459,8 @@ public class DeleteCompiler { .setTableName(tableName).build().buildException(); } - List<PTable> immutableIndexes = getNonDisabledGlobalImmutableIndexes(targetTableRef); - final boolean hasImmutableIndexes = !immutableIndexes.isEmpty(); + List<PTable> clientSideIndexes = getClientSideMaintainedIndexes(targetTableRef); + final boolean hasClientSideIndexes = !clientSideIndexes.isEmpty(); boolean isSalted = table.getBucketNum() != null; boolean isMultiTenant = connection.getTenantId() != null && table.isMultiTenant(); @@ -468,7 +468,7 @@ public class DeleteCompiler { int pkColumnOffset = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 
1 : 0); final int pkColumnCount = table.getPKColumns().size() - pkColumnOffset; int selectColumnCount = pkColumnCount; - for (PTable index : immutableIndexes) { + for (PTable index : clientSideIndexes) { selectColumnCount += index.getPKColumns().size() - pkColumnCount; } Set<PColumn> projectedColumns = new LinkedHashSet<PColumn>(selectColumnCount + pkColumnOffset); @@ -518,7 +518,7 @@ public class DeleteCompiler { // that is being upserted for conflict detection purposes. // If we have immutable indexes, we'd increase the number of bytes scanned by executing // separate queries against each index, so better to drive from a single table in that case. - boolean runOnServer = isAutoCommit && !hasPreOrPostProcessing && !table.isTransactional() && !hasImmutableIndexes; + boolean runOnServer = isAutoCommit && !hasPreOrPostProcessing && !table.isTransactional() && !hasClientSideIndexes; HintNode hint = delete.getHint(); if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) { select = SelectStatement.create(select, HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE)); @@ -529,7 +529,7 @@ public class DeleteCompiler { QueryCompiler compiler = new QueryCompiler(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe, new SequenceManager(statement)); final QueryPlan dataPlan = compiler.compile(); // TODO: the select clause should know that there's a sub query, but doesn't seem to currently - queryPlans = Lists.newArrayList(!immutableIndexes.isEmpty() + queryPlans = Lists.newArrayList(!clientSideIndexes.isEmpty() ? optimizer.getApplicablePlans(dataPlan, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe) : optimizer.getBestPlan(dataPlan, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe)); // Filter out any local indexes that don't contain all indexed columns. 
@@ -559,7 +559,7 @@ public class DeleteCompiler { // may have been optimized out. Instead, we check that there's a single SkipScanFilter // If we can generate a plan for every index, that means all the required columns are available in every index, // hence we can drive the delete from any of the plans. - noQueryReqd &= queryPlans.size() == 1 + immutableIndexes.size(); + noQueryReqd &= queryPlans.size() == 1 + clientSideIndexes.size(); int queryPlanIndex = 0; while (noQueryReqd && queryPlanIndex < queryPlans.size()) { QueryPlan plan = queryPlans.get(queryPlanIndex++); @@ -578,7 +578,6 @@ public class DeleteCompiler { // from the data table, while the others will be for deleting rows from immutable indexes. List<MutationPlan> mutationPlans = Lists.newArrayListWithExpectedSize(queryPlans.size()); for (final QueryPlan plan : queryPlans) { - final StatementContext context = plan.getContext(); mutationPlans.add(new SingleRowDeleteMutationPlan(plan, connection, maxSize, maxSizeBytes)); } return new MultiRowDeleteMutationPlan(dataPlan, mutationPlans); @@ -628,8 +627,8 @@ public class DeleteCompiler { } } final QueryPlan bestPlan = bestPlanToBe; - final List<TableRef>otherTableRefs = Lists.newArrayListWithExpectedSize(immutableIndexes.size()); - for (PTable index : immutableIndexes) { + final List<TableRef>otherTableRefs = Lists.newArrayListWithExpectedSize(clientSideIndexes.size()); + for (PTable index : clientSideIndexes) { if (!bestPlan.getTableRef().getTable().equals(index)) { otherTableRefs.add(new TableRef(index, targetTableRef.getLowerBoundTimeStamp(), targetTableRef.getTimeStamp())); } @@ -917,13 +916,13 @@ public class DeleteCompiler { int totalTablesUpdateClientSide = 1; // data table is always updated PTable bestTable = bestPlan.getTableRef().getTable(); // global immutable tables are also updated client side (but don't double count the data table) - if (bestPlan != dataPlan && bestTable.getIndexType() == IndexType.GLOBAL && bestTable.isImmutableRows()) { + if 
(bestPlan != dataPlan && isMaintainedOnClient(bestTable)) { totalTablesUpdateClientSide++; } for (TableRef otherTableRef : otherTableRefs) { PTable otherTable = otherTableRef.getTable(); // Don't double count the data table here (which morphs when it becomes a projected table, hence this check) - if (projectedTableRef != otherTableRef && otherTable.getIndexType() == IndexType.GLOBAL && otherTable.isImmutableRows()) { + if (projectedTableRef != otherTableRef && isMaintainedOnClient(otherTable)) { totalTablesUpdateClientSide++; } } @@ -972,4 +971,11 @@ public class DeleteCompiler { return bestPlan; } } + + private static boolean isMaintainedOnClient(PTable table) { + // Test for not being local (rather than being GLOBAL) so that this doesn't fail + // when tested with our projected table. + return table.getIndexType() != IndexType.LOCAL && (table.isImmutableRows() || table.isTransactional()); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/c1827f24/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java index 37fa2ab..ca0c997 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java @@ -22,7 +22,7 @@ import org.apache.phoenix.transaction.TransactionFactory; public class PhoenixTransactionalProcessor extends DelegateRegionObserver { public PhoenixTransactionalProcessor() { - super(TransactionFactory.getTransactionFactory().getTransactionContext().getCoProcessor()); + super(TransactionFactory.getTransactionFactory().getTransactionContext().getCoprocessor()); } } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c1827f24/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 34fcc2d..39cb7a5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.htrace.Span; import org.apache.htrace.TraceScope; +import org.apache.phoenix.cache.IndexMetaDataCache; import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.MutationPlan; @@ -62,6 +63,7 @@ import org.apache.phoenix.index.PhoenixIndexBuilder; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.index.PhoenixIndexFailurePolicy; import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand; +import org.apache.phoenix.index.PhoenixIndexMetaData; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; import org.apache.phoenix.monitoring.GlobalClientMetrics; @@ -138,7 +140,6 @@ public class MutationState implements SQLCloseable { private Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap(); final PhoenixTransactionContext phoenixTransactionContext; - final PhoenixTxnIndexMutationGenerator phoenixTxnIndexMutationGenerator; private final MutationMetricQueue mutationMetricQueue; private ReadMetricQueue readMetricQueue; @@ -180,7 +181,7 @@ public class MutationState implements SQLCloseable { boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled(); this.mutationMetricQueue = isMetricsEnabled ? 
new MutationMetricQueue() : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE; - if (subTask == false) { + if (!subTask) { if (txContext == null) { phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(connection); } else { @@ -192,8 +193,6 @@ public class MutationState implements SQLCloseable { // as it is not thread safe, so we use the tx member variable phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(txContext, connection, subTask); } - - phoenixTxnIndexMutationGenerator = new PhoenixTxnIndexMutationGenerator(connection, phoenixTransactionContext); } public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException { @@ -496,17 +495,20 @@ public class MutationState implements SQLCloseable { private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final MultiRowMutationState values, final long mutationTimestamp, final long serverTimestamp, boolean includeAllIndexes, final boolean sendAll) { final PTable table = tableRef.getTable(); - final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism - (includeAllIndexes || table.isTransactional()) ? + final Iterator<PTable> indexIterator = // Only maintain tables with immutable rows through this client-side mechanism + includeAllIndexes ? IndexMaintainer.maintainedIndexes(table.getIndexes().iterator()) : - (table.isImmutableRows()) ? + (table.isImmutableRows() || table.isTransactional()) ? 
IndexMaintainer.maintainedGlobalIndexes(table.getIndexes().iterator()) : Collections.<PTable>emptyIterator(); + final List<PTable> indexList = Lists.newArrayList(indexIterator); + final Iterator<PTable> indexes = indexList.iterator(); final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size()); final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null; generateMutations(tableRef, mutationTimestamp, serverTimestamp, values, mutationList, mutationsPertainingToIndex); return new Iterator<Pair<PName,List<Mutation>>>() { boolean isFirst = true; + Map<byte[],List<Mutation>> indexMutationsMap = null; @Override public boolean hasNext() { @@ -519,15 +521,34 @@ public class MutationState implements SQLCloseable { isFirst = false; return new Pair<PName,List<Mutation>>(table.getPhysicalName(), mutationList); } + PTable index = indexes.next(); - List<Mutation> indexMutations; + + List<Mutation> indexMutations = null; try { - if ((table.isImmutableRows() && (index.getIndexType() != IndexType.LOCAL)) || !table.isTransactional()) { - indexMutations = - IndexUtil.generateIndexData(table, index, values, mutationsPertainingToIndex, - connection.getKeyValueBuilder(), connection); - } else { - indexMutations = phoenixTxnIndexMutationGenerator.getIndexUpdates(table, index, mutationsPertainingToIndex); + if (!mutationsPertainingToIndex.isEmpty()) { + if (table.isTransactional()) { + if (indexMutationsMap == null) { + PhoenixTxIndexMutationGenerator generator = newTxIndexMutationGenerator(table, indexList, mutationsPertainingToIndex.get(0).getAttributesMap()); + try (HTableInterface htable = connection.getQueryServices().getTable(table.getPhysicalName().getBytes())) { + Collection<Pair<Mutation, byte[]>> allMutations = generator.getIndexUpdates(htable, mutationsPertainingToIndex.iterator()); + indexMutationsMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + for (Pair<Mutation, byte[]> 
mutation : allMutations) { + List<Mutation> mutations = indexMutationsMap.get(mutation.getSecond()); + if (mutations == null) { + mutations = Lists.newArrayList(); + indexMutationsMap.put(mutation.getSecond(), mutations); + } + mutations.add(mutation.getFirst()); + } + } + } + indexMutations = indexMutationsMap.get(index.getPhysicalName().getBytes()); + } else { + indexMutations = + IndexUtil.generateIndexData(table, index, values, mutationsPertainingToIndex, + connection.getKeyValueBuilder(), connection); + } } // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map @@ -537,13 +558,17 @@ public class MutationState implements SQLCloseable { if (multiRowMutationState!=null) { final List<Mutation> deleteMutations = Lists.newArrayList(); generateMutations(tableRef, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null); - indexMutations.addAll(deleteMutations); + if (indexMutations == null) { + indexMutations = deleteMutations; + } else { + indexMutations.addAll(deleteMutations); + } } } } catch (SQLException | IOException e) { throw new IllegalDataException(e); } - return new Pair<PName,List<Mutation>>(index.getPhysicalName(),indexMutations); + return new Pair<PName,List<Mutation>>(index.getPhysicalName(),indexMutations == null ? 
Collections.<Mutation>emptyList() : indexMutations); } @Override @@ -554,6 +579,42 @@ public class MutationState implements SQLCloseable { }; } + private PhoenixTxIndexMutationGenerator newTxIndexMutationGenerator(PTable table, List<PTable> indexes, Map<String,byte[]> attributes) { + final List<IndexMaintainer> indexMaintainers = Lists.newArrayListWithExpectedSize(indexes.size()); + for (PTable index : indexes) { + IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); + indexMaintainers.add(maintainer); + } + IndexMetaDataCache indexMetaDataCache = new IndexMetaDataCache() { + + @Override + public void close() throws IOException { + } + + @Override + public List<IndexMaintainer> getIndexMaintainers() { + return indexMaintainers; + } + + @Override + public PhoenixTransactionContext getTransactionContext() { + return phoenixTransactionContext; + } + + @Override + public int getClientVersion() { + return MetaDataProtocol.PHOENIX_VERSION; + } + + }; + try { + PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(indexMetaDataCache, attributes); + return new PhoenixTxIndexMutationGenerator(connection.getQueryServices().getConfiguration(), indexMetaData, table.getPhysicalName().getBytes()); + } catch (IOException e) { + throw new RuntimeException(e); // Impossible + } + } + private void generateMutations(final TableRef tableRef, final long mutationTimestamp, final long serverTimestamp, final MultiRowMutationState values, final List<Mutation> mutationList, final List<Mutation> mutationsPertainingToIndex) { @@ -585,17 +646,13 @@ public class MutationState implements SQLCloseable { } } } - PRow row = - tableRef.getTable() - .newRow(connection.getKeyValueBuilder(), timestampToUse, key, hasOnDupKey); + PRow row = table.newRow(connection.getKeyValueBuilder(), timestampToUse, key, hasOnDupKey); List<Mutation> rowMutations, rowMutationsPertainingToIndex; if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) { // means delete row.delete(); 
rowMutations = row.toRowMutations(); - // Row deletes for index tables are processed by running a re-written query - // against the index table (as this allows for flexibility in being able to - // delete rows). - rowMutationsPertainingToIndex = rowMutations; + // The DeleteCompiler already generates the deletes for indexes, so no need to do it again + rowMutationsPertainingToIndex = Collections.emptyList(); } else { for (Map.Entry<PColumn, byte[]> valueEntry : rowEntry.getValue().getColumnValues() .entrySet()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/c1827f24/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java new file mode 100644 index 0000000..b5031af --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.execute; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.filter.SkipScanFilter; +import org.apache.phoenix.hbase.index.MultiMutation; +import org.apache.phoenix.hbase.index.ValueGetter; +import org.apache.phoenix.hbase.index.covered.IndexMetaData; +import org.apache.phoenix.hbase.index.covered.IndexUpdate; +import org.apache.phoenix.hbase.index.covered.TableState; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; +import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.index.PhoenixIndexCodec; +import org.apache.phoenix.index.PhoenixIndexMetaData; +import org.apache.phoenix.query.KeyRange; +import 
org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.transaction.PhoenixTransactionContext;
import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
import org.apache.phoenix.transaction.PhoenixTransactionalTable;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;


/**
 * Generates the index mutations (puts and deletes) that correspond to a batch of data table
 * mutations on a transactional table.
 * <p>
 * The batch is grouped by row key, the prior state of the affected rows is optionally scanned
 * through a transactional view of the data table, and the {@link PhoenixIndexCodec} is then asked
 * for the index deletes and upserts for each row. A batch is treated as a rollback when its first
 * mutation carries {@link PhoenixTransactionContext#TX_ROLLBACK_ATTRIBUTE_KEY}.
 */
public class PhoenixTxIndexMutationGenerator {

    private static final Log LOG = LogFactory.getLog(PhoenixTxIndexMutationGenerator.class);

    /**
     * Orders cells by timestamp, type, column family and column qualifier (all ascending) so that
     * a row's cells can be replayed from oldest to newest.
     */
    private static final Comparator<Cell> REPLAY_ORDER = new Comparator<Cell>() {

        @Override
        public int compare(Cell o1, Cell o2) {
            int c = Longs.compare(o1.getTimestamp(), o2.getTimestamp());
            if (c != 0) {
                return c;
            }
            c = o1.getTypeByte() - o2.getTypeByte();
            if (c != 0) {
                return c;
            }
            // Compare o1 against o2: comparing a cell with itself always yields 0 and would
            // silently drop the family/qualifier tie-breakers.
            c = Bytes.compareTo(o1.getFamilyArray(), o1.getFamilyOffset(), o1.getFamilyLength(),
                    o2.getFamilyArray(), o2.getFamilyOffset(), o2.getFamilyLength());
            if (c != 0) {
                return c;
            }
            return Bytes.compareTo(o1.getQualifierArray(), o1.getQualifierOffset(), o1.getQualifierLength(),
                    o2.getQualifierArray(), o2.getQualifierOffset(), o2.getQualifierLength());
        }

    };

    private final PhoenixIndexCodec codec;
    private final PhoenixIndexMetaData indexMetaData;

    /**
     * @param conf configuration handed to the {@link PhoenixIndexCodec}
     * @param indexMetaData index maintainers and transaction context used for every batch
     * @param tableName data table name handed to the codec
     * @param regionStartKey region start key handed to the codec (may be null)
     * @param regionEndKey region end key handed to the codec (may be null)
     */
    public PhoenixTxIndexMutationGenerator(Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName, byte[] regionStartKey, byte[] regionEndKey) {
        this.indexMetaData = indexMetaData;
        this.codec = new PhoenixIndexCodec(conf, regionStartKey, regionEndKey, tableName);
    }

    /**
     * Same as the five-argument constructor with null region start and end keys.
     */
    public PhoenixTxIndexMutationGenerator(Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName) {
        this(conf, indexMetaData, tableName, null, null);
    }

    /**
     * Merges {@code m} into the {@link MultiMutation} stored for {@code row}, creating the entry
     * the first time the row is seen.
     */
    private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
        MultiMutation stored = mutations.get(row);
        // we haven't seen this row before, so add it
        if (stored == null) {
            stored = new MultiMutation(row);
            mutations.put(row, stored);
        }
        stored.addAll(m);
    }

    /**
     * Computes the index updates for a batch of data table mutations.
     *
     * @param htable data table that is wrapped in a transactional table to scan prior row state
     * @param mutationIterator the data table mutations of the batch; the attributes of the first
     *            mutation decide whether the whole batch is processed as a rollback
     * @return pairs of index mutation and target table name; empty when there are no mutations or
     *         no index maintainers
     * @throws IOException if scanning the data table or building the index updates fails
     * @throws SQLException declared for callers; see the helpers invoked here
     */
    public Collection<Pair<Mutation, byte[]>> getIndexUpdates(HTableInterface htable, Iterator<Mutation> mutationIterator) throws IOException, SQLException {

        if (!mutationIterator.hasNext()) {
            return Collections.emptyList();
        }

        List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers();
        // Nothing to maintain; this also protects the indexMaintainers.get(0) lookups below.
        if (indexMaintainers.isEmpty()) {
            return Collections.emptyList();
        }

        // Collect up all mutations in batch
        Map<ImmutableBytesPtr, MultiMutation> mutations =
                new HashMap<ImmutableBytesPtr, MultiMutation>();
        // Collect the set of mutable ColumnReferences so that we can first
        // run a scan to get the current state. We'll need this to delete
        // the existing index rows.
        int estimatedSize = indexMaintainers.size() * 10;
        Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(estimatedSize);
        for (IndexMaintainer indexMaintainer : indexMaintainers) {
            // For transactional tables, we use an index maintainer
            // to aid in rollback if there's a KeyValue column in the index. The alternative would be
            // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
            // client side.
            mutableColumns.addAll(indexMaintainer.getAllColumns());
        }

        Mutation m = mutationIterator.next();
        Map<String,byte[]> updateAttributes = m.getAttributesMap();
        byte[] txRollbackAttribute = updateAttributes.get(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY);
        boolean isRollback = txRollbackAttribute != null;

        boolean isImmutable = indexMetaData.isImmutableRows();
        // For immutable rows (outside of a rollback) only the rows that require prior state are
        // scanned; otherwise every row of the batch is.
        Map<ImmutableBytesPtr, MultiMutation> findPriorValueMutations;
        if (isImmutable && !isRollback) {
            findPriorValueMutations = new HashMap<ImmutableBytesPtr, MultiMutation>();
        } else {
            findPriorValueMutations = mutations;
        }

        while (true) {
            // add the mutation to the batch set
            ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
            // if we have no non PK columns, no need to find the prior values
            if (mutations != findPriorValueMutations && indexMetaData.requiresPriorRowState(m)) {
                addMutation(findPriorValueMutations, row, m);
            }
            addMutation(mutations, row, m);

            if (!mutationIterator.hasNext()) {
                break;
            }
            m = mutationIterator.next();
        }

        Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size());
        ResultScanner currentScanner = null;
        try {
            // Track if we have row keys with Delete mutations (or Puts that are
            // Tephra's Delete marker). If there are none, we don't need to do the scan for
            // prior versions, if there are, we do. Since rollbacks always have delete mutations,
            // this logic will work there too.
            if (!findPriorValueMutations.isEmpty()) {
                currentScanner = openPriorStateScanner(htable, indexMaintainers, mutableColumns,
                        findPriorValueMutations.keySet(), isRollback);
            }
            if (isRollback) {
                processRollback(indexMetaData, txRollbackAttribute, currentScanner, mutableColumns, indexUpdates, mutations);
            } else {
                processMutation(indexMetaData, txRollbackAttribute, currentScanner, mutableColumns, indexUpdates, mutations, findPriorValueMutations);
            }
        } finally {
            // The scanner is created by this method, so it is released here on every path.
            if (currentScanner != null) {
                currentScanner.close();
            }
        }

        return indexUpdates;
    }

    /**
     * Opens a skip-scan over exactly the given row keys of the data table, projecting all mutable
     * columns plus the empty key value column. For a rollback the transaction context is switched
     * to {@link PhoenixVisibilityLevel#SNAPSHOT_ALL} before the scanner is opened.
     * The caller is responsible for closing the returned scanner.
     */
    private ResultScanner openPriorStateScanner(HTableInterface htable, List<IndexMaintainer> indexMaintainers,
            Set<ColumnReference> mutableColumns, Set<ImmutableBytesPtr> rowKeys, boolean isRollback)
            throws IOException, SQLException {
        List<KeyRange> keys = Lists.newArrayListWithExpectedSize(rowKeys.size());
        for (ImmutableBytesPtr ptr : rowKeys) {
            keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
        }
        Scan scan = new Scan();
        // Project all mutable columns
        for (ColumnReference ref : mutableColumns) {
            scan.addColumn(ref.getFamily(), ref.getQualifier());
        }
        /*
         * Indexes inherit the storage scheme of the data table which means all the indexes have the same
         * storage scheme and empty key value qualifier. Note that this assumption would be broken if we start
         * supporting new indexes over existing data tables to have a different storage scheme than the data
         * table.
         */
        byte[] emptyKeyValueQualifier = indexMaintainers.get(0).getEmptyKeyValueQualifier();

        // Project empty key value column
        scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
        ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
        scanRanges.initializeScan(scan);
        PhoenixTransactionalTable txTable = TransactionFactory.getTransactionFactory().getTransactionalTable(indexMetaData.getTransactionContext(), htable);
        // For rollback, we need to see all versions, including
        // the last committed version as there may be multiple
        // checkpointed versions.
        SkipScanFilter filter = scanRanges.getSkipScanFilter();
        if (isRollback) {
            filter = new SkipScanFilter(filter, true);
            indexMetaData.getTransactionContext().setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT_ALL);
        }
        scan.setFilter(filter);
        return txTable.getScanner(scan);
    }

    /**
     * Generates index updates for a regular (non-rollback) batch.
     * <p>
     * Rows returned by {@code scanner} are removed from {@code mutationsToFindPreviousValue} and
     * get deletes for the old index row followed by puts for the new one. Every entry still left
     * in {@code mutations} afterwards is then processed without prior state. When both maps are
     * the same instance, scanned rows are therefore not processed a second time.
     *
     * @param scanner scanner over the prior row state, or null when no scan was needed
     */
    private void processMutation(PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
            ResultScanner scanner,
            Set<ColumnReference> upsertColumns,
            Collection<Pair<Mutation, byte[]>> indexUpdates,
            Map<ImmutableBytesPtr, MultiMutation> mutations,
            Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue) throws IOException {
        List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers();
        if (scanner != null) {
            Result result;
            ColumnReference emptyColRef = new ColumnReference(indexMaintainers.get(0)
                    .getDataEmptyKeyValueCF(), indexMaintainers.get(0).getEmptyKeyValueQualifier());
            // Process existing data table rows by removing the old index row and adding the new index row
            while ((result = scanner.next()) != null) {
                Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow()));
                TxTableState state = new TxTableState(upsertColumns, indexMetaData.getTransactionContext().getWritePointer(), m, emptyColRef, result);
                generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
                generatePuts(indexMetaData, indexUpdates, state);
            }
        }
        // Process new data table by adding new index rows
        for (Mutation m : mutations.values()) {
            TxTableState state = new TxTableState(upsertColumns, indexMetaData.getTransactionContext().getWritePointer(), m);
            generatePuts(indexMetaData, indexUpdates, state);
            generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
        }
    }

    /**
     * Generates the index deletes needed to roll back the rows returned by {@code scanner}.
     * <p>
     * The cells of each row are replayed in {@link #REPLAY_ORDER}, batched by timestamp (all cells
     * at or below the read pointer form the first batch), and deletes are generated from the
     * previous batch's state and, for batches newer than the read pointer that contain puts, from
     * the batch itself. Nothing is done when {@code scanner} is null.
     */
    private void processRollback(PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
            ResultScanner scanner,
            Set<ColumnReference> mutableColumns,
            Collection<Pair<Mutation, byte[]>> indexUpdates,
            Map<ImmutableBytesPtr, MultiMutation> mutations) throws IOException {
        if (scanner != null) {
            long readPtr = indexMetaData.getTransactionContext().getReadPointer();
            Result result;
            // Loop through last committed row state plus all new rows associated with current transaction
            // to generate point delete markers for all index rows that were added. We don't have Tephra
            // manage index rows in change sets because we don't want to be hit with the additional
            // memory hit and do not need to do conflict detection on index rows.
            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
            while ((result = scanner.next()) != null) {
                Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
                // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest
                // (as if we're "replaying" them in time order).
                List<Cell> cells = result.listCells();
                Collections.sort(cells, REPLAY_ORDER);
                int i = 0;
                int nCells = cells.size();
                Result oldResult = null, newResult;
                do {
                    boolean hasPuts = false;
                    LinkedList<Cell> singleTimeCells = Lists.newLinkedList();
                    long writePtr;
                    Cell cell = cells.get(i);
                    do {
                        hasPuts |= cell.getTypeByte() == KeyValue.Type.Put.getCode();
                        writePtr = cell.getTimestamp();
                        ListIterator<Cell> it = singleTimeCells.listIterator();
                        do {
                            // Add at the beginning of the list to match the expected HBase
                            // newest to oldest sort order (which TxTableState relies on
                            // with the Result.getLatestColumnValue() calls). However, we
                            // still want to add Cells in the expected order for each time
                            // bound as otherwise we won't find it in our old state.
                            it.add(cell);
                        } while (++i < nCells && (cell = cells.get(i)).getTimestamp() == writePtr);
                    } while (i < nCells && cell.getTimestamp() <= readPtr);

                    // Generate point delete markers for the prior row deletion of the old index value.
                    // The write timestamp is the next timestamp, not the current timestamp,
                    // as the earliest cells are the current values for the row (and we don't
                    // want to delete the current row).
                    if (oldResult != null) {
                        TxTableState state = new TxTableState(mutableColumns, writePtr, m, emptyColRef, oldResult);
                        generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
                    }
                    // Generate point delete markers for the new index value.
                    // If our time batch doesn't have Puts (i.e. we have only Deletes), then do not
                    // generate deletes. We would have generated the delete above based on the state
                    // of the previous row. The delete markers do not give us the state we need to
                    // delete.
                    if (hasPuts) {
                        newResult = Result.create(singleTimeCells);
                        // First row may represent the current state which we don't want to delete
                        if (writePtr > readPtr) {
                            TxTableState state = new TxTableState(mutableColumns, writePtr, m, emptyColRef, newResult);
                            generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
                        }
                        oldResult = newResult;
                    } else {
                        oldResult = null;
                    }
                } while (i < nCells);
            }
        }
    }

    /**
     * Asks the codec for the index deletes of {@code state} and appends every valid one to
     * {@code indexUpdates}, after setting {@code attribValue} as its rollback attribute.
     */
    private void generateDeletes(PhoenixIndexMetaData indexMetaData,
            Collection<Pair<Mutation, byte[]>> indexUpdates,
            byte[] attribValue, TxTableState state) throws IOException {
        Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData);
        for (IndexUpdate delete : deletes) {
            if (delete.isValid()) {
                delete.getUpdate().setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue);
                indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(), delete.getTableName()));
            }
        }
    }

    /**
     * Applies the pending mutation to {@code state}, asks the codec for the resulting index
     * upserts and appends every valid one to {@code indexUpdates}.
     *
     * @return true if at least one valid put was added
     */
    private boolean generatePuts(
            PhoenixIndexMetaData indexMetaData,
            Collection<Pair<Mutation, byte[]>> indexUpdates,
            TxTableState state)
            throws IOException {
        state.applyMutation();
        Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData);
        boolean validPut = false;
        for (IndexUpdate put : puts) {
            if (put.isValid()) {
                indexUpdates.add(new Pair<Mutation, byte[]>(put.getUpdate(), put.getTableName()));
                validPut = true;
            }
        }
        return validPut;
    }


    /**
     * {@link TableState} backed by an in-memory map of column values for a single row: optionally
     * seeded from a scanned {@link Result} and updated by {@link #applyMutation()}.
     */
    private static class TxTableState implements TableState {
        private final Mutation mutation;
        private final long currentTimestamp;
        private final List<KeyValue> pendingUpdates;
        private final Set<ColumnReference> indexedColumns;
        private final Map<ColumnReference, ImmutableBytesWritable> valueMap;

        /**
         * Creates a state with no prior column values; the cells of {@code mutation} become the
         * pending updates.
         */
        private TxTableState(Set<ColumnReference> indexedColumns, long currentTimestamp, Mutation mutation) {
            this.currentTimestamp = currentTimestamp;
            this.indexedColumns = indexedColumns;
            this.mutation = mutation;
            int estimatedSize = indexedColumns.size();
            this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize);
            this.pendingUpdates = Lists.newArrayListWithExpectedSize(estimatedSize);
            try {
                CellScanner scanner = mutation.cellScanner();
                while (scanner.advance()) {
                    Cell cell = scanner.current();
                    pendingUpdates.add(KeyValueUtil.ensureKeyValue(cell));
                }
            } catch (IOException e) {
                throw new RuntimeException(e); // Impossible
            }
        }

        /**
         * Creates a state whose column values are seeded with the latest cell of each indexed
         * column found in {@code r}. {@code emptyColRef} is accepted but not read here.
         */
        public TxTableState(Set<ColumnReference> indexedColumns, long currentTimestamp, Mutation m, ColumnReference emptyColRef, Result r) {
            this(indexedColumns, currentTimestamp, m);

            for (ColumnReference ref : indexedColumns) {
                Cell cell = r.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
                if (cell != null) {
                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                    ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                    valueMap.put(ref, ptr);
                }
            }
        }

        @Override
        public long getCurrentTimestamp() {
            return currentTimestamp;
        }

        @Override
        public byte[] getCurrentRowKey() {
            return mutation.getRow();
        }

        @Override
        public List<? extends IndexedColumnGroup> getIndexColumnHints() {
            return Collections.emptyList();
        }

        /**
         * Folds the pending updates into the value map: column deletes remove the column, family
         * deletes remove every indexed column of that family, and puts on indexed columns store
         * the new value.
         *
         * @throws IllegalStateException for any other cell type
         */
        private void applyMutation() {
            for (Cell cell : pendingUpdates) {
                if (cell.getTypeByte() == KeyValue.Type.Delete.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
                    ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                    valueMap.remove(ref);
                } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) {
                    for (ColumnReference ref : indexedColumns) {
                        if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) {
                            valueMap.remove(ref);
                        }
                    }
                } else if (cell.getTypeByte() == KeyValue.Type.Put.getCode()) {
                    ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                    if (indexedColumns.contains(ref)) {
                        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                        ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                        valueMap.put(ref, ptr);
                    }
                } else {
                    throw new IllegalStateException("Unexpected mutation type for " + cell);
                }
            }
        }

        @Override
        public Collection<KeyValue> getPendingUpdate() {
            return pendingUpdates;
        }

        /**
         * Returns a getter over the in-memory column values together with a fresh
         * {@link IndexUpdate} tracking {@code indexedColumns}. The two boolean flags and
         * {@code indexMetaData} are not read by this implementation, and the getter ignores the
         * requested timestamp.
         */
        @Override
        public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean returnNullScannerIfRowNotFound, IndexMetaData indexMetaData)
                throws IOException {
            // TODO: creating these objects over and over again is wasteful
            ColumnTracker tracker = new ColumnTracker(indexedColumns);
            ValueGetter getter = new ValueGetter() {

                @Override
                public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
                    return valueMap.get(ref);
                }

                @Override
                public byte[] getRowKey() {
                    return mutation.getRow();
                }

            };
            Pair<ValueGetter, IndexUpdate> pair = new Pair<ValueGetter, IndexUpdate>(getter, new IndexUpdate(tracker));
            return pair;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c1827f24/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java deleted file mode 100644 index b596b75..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java +++ /dev/null @@ -1,519 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.execute; - -import java.io.IOException; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.phoenix.compile.ScanRanges; -import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; -import org.apache.phoenix.filter.SkipScanFilter; -import org.apache.phoenix.hbase.index.MultiMutation; -import org.apache.phoenix.hbase.index.ValueGetter; -import 
org.apache.phoenix.hbase.index.covered.IndexMetaData; -import org.apache.phoenix.hbase.index.covered.IndexUpdate; -import org.apache.phoenix.hbase.index.covered.TableState; -import org.apache.phoenix.hbase.index.covered.update.ColumnReference; -import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; -import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.index.IndexMaintainer; -import org.apache.phoenix.index.PhoenixIndexCodec; -import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.types.PVarbinary; -import org.apache.phoenix.transaction.PhoenixTransactionContext; -import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel; -import org.apache.phoenix.util.ScanUtil; -import org.apache.phoenix.util.SchemaUtil; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.primitives.Longs; - - -public class PhoenixTxnIndexMutationGenerator { - - private static final Log LOG = LogFactory.getLog(PhoenixTxnIndexMutationGenerator.class); - - private final PhoenixConnection connection; - private final PhoenixTransactionContext phoenixTransactionContext; - - PhoenixTxnIndexMutationGenerator(PhoenixConnection connection, PhoenixTransactionContext phoenixTransactionContext) { - this.phoenixTransactionContext = phoenixTransactionContext; - this.connection = connection; - } - - private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) { - MultiMutation stored = mutations.get(row); - // we haven't seen this row before, so add it - if (stored == null) { - stored = new MultiMutation(row); - mutations.put(row, stored); - } - stored.addAll(m); - } - - public List<Mutation> 
getIndexUpdates(final PTable table, PTable index, List<Mutation> dataMutations) throws IOException, SQLException { - - if (dataMutations.isEmpty()) { - return new ArrayList<Mutation>(); - } - - Map<String,byte[]> updateAttributes = dataMutations.get(0).getAttributesMap(); - boolean replyWrite = (BaseScannerRegionObserver.ReplayWrite.fromBytes(updateAttributes.get(BaseScannerRegionObserver.REPLAY_WRITES)) != null); - byte[] txRollbackAttribute = updateAttributes.get(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY); - - IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); - - boolean isRollback = txRollbackAttribute!=null; - boolean isImmutable = index.isImmutableRows(); - ResultScanner currentScanner = null; - HTableInterface txTable = null; - // Collect up all mutations in batch - Map<ImmutableBytesPtr, MultiMutation> mutations = - new HashMap<ImmutableBytesPtr, MultiMutation>(); - Map<ImmutableBytesPtr, MultiMutation> findPriorValueMutations; - if (isImmutable && !isRollback) { - findPriorValueMutations = new HashMap<ImmutableBytesPtr, MultiMutation>(); - } else { - findPriorValueMutations = mutations; - } - // Collect the set of mutable ColumnReferences so that we can first - // run a scan to get the current state. We'll need this to delete - // the existing index rows. - int estimatedSize = 10; - Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(estimatedSize); - // For transactional tables, we use an index maintainer - // to aid in rollback if there's a KeyValue column in the index. The alternative would be - // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the - // client side. 
- Set<ColumnReference> allColumns = maintainer.getAllColumns(); - mutableColumns.addAll(allColumns); - - for(final Mutation m : dataMutations) { - // add the mutation to the batch set - ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow()); - // if we have no non PK columns, no need to find the prior values - - boolean requiresPriorRowState = !isImmutable || (maintainer.isRowDeleted(m) && !maintainer.getIndexedColumns().isEmpty()); - if (mutations != findPriorValueMutations && requiresPriorRowState) { - addMutation(findPriorValueMutations, row, m); - } - addMutation(mutations, row, m); - } - - List<Mutation> indexUpdates = new ArrayList<Mutation>(mutations.size() * 2); - try { - // Track if we have row keys with Delete mutations (or Puts that are - // Tephra's Delete marker). If there are none, we don't need to do the scan for - // prior versions, if there are, we do. Since rollbacks always have delete mutations, - // this logic will work there too. - if (!findPriorValueMutations.isEmpty()) { - List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size()); - for (ImmutableBytesPtr ptr : findPriorValueMutations.keySet()) { - keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary())); - } - Scan scan = new Scan(); - // Project all mutable columns - for (ColumnReference ref : mutableColumns) { - scan.addColumn(ref.getFamily(), ref.getQualifier()); - } - /* - * Indexes inherit the storage scheme of the data table which means all the indexes have the same - * storage scheme and empty key value qualifier. Note that this assumption would be broken if we start - * supporting new indexes over existing data tables to have a different storage scheme than the data - * table. 
- */ - byte[] emptyKeyValueQualifier = maintainer.getEmptyKeyValueQualifier(); - - // Project empty key value column - scan.addColumn(maintainer.getDataEmptyKeyValueCF(), emptyKeyValueQualifier); - ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1); - scanRanges.initializeScan(scan); - txTable = connection.getQueryServices().getTable(table.getPhysicalName().getBytes()); - // For rollback, we need to see all versions, including - // the last committed version as there may be multiple - // checkpointed versions. - SkipScanFilter filter = scanRanges.getSkipScanFilter(); - if (isRollback) { - filter = new SkipScanFilter(filter,true); - phoenixTransactionContext.setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT_ALL); - } - scan.setFilter(filter); - currentScanner = txTable.getScanner(scan); - } - if (isRollback) { - processRollback(maintainer, txRollbackAttribute, currentScanner, mutableColumns, indexUpdates, mutations, replyWrite, table); - } else { - processMutation(maintainer, txRollbackAttribute, currentScanner, mutableColumns, indexUpdates, mutations, findPriorValueMutations, replyWrite, table); - } - } finally { - if (txTable != null) txTable.close(); - } - - return indexUpdates; - } - - private void processMutation(IndexMaintainer maintainer, - byte[] txRollbackAttribute, - ResultScanner scanner, - Set<ColumnReference> upsertColumns, - Collection<Mutation> indexUpdates, - Map<ImmutableBytesPtr, MultiMutation> mutations, - Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue, - boolean replyWrite, - final PTable table) throws IOException, SQLException { - if (scanner != null) { - Result result; - ColumnReference emptyColRef = new ColumnReference(maintainer - .getDataEmptyKeyValueCF(), maintainer.getEmptyKeyValueQualifier()); - // Process existing data table rows by removing the old index row and adding the new index row 
- while ((result = scanner.next()) != null) { - Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow())); - TxTableState state = new TxTableState(upsertColumns, phoenixTransactionContext.getWritePointer(), m, emptyColRef, result); - generateDeletes(indexUpdates, txRollbackAttribute, state, maintainer, replyWrite, table); - generatePuts(indexUpdates, state, maintainer, replyWrite, table); - } - } - // Process new data table by adding new index rows - for (Mutation m : mutations.values()) { - TxTableState state = new TxTableState(upsertColumns, phoenixTransactionContext.getWritePointer(), m); - generatePuts(indexUpdates, state, maintainer, replyWrite, table); - generateDeletes(indexUpdates, txRollbackAttribute, state, maintainer, replyWrite, table); - } - } - - private void processRollback(IndexMaintainer maintainer, - byte[] txRollbackAttribute, - ResultScanner scanner, - Set<ColumnReference> mutableColumns, - Collection<Mutation> indexUpdates, - Map<ImmutableBytesPtr, MultiMutation> mutations, - boolean replyWrite, - final PTable table) throws IOException, SQLException { - if (scanner != null) { - Result result; - // Loop through last committed row state plus all new rows associated with current transaction - // to generate point delete markers for all index rows that were added. We don't have Tephra - // manage index rows in change sets because we don't want to be hit with the additional - // memory hit and do not need to do conflict detection on index rows. - ColumnReference emptyColRef = new ColumnReference(maintainer.getDataEmptyKeyValueCF(), maintainer.getEmptyKeyValueQualifier()); - while ((result = scanner.next()) != null) { - Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow())); - // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest - // (as if we're "replaying" them in time order). 
- List<Cell> cells = result.listCells(); - Collections.sort(cells, new Comparator<Cell>() { - - @Override - public int compare(Cell o1, Cell o2) { - int c = Longs.compare(o1.getTimestamp(), o2.getTimestamp()); - if (c != 0) return c; - c = o1.getTypeByte() - o2.getTypeByte(); - if (c != 0) return c; - c = Bytes.compareTo(o1.getFamilyArray(), o1.getFamilyOffset(), o1.getFamilyLength(), o1.getFamilyArray(), o1.getFamilyOffset(), o1.getFamilyLength()); - if (c != 0) return c; - return Bytes.compareTo(o1.getQualifierArray(), o1.getQualifierOffset(), o1.getQualifierLength(), o1.getQualifierArray(), o1.getQualifierOffset(), o1.getQualifierLength()); - } - - }); - int i = 0; - int nCells = cells.size(); - Result oldResult = null, newResult; - long readPtr = phoenixTransactionContext.getReadPointer(); - do { - boolean hasPuts = false; - LinkedList<Cell> singleTimeCells = Lists.newLinkedList(); - long writePtr; - Cell cell = cells.get(i); - do { - hasPuts |= cell.getTypeByte() == KeyValue.Type.Put.getCode(); - writePtr = cell.getTimestamp(); - ListIterator<Cell> it = singleTimeCells.listIterator(); - do { - // Add at the beginning of the list to match the expected HBase - // newest to oldest sort order (which TxTableState relies on - // with the Result.getLatestColumnValue() calls). However, we - // still want to add Cells in the expected order for each time - // bound as otherwise we won't find it in our old state. - it.add(cell); - } while (++i < nCells && (cell = cells.get(i)).getTimestamp() == writePtr); - } while (i < nCells && cell.getTimestamp() <= readPtr); - - // Generate point delete markers for the prior row deletion of the old index value. - // The write timestamp is the next timestamp, not the current timestamp, - // as the earliest cells are the current values for the row (and we don't - // want to delete the current row). 
- if (oldResult != null) { - TxTableState state = new TxTableState(mutableColumns, writePtr, m, emptyColRef, oldResult); - generateDeletes(indexUpdates, txRollbackAttribute, state, maintainer, replyWrite, table); - } - // Generate point delete markers for the new index value. - // If our time batch doesn't have Puts (i.e. we have only Deletes), then do not - // generate deletes. We would have generated the delete above based on the state - // of the previous row. The delete markers do not give us the state we need to - // delete. - if (hasPuts) { - newResult = Result.create(singleTimeCells); - // First row may represent the current state which we don't want to delete - if (writePtr > readPtr) { - TxTableState state = new TxTableState(mutableColumns, writePtr, m, emptyColRef, newResult); - generateDeletes(indexUpdates, txRollbackAttribute, state, maintainer, replyWrite, table); - } - oldResult = newResult; - } else { - oldResult = null; - } - } while (i < nCells); - } - } - } - - private Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMaintainer maintainer, boolean replyWrite, final PTable table) throws IOException, SQLException { - if (maintainer.isRowDeleted(state.getPendingUpdate())) { - return Collections.emptyList(); - } - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - ptr.set(state.getCurrentRowKey()); - List<IndexUpdate> indexUpdates = Lists.newArrayList(); - Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), replyWrite, false, null); - ValueGetter valueGetter = statePair.getFirst(); - IndexUpdate indexUpdate = statePair.getSecond(); - indexUpdate.setTable(maintainer.isLocalIndex() ? 
table.getName().getBytes() : maintainer.getIndexTableName()); - - byte[] regionStartKey = null; - byte[] regionEndkey = null; - if(maintainer.isLocalIndex()) { - HRegionLocation tableRegionLocation = connection.getQueryServices().getTableRegionLocation(table.getPhysicalName().getBytes(), state.getCurrentRowKey()); - regionStartKey = tableRegionLocation.getRegionInfo().getStartKey(); - regionEndkey = tableRegionLocation.getRegionInfo().getEndKey(); - } - - Put put = maintainer.buildUpdateMutation(PhoenixIndexCodec.KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), regionStartKey, regionEndkey); - indexUpdate.setUpdate(put); - indexUpdates.add(indexUpdate); - - return indexUpdates; - } - - private Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMaintainer maintainer, boolean replyWrite, final PTable table) throws IOException, SQLException { - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - ptr.set(state.getCurrentRowKey()); - List<IndexUpdate> indexUpdates = Lists.newArrayList(); - // For transactional tables, we use an index maintainer - // to aid in rollback if there's a KeyValue column in the index. The alternative would be - // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the - // client side. - Set<ColumnReference> cols = Sets.newHashSet(maintainer.getAllColumns()); - cols.add(new ColumnReference(maintainer.getDataEmptyKeyValueCF(), maintainer.getEmptyKeyValueQualifier())); - Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(cols, replyWrite, true, null); - ValueGetter valueGetter = statePair.getFirst(); - if (valueGetter!=null) { - IndexUpdate indexUpdate = statePair.getSecond(); - indexUpdate.setTable(maintainer.isLocalIndex() ? 
table.getName().getBytes() : maintainer.getIndexTableName()); - - byte[] regionStartKey = null; - byte[] regionEndkey = null; - if(maintainer.isLocalIndex()) { - HRegionLocation tableRegionLocation = connection.getQueryServices().getTableRegionLocation(table.getPhysicalName().getBytes(), state.getCurrentRowKey()); - regionStartKey = tableRegionLocation.getRegionInfo().getStartKey(); - regionEndkey = tableRegionLocation.getRegionInfo().getEndKey(); - } - - Delete delete = maintainer.buildDeleteMutation(PhoenixIndexCodec.KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(), - state.getCurrentTimestamp(), regionStartKey, regionEndkey); - indexUpdate.setUpdate(delete); - indexUpdates.add(indexUpdate); - } - return indexUpdates; - } - - private void generateDeletes(Collection<Mutation> indexUpdates, - byte[] attribValue, - TxTableState state, - IndexMaintainer maintainer, - boolean replyWrite, - final PTable table) throws IOException, SQLException { - Iterable<IndexUpdate> deletes = getIndexDeletes(state, maintainer, replyWrite, table); - for (IndexUpdate delete : deletes) { - if (delete.isValid()) { - delete.getUpdate().setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue); - indexUpdates.add(delete.getUpdate()); - } - } - } - - private boolean generatePuts(Collection<Mutation> indexUpdates, - TxTableState state, - IndexMaintainer maintainer, - boolean replyWrite, - final PTable table) throws IOException, SQLException { - state.applyMutation(); - Iterable<IndexUpdate> puts = getIndexUpserts(state, maintainer, replyWrite, table); - boolean validPut = false; - for (IndexUpdate put : puts) { - if (put.isValid()) { - indexUpdates.add(put.getUpdate()); - validPut = true; - } - } - return validPut; - } - - - private static class TxTableState implements TableState { - private final Mutation mutation; - private final long currentTimestamp; - private final List<KeyValue> pendingUpdates; - private final Set<ColumnReference> indexedColumns; - private 
final Map<ColumnReference, ImmutableBytesWritable> valueMap; - - private TxTableState(Set<ColumnReference> indexedColumns, long currentTimestamp, Mutation mutation) { - this.currentTimestamp = currentTimestamp; - this.indexedColumns = indexedColumns; - this.mutation = mutation; - int estimatedSize = indexedColumns.size(); - this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize); - this.pendingUpdates = Lists.newArrayListWithExpectedSize(estimatedSize); - try { - CellScanner scanner = mutation.cellScanner(); - while (scanner.advance()) { - Cell cell = scanner.current(); - pendingUpdates.add(KeyValueUtil.ensureKeyValue(cell)); - } - } catch (IOException e) { - throw new RuntimeException(e); // Impossible - } - } - - public TxTableState(Set<ColumnReference> indexedColumns, long currentTimestamp, Mutation m, ColumnReference emptyColRef, Result r) { - this(indexedColumns, currentTimestamp, m); - - for (ColumnReference ref : indexedColumns) { - Cell cell = r.getColumnLatestCell(ref.getFamily(), ref.getQualifier()); - if (cell != null) { - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - valueMap.put(ref, ptr); - } - } - } - - @Override - public RegionCoprocessorEnvironment getEnvironment() { - return null; - } - - @Override - public long getCurrentTimestamp() { - return currentTimestamp; - } - - @Override - public byte[] getCurrentRowKey() { - return mutation.getRow(); - } - - @Override - public List<? 
extends IndexedColumnGroup> getIndexColumnHints() { - return Collections.emptyList(); - } - - private void applyMutation() { - for (Cell cell : pendingUpdates) { - if (cell.getTypeByte() == KeyValue.Type.Delete.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) { - ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - valueMap.remove(ref); - } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) { - for (ColumnReference ref : indexedColumns) { - if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) { - valueMap.remove(ref); - } - } - } else if (cell.getTypeByte() == KeyValue.Type.Put.getCode()){ - ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - if (indexedColumns.contains(ref)) { - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - valueMap.put(ref, ptr); - } - } else { - throw new IllegalStateException("Unexpected mutation type for " + cell); - } - } - } - - @Override - public Collection<KeyValue> getPendingUpdate() { - return pendingUpdates; - } - - @Override - public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? 
extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean returnNullScannerIfRowNotFound, IndexMetaData indexMetaData) - throws IOException { - // TODO: creating these objects over and over again is wasteful - ColumnTracker tracker = new ColumnTracker(indexedColumns); - ValueGetter getter = new ValueGetter() { - - @Override - public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException { - return valueMap.get(ref); - } - - @Override - public byte[] getRowKey() { - return mutation.getRow(); - } - - }; - Pair<ValueGetter, IndexUpdate> pair = new Pair<ValueGetter, IndexUpdate>(getter, new IndexUpdate(tracker)); - return pair; - } - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/c1827f24/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index f8195f1..c7dbff2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -86,7 +86,6 @@ import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter; import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache; import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy; -import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; http://git-wip-us.apache.org/repos/asf/phoenix/blob/c1827f24/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java index a2edd45..f13e97a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java @@ -58,9 +58,7 @@ public abstract class BaseIndexBuilder implements IndexBuilder { Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor(new Class[0]); meth.setAccessible(true); this.codec = meth.newInstance(); - this.codec.initialize(env); - } catch (IOException e) { - throw e; + this.codec.initialize(conf, env.getRegionInfo().getStartKey(), env.getRegionInfo().getEndKey(), env.getRegion().getRegionInfo().getTable().getName()); } catch (Exception e) { throw new IOException(e); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c1827f24/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java index cf6e95e..7489a8c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java @@ -20,16 +20,9 @@ package org.apache.phoenix.hbase.index.builder; import java.io.IOException; import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.phoenix.hbase.index.covered.IndexCodec; public abstract class BaseIndexCodec implements IndexCodec { - - @Override - public void initialize(RegionCoprocessorEnvironment env) throws IOException { - // noop - } - /** * {@inheritDoc} * <p> 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c1827f24/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java index e6d683e..7dde941 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java @@ -11,9 +11,9 @@ package org.apache.phoenix.hbase.index.covered; import java.io.IOException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.phoenix.hbase.index.builder.BaseIndexCodec; /** @@ -24,16 +24,6 @@ import org.apache.phoenix.hbase.index.builder.BaseIndexCodec; */ public interface IndexCodec { /** - * Do any code initialization necessary - * - * @param env - * environment in which the codec is operating - * @throws IOException - * if the codec cannot be initalized correctly - */ - public void initialize(RegionCoprocessorEnvironment env) throws IOException; - - /** * Get the index cleanup entries. Currently, this must return just single row deletes (where just the row-key is * specified and no columns are returned) mapped to the table name. For instance, to you have an index 'myIndex' * with row : @@ -89,4 +79,6 @@ public interface IndexCodec { * @throws IOException */ public boolean isEnabled(Mutation m) throws IOException; + + public void initialize(Configuration conf, byte[] startKey, byte[] endKey, byte[] tableName); } \ No newline at end of file