PHOENIX-4773 Move HTable rollback wrapper into Tephra TAL method
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0718a87b Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0718a87b Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0718a87b Branch: refs/heads/4.x-cdh5.15 Commit: 0718a87b7f6e9cf1c65fc28141c276ca2713a499 Parents: 4f0929f Author: James Taylor <jamestay...@apache.org> Authored: Tue Jun 5 04:27:36 2018 +0100 Committer: Pedro Boado <pbo...@apache.org> Committed: Wed Oct 17 21:25:04 2018 +0100 ---------------------------------------------------------------------- .../apache/phoenix/cache/ServerCacheClient.java | 21 +- .../apache/phoenix/execute/HashJoinPlan.java | 7 +- .../apache/phoenix/execute/MutationState.java | 190 ++----------------- .../PhoenixTxIndexMutationGenerator.java | 42 ++++ .../phoenix/index/IndexMetaDataCacheClient.java | 67 ++++++- .../apache/phoenix/join/HashCacheClient.java | 5 +- .../transaction/OmidTransactionContext.java | 3 +- .../transaction/PhoenixTransactionContext.java | 5 +- .../transaction/TephraTransactionContext.java | 91 ++++++++- .../java/org/apache/phoenix/util/IndexUtil.java | 8 + 10 files changed, 230 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/0718a87b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java index 68de747..5e284bd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java @@ -70,7 +70,6 @@ import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; -import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; @@ -90,7 +89,7 @@ public class ServerCacheClient { private static final Random RANDOM = new Random(); public static final String HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER = "hash.join.server.cache.resend.per.server"; private final PhoenixConnection connection; - private final Map<Integer, TableRef> cacheUsingTableRefMap = new ConcurrentHashMap<Integer, TableRef>(); + private final Map<Integer, PTable> cacheUsingTableMap = new ConcurrentHashMap<Integer, PTable>(); /** * Construct client used to create a serialized cached snapshot of a table and send it to each region server @@ -220,12 +219,12 @@ public class ServerCacheClient { } public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState, - final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException { - return addServerCache(keyRanges, cachePtr, txState, cacheFactory, cacheUsingTableRef, false); + final ServerCacheFactory cacheFactory, final PTable cacheUsingTable) throws SQLException { + return addServerCache(keyRanges, cachePtr, txState, cacheFactory, cacheUsingTable, false); } public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState, - final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef, boolean storeCacheOnClient) + final ServerCacheFactory cacheFactory, final PTable cacheUsingTable, boolean storeCacheOnClient) throws SQLException { ConnectionQueryServices services = connection.getQueryServices(); List<Closeable> closeables = new ArrayList<Closeable>(); @@ -241,7 +240,6 @@ public class ServerCacheClient { ExecutorService executor = services.getExecutor(); List<Future<Boolean>> futures = Collections.emptyList(); try { - final PTable cacheUsingTable = cacheUsingTableRef.getTable(); List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTable.getPhysicalName().getBytes()); int nRegions = locations.size(); // Size these based on worst case @@ -258,7 +256,7 @@ public class ServerCacheClient { servers.add(entry); if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Adding cache entry to be sent for " + entry, connection));} final byte[] key = getKeyInRegion(entry.getRegionInfo().getStartKey()); - final HTableInterface htable = services.getTable(cacheUsingTableRef.getTable().getPhysicalName().getBytes()); + final HTableInterface htable = services.getTable(cacheUsingTable.getPhysicalName().getBytes()); closeables.add(htable); futures.add(executor.submit(new JobCallable<Boolean>() { @@ -294,7 +292,7 @@ public class ServerCacheClient { future.get(timeoutMs, TimeUnit.MILLISECONDS); } - cacheUsingTableRefMap.put(Bytes.mapKey(cacheId), cacheUsingTableRef); + cacheUsingTableMap.put(Bytes.mapKey(cacheId), cacheUsingTable); success = true; } catch (SQLException e) { firstException = e; @@ -337,9 +335,8 @@ public class ServerCacheClient { try { ConnectionQueryServices services = connection.getQueryServices(); Throwable lastThrowable = null; - TableRef cacheUsingTableRef = cacheUsingTableRefMap.get(Bytes.mapKey(cacheId)); - final PTable cacheUsingTable = cacheUsingTableRef.getTable(); - byte[] tableName = cacheUsingTableRef.getTable().getPhysicalName().getBytes(); + final PTable cacheUsingTable = cacheUsingTableMap.get(Bytes.mapKey(cacheId)); + byte[] tableName = cacheUsingTable.getPhysicalName().getBytes(); iterateOverTable = services.getTable(tableName); List<HRegionLocation> locations = services.getAllTableRegions(tableName); @@ -403,7 +400,7 @@ public class ServerCacheClient { lastThrowable); } } finally { - cacheUsingTableRefMap.remove(cacheId); + cacheUsingTableMap.remove(cacheId); Closeables.closeQuietly(iterateOverTable); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0718a87b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index 5b433b3..bfe089d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -66,7 +66,10 @@ import org.apache.phoenix.join.HashCacheClient; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; import org.apache.phoenix.optimize.Cost; -import org.apache.phoenix.parse.*; +import org.apache.phoenix.parse.FilterableStatement; +import org.apache.phoenix.parse.ParseNode; +import org.apache.phoenix.parse.SQLParser; +import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; @@ -494,7 +497,7 @@ public class HashJoinPlan extends DelegateQueryPlan { cache = parent.hashClient.addHashCache(ranges, iterator, plan.getEstimatedSize(), hashExpressions, singleValueOnly, - parent.delegate.getTableRef(), keyRangeRhsExpression, + parent.delegate.getTableRef().getTable(), keyRangeRhsExpression, keyRangeRhsValues); long endTime = System.currentTimeMillis(); boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime); http://git-wip-us.apache.org/repos/asf/phoenix/blob/0718a87b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 2e795b1..c29d6b5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -38,7 +38,6 @@ import javax.annotation.Nonnull; import javax.annotation.concurrent.Immutable; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -51,7 +50,6 @@ import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.MutationPlan; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; -import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; @@ -60,10 +58,8 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.IndexMetaDataCacheClient; import org.apache.phoenix.index.PhoenixIndexBuilder; -import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.index.PhoenixIndexFailurePolicy; import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand; -import org.apache.phoenix.index.PhoenixIndexMetaData; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; import org.apache.phoenix.monitoring.GlobalClientMetrics; @@ -94,14 +90,10 @@ import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel; import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.transaction.TransactionFactory.Provider; -import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.LogUtil; -import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.SQLCloseable; -import org.apache.phoenix.util.SQLCloseables; -import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.SizedUtil; import org.apache.phoenix.util.TransactionUtil; @@ -499,19 +491,13 @@ public class MutationState implements SQLCloseable { return ptr; } - private static List<PTable> getClientMaintainedIndexes(PTable table) { - Iterator<PTable> indexIterator = // Only maintain tables with immutable rows through this client-side mechanism - (table.isImmutableRows() || table.isTransactional()) ? IndexMaintainer.maintainedGlobalIndexes(table - .getIndexes().iterator()) : Collections.<PTable> emptyIterator(); - return Lists.newArrayList(indexIterator); - } - private Iterator<Pair<PName, List<Mutation>>> addRowMutations(final TableRef tableRef, final MultiRowMutationState values, final long mutationTimestamp, final long serverTimestamp, boolean includeAllIndexes, final boolean sendAll) { final PTable table = tableRef.getTable(); - final List<PTable> indexList = includeAllIndexes ? Lists.newArrayList(IndexMaintainer.maintainedIndexes(table - .getIndexes().iterator())) : getClientMaintainedIndexes(table); + final List<PTable> indexList = includeAllIndexes ? + Lists.newArrayList(IndexMaintainer.maintainedIndexes(table.getIndexes().iterator())) : + IndexUtil.getClientMaintainedIndexes(table); final Iterator<PTable> indexes = indexList.iterator(); final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size()); final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists @@ -541,7 +527,7 @@ public class MutationState implements SQLCloseable { if (!mutationsPertainingToIndex.isEmpty()) { if (table.isTransactional()) { if (indexMutationsMap == null) { - PhoenixTxIndexMutationGenerator generator = newTxIndexMutationGenerator(table, + PhoenixTxIndexMutationGenerator generator = PhoenixTxIndexMutationGenerator.newGenerator(connection, table, indexList, mutationsPertainingToIndex.get(0).getAttributesMap()); try (HTableInterface htable = connection.getQueryServices().getTable( table.getPhysicalName().getBytes())) { @@ -596,43 +582,6 @@ public class MutationState implements SQLCloseable { }; } - private PhoenixTxIndexMutationGenerator newTxIndexMutationGenerator(PTable table, List<PTable> indexes, - Map<String, byte[]> attributes) { - final List<IndexMaintainer> indexMaintainers = Lists.newArrayListWithExpectedSize(indexes.size()); - for (PTable index : indexes) { - IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); - indexMaintainers.add(maintainer); - } - IndexMetaDataCache indexMetaDataCache = new IndexMetaDataCache() { - - @Override - public void close() throws IOException {} - - @Override - public List<IndexMaintainer> getIndexMaintainers() { - return indexMaintainers; - } - - @Override - public PhoenixTransactionContext getTransactionContext() { - return phoenixTransactionContext.newTransactionContext(phoenixTransactionContext, true); - } - - @Override - public int getClientVersion() { - return MetaDataProtocol.PHOENIX_VERSION; - } - - }; - try { - PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(indexMetaDataCache, attributes); - return new PhoenixTxIndexMutationGenerator(connection.getQueryServices().getConfiguration(), indexMetaData, - table.getPhysicalName().getBytes()); - } catch (IOException e) { - throw new RuntimeException(e); // Impossible - } - } - private void generateMutations(final TableRef tableRef, final long mutationTimestamp, final long serverTimestamp, final MultiRowMutationState values, final List<Mutation> mutationList, final List<Mutation> mutationsPertainingToIndex) { @@ -793,7 +742,6 @@ public class MutationState implements SQLCloseable { private long validateAndGetServerTimestamp(TableRef tableRef, MultiRowMutationState rowKeyToColumnMap) throws SQLException { - Long scn = connection.getSCN(); MetaDataClient client = new MetaDataClient(connection); long serverTimeStamp = tableRef.getTimeStamp(); // If we're auto committing, we've already validated the schema when we got the ColumnResolver, @@ -862,65 +810,6 @@ public class MutationState implements SQLCloseable { return batchCount; } - private class MetaDataAwareHTable extends DelegateHTable { - private final TableRef tableRef; - - private MetaDataAwareHTable(HTableInterface delegate, TableRef tableRef) { - super(delegate); - this.tableRef = tableRef; - } - - /** - * Called by Tephra when a transaction is aborted. We have this wrapper so that we get an opportunity to attach - * our index meta data to the mutations such that we can also undo the index mutations. - */ - @Override - public void delete(List<Delete> deletes) throws IOException { - ServerCache cache = null; - try { - if (deletes.isEmpty()) { return; } - // Attach meta data for server maintained indexes - PTable table = tableRef.getTable(); - ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(); - if (table.getIndexMaintainers(indexMetaDataPtr, connection)) { - cache = setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr); - } - - // Send deletes for client maintained indexes - List<PTable> indexes = getClientMaintainedIndexes(table); - if (!indexes.isEmpty()) { - PhoenixTxIndexMutationGenerator generator = newTxIndexMutationGenerator(table, indexes, deletes.get(0) - .getAttributesMap()); - Collection<Pair<Mutation, byte[]>> indexUpdates = generator.getIndexUpdates(delegate, - deletes.iterator()); - for (PTable index : indexes) { - byte[] physicalName = index.getPhysicalName().getBytes(); - try (HTableInterface hindex = connection.getQueryServices().getTable(physicalName)) { - List<Mutation> indexDeletes = Lists.newArrayListWithExpectedSize(deletes.size()); - for (Pair<Mutation, byte[]> mutationPair : indexUpdates) { - if (Bytes.equals(mutationPair.getSecond(), physicalName)) { - indexDeletes.add(mutationPair.getFirst()); - } - hindex.batch(indexDeletes); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(e); - } - } - } - - delegate.delete(deletes); - } catch (SQLException e) { - throw new IOException(e); - } finally { - if (cache != null) { - SQLCloseables.closeAllQuietly(Collections.singletonList(cache)); - } - } - } - } - private static class TableInfo { private final boolean isDataTable; @@ -1059,8 +948,9 @@ public class MutationState implements SQLCloseable { TableRef origTableRef = tableInfo.getOrigTableRef(); PTable table = origTableRef.getTable(); table.getIndexMaintainers(indexMetaDataPtr, connection); - final ServerCache cache = tableInfo.isDataTable() ? setMetaDataOnMutations(origTableRef, - mutationList, indexMetaDataPtr) : null; + final ServerCache cache = tableInfo.isDataTable() ? + IndexMetaDataCacheClient.setMetaDataOnMutations(connection, table, + mutationList, indexMetaDataPtr) : null; // If we haven't retried yet, retry for this case only, as it's possible that // a split will occur after we send the index metadata cache to all known // region servers. @@ -1070,19 +960,12 @@ public class MutationState implements SQLCloseable { try { if (table.isTransactional()) { // Track tables to which we've sent uncommitted data - uncommittedPhysicalNames.add(table.getPhysicalName().getString()); - phoenixTransactionContext.markDMLFence(table); - - // If we have indexes, wrap the HTable in a delegate HTable that - // will attach the necessary index meta data in the event of a - // rollback - if (!table.getIndexes().isEmpty()) { - hTable = new MetaDataAwareHTable(hTable, origTableRef); + if (tableInfo.isDataTable()) { + uncommittedPhysicalNames.add(table.getPhysicalName().getString()); + phoenixTransactionContext.markDMLFence(table); } - - hTable = phoenixTransactionContext.getTransactionalTableWriter(hTable, table); + hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, !tableInfo.isDataTable()); } - numMutations = mutationList.size(); GLOBAL_MUTATION_BATCH_SIZE.update(numMutations); mutationSizeBytes = calculateMutationSize(mutationList); @@ -1236,57 +1119,6 @@ public class MutationState implements SQLCloseable { return phoenixTransactionContext.encodeTransaction(); } - private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations, - ImmutableBytesWritable indexMetaDataPtr) throws SQLException { - PTable table = tableRef.getTable(); - final byte[] tenantIdBytes; - if (table.isMultiTenant()) { - tenantIdBytes = connection.getTenantId() == null ? null : ScanUtil.getTenantIdBytes( - table.getRowKeySchema(), table.getBucketNum() != null, connection.getTenantId(), - table.getViewIndexId() != null); - } else { - tenantIdBytes = connection.getTenantId() == null ? null : connection.getTenantId().getBytes(); - } - ServerCache cache = null; - byte[] attribValue = null; - byte[] uuidValue = null; - byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY; - if (table.isTransactional()) { - txState = encodeTransaction(); - } - boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0; - if (hasIndexMetaData) { - if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() - + txState.length)) { - IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); - cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState); - uuidValue = cache.getId(); - } else { - attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr); - uuidValue = ServerCacheClient.generateId(); - } - } else if (txState.length == 0) { return null; } - // Either set the UUID to be able to access the index metadata from the cache - // or set the index metadata directly on the Mutation - for (Mutation mutation : mutations) { - if (connection.getTenantId() != null) { - mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantIdBytes); - } - mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - if (attribValue != null) { - mutation.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue); - mutation.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, - Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION)); - if (txState.length > 0) { - mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); - } - } else if (!hasIndexMetaData && txState.length > 0) { - mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); - } - } - return cache; - } - private void addUncommittedStatementIndexes(Collection<RowMutationState> rowMutations) { for (RowMutationState rowMutationState : rowMutations) { uncommittedStatementIndexes = joinSortedIntArrays(uncommittedStatementIndexes, http://git-wip-us.apache.org/repos/asf/phoenix/blob/0718a87b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java index 0016fa9..a7b5687 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java @@ -45,7 +45,9 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.cache.IndexMetaDataCache; import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.MultiMutation; import org.apache.phoenix.hbase.index.ValueGetter; @@ -59,7 +61,9 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.index.PhoenixIndexMetaData; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel; @@ -440,4 +444,42 @@ public class PhoenixTxIndexMutationGenerator { return pair; } } + + public static PhoenixTxIndexMutationGenerator newGenerator(final PhoenixConnection connection, PTable table, List<PTable> indexes, + Map<String, byte[]> attributes) { + final List<IndexMaintainer> indexMaintainers = Lists.newArrayListWithExpectedSize(indexes.size()); + for (PTable index : indexes) { + IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); + indexMaintainers.add(maintainer); + } + IndexMetaDataCache indexMetaDataCache = new IndexMetaDataCache() { + + @Override + public void close() throws IOException {} + + @Override + public List<IndexMaintainer> getIndexMaintainers() { + return indexMaintainers; + } + + @Override + public PhoenixTransactionContext getTransactionContext() { + PhoenixTransactionContext context = connection.getMutationState().getPhoenixTransactionContext(); + return context.newTransactionContext(context, true); + } + + @Override + public int getClientVersion() { + return MetaDataProtocol.PHOENIX_VERSION; + } + + }; + try { + PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(indexMetaDataCache, attributes); + return new PhoenixTxIndexMutationGenerator(connection.getQueryServices().getConfiguration(), indexMetaData, + table.getPhysicalName().getBytes()); + } catch (IOException e) { + throw new RuntimeException(e); // Impossible + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0718a87b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java index fcabdfd..bd308c0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java @@ -24,20 +24,25 @@ import java.util.List; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.join.MaxServerCacheSizeExceededException; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.ScanUtil; public class IndexMetaDataCacheClient { private final ServerCacheClient serverCache; - private TableRef cacheUsingTableRef; + private PTable cacheUsingTable; /** * Construct client used to send index metadata to each region server @@ -45,9 +50,9 @@ public class IndexMetaDataCacheClient { * @param connection the client connection * @param cacheUsingTableRef table ref to table that will use the cache during its scan */ - public IndexMetaDataCacheClient(PhoenixConnection connection, TableRef cacheUsingTableRef) { + public IndexMetaDataCacheClient(PhoenixConnection connection, PTable cacheUsingTable) { serverCache = new ServerCacheClient(connection); - this.cacheUsingTableRef = cacheUsingTableRef; + this.cacheUsingTable = cacheUsingTable; } /** @@ -75,7 +80,7 @@ public class IndexMetaDataCacheClient { /** * Serialize and compress hashCacheTable */ - return serverCache.addServerCache(ScanUtil.newScanRanges(mutations), ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef); + return serverCache.addServerCache(ScanUtil.newScanRanges(mutations), ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTable); } @@ -91,7 +96,55 @@ public class IndexMetaDataCacheClient { /** * Serialize and compress hashCacheTable */ - return serverCache.addServerCache(ranges, ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef); + return serverCache.addServerCache(ranges, ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTable); + } + + public static ServerCache setMetaDataOnMutations(PhoenixConnection connection, PTable table, List<? extends Mutation> mutations, + ImmutableBytesWritable indexMetaDataPtr) throws SQLException { + final byte[] tenantIdBytes; + if (table.isMultiTenant()) { + tenantIdBytes = connection.getTenantId() == null ? null : ScanUtil.getTenantIdBytes( + table.getRowKeySchema(), table.getBucketNum() != null, connection.getTenantId(), + table.getViewIndexId() != null); + } else { + tenantIdBytes = connection.getTenantId() == null ? null : connection.getTenantId().getBytes(); + } + ServerCache cache = null; + byte[] attribValue = null; + byte[] uuidValue = null; + byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY; + if (table.isTransactional()) { + txState = connection.getMutationState().encodeTransaction(); + } + boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0; + if (hasIndexMetaData) { + if (useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) { + IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, table); + cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState); + uuidValue = cache.getId(); + } else { + attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr); + uuidValue = ServerCacheClient.generateId(); + } + } else if (txState.length == 0) { return null; } + // Either set the UUID to be able to access the index metadata from the cache + // or set the index metadata directly on the Mutation + for (Mutation mutation : mutations) { + if (connection.getTenantId() != null) { + mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantIdBytes); + } + mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); + if (attribValue != null) { + mutation.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue); + mutation.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, + Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION)); + if (txState.length > 0) { + mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); + } + } else if (!hasIndexMetaData && txState.length > 0) { + mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); + } + } + return cache; } - } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0718a87b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java index 2ec509c..83ac32d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java @@ -37,7 +37,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.ByteUtil; @@ -77,13 +76,13 @@ public class HashCacheClient { * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed * size */ - public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, TableRef cacheUsingTableRef, Expression keyRangeRhsExpression, List<Expression> keyRangeRhsValues) throws SQLException { + public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, PTable cacheUsingTable, Expression keyRangeRhsExpression, List<Expression> keyRangeRhsValues) throws SQLException { /** * Serialize and compress hashCacheTable */ ImmutableBytesWritable ptr = new ImmutableBytesWritable(); serialize(ptr, iterator, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues); - ServerCache cache = serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTableRef, true); + ServerCache cache = serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTable, true); return cache; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0718a87b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java index ab9e8a6..d235d4b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java @@ -20,6 +20,7 @@ package org.apache.phoenix.transaction; import java.sql.SQLException; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.transaction.TransactionFactory.Provider; @@ -132,7 +133,7 @@ public class OmidTransactionContext implements PhoenixTransactionContext { } @Override - public HTableInterface getTransactionalTableWriter(HTableInterface htable, PTable table) { + public HTableInterface getTransactionalTableWriter(PhoenixConnection connection, PTable table, HTableInterface htable, boolean isIndex) { // TODO Auto-generated method stub return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0718a87b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java index 751945a..dfa35be 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java @@ -21,6 +21,7 @@ import java.sql.SQLException; import java.util.concurrent.TimeoutException; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.transaction.TransactionFactory.Provider; @@ -110,7 +111,7 @@ public interface PhoenixTransactionContext { } @Override - public HTableInterface getTransactionalTableWriter(HTableInterface htable, PTable table) { + public HTableInterface getTransactionalTableWriter(PhoenixConnection connection, PTable table, HTableInterface htable, boolean isIndex) { return null; } }; @@ -230,5 +231,5 @@ public interface PhoenixTransactionContext { public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext contex, boolean subTask); public HTableInterface getTransactionalTable(HTableInterface htable, boolean isImmutable); - public HTableInterface getTransactionalTableWriter(HTableInterface htable, PTable table); + public HTableInterface getTransactionalTableWriter(PhoenixConnection connection, PTable table, HTableInterface htable, boolean isIndex) throws SQLException; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0718a87b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java index 27a00d9..3999b7e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java @@ -20,22 +20,33 @@ package org.apache.phoenix.transaction; import java.io.IOException; import java.sql.SQLException; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.execute.DelegateHTable; +import org.apache.phoenix.execute.PhoenixTxIndexMutationGenerator; +import org.apache.phoenix.index.IndexMetaDataCacheClient; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.transaction.TephraTransactionProvider.TephraTransactionClient; +import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.SQLCloseables; import org.apache.tephra.Transaction; import org.apache.tephra.Transaction.VisibilityLevel; import org.apache.tephra.TransactionAware; @@ -492,14 +503,19 @@ public class TephraTransactionContext implements PhoenixTransactionContext { } @Override - public HTableInterface getTransactionalTableWriter(HTableInterface htable, PTable table) { - boolean isIndex = table.getType() == PTableType.INDEX; - TransactionAwareHTableDelegate transactionAwareHTable = new TransactionAwareHTableDelegate(htable, table.isImmutableRows() || isIndex ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW); + public HTableInterface getTransactionalTableWriter(PhoenixConnection connection, PTable table, HTableInterface htable, boolean isIndex) throws SQLException { + // If we have indexes, wrap the HTable in a delegate HTable that + // will attach the necessary index meta data in the event of a + // rollback + TransactionAwareHTableDelegate transactionAwareHTable; // Don't add immutable indexes (those are the only ones that would participate // during a commit), as we don't need conflict detection for these. if (isIndex) { + transactionAwareHTable = new TransactionAwareHTableDelegate(htable, TxConstants.ConflictDetection.NONE); transactionAwareHTable.startTx(getTransaction()); } else { + htable = new RollbackHookHTableWrapper(htable, table, connection); + transactionAwareHTable = new TransactionAwareHTableDelegate(htable, table.isImmutableRows() ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW); // Even for immutable, we need to do this so that an abort has the state // necessary to generate the rows to delete. this.addTransactionAware(transactionAwareHTable); @@ -507,4 +523,73 @@ public class TephraTransactionContext implements PhoenixTransactionContext { return transactionAwareHTable; } + /** + * + * Wraps Tephra data table HTables to catch when a rollback occurs so + * that index Delete mutations can be generated and applied (as + * opposed to storing them in the Tephra change set). This technique + * allows the Tephra API to be used directly with HBase APIs and + * Phoenix APIs since we can detect the rollback as a callback + * when the Tephra rollback is called. + * + */ + private static class RollbackHookHTableWrapper extends DelegateHTable { + private final PTable table; + private final PhoenixConnection connection; + + private RollbackHookHTableWrapper(HTableInterface delegate, PTable table, PhoenixConnection connection) { + super(delegate); + this.table = table; + this.connection = connection; + } + + /** + * Called by Tephra when a transaction is aborted. We have this wrapper so that we get an opportunity to attach + * our index meta data to the mutations such that we can also undo the index mutations. + */ + @Override + public void delete(List<Delete> deletes) throws IOException { + ServerCache cache = null; + try { + if (deletes.isEmpty()) { return; } + // Attach meta data for server maintained indexes + ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(); + if (table.getIndexMaintainers(indexMetaDataPtr, connection)) { + cache = IndexMetaDataCacheClient.setMetaDataOnMutations(connection, table, deletes, indexMetaDataPtr); + } + + // Send deletes for client maintained indexes + List<PTable> indexes = IndexUtil.getClientMaintainedIndexes(table); + if (!indexes.isEmpty()) { + PhoenixTxIndexMutationGenerator generator = PhoenixTxIndexMutationGenerator.newGenerator(connection, table, indexes, deletes.get(0) + .getAttributesMap()); + Collection<Pair<Mutation, byte[]>> indexUpdates = generator.getIndexUpdates(delegate, + deletes.iterator()); + for (PTable index : indexes) { + byte[] physicalName = index.getPhysicalName().getBytes(); + try (HTableInterface hindex = connection.getQueryServices().getTable(physicalName)) { + List<Mutation> indexDeletes = Lists.newArrayListWithExpectedSize(deletes.size()); + for (Pair<Mutation, byte[]> mutationPair : indexUpdates) { + if (Bytes.equals(mutationPair.getSecond(), physicalName)) { + indexDeletes.add(mutationPair.getFirst()); + } + hindex.batch(indexDeletes); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + } + } + + delegate.delete(deletes); + } catch (SQLException e) { + throw new IOException(e); + } finally { + if (cache != null) { + SQLCloseables.closeAllQuietly(Collections.singletonList(cache)); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0718a87b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 78a68d2..3fe5438 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -32,6 +32,7 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.ListIterator; import java.util.Map; @@ -816,5 +817,12 @@ public class IndexUtil { .setTableName(indexName).build().buildException(); } } + + public static List<PTable> getClientMaintainedIndexes(PTable table) { + Iterator<PTable> indexIterator = // Only maintain tables with immutable rows through this client-side mechanism + (table.isImmutableRows() || table.isTransactional()) ? IndexMaintainer.maintainedGlobalIndexes(table + .getIndexes().iterator()) : Collections.<PTable> emptyIterator(); + return Lists.newArrayList(indexIterator); + } }