Github user JamesRTaylor commented on a diff in the pull request: https://github.com/apache/phoenix/pull/133#discussion_r45678935 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java --- @@ -0,0 +1,497 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. 
+ */ +package org.apache.phoenix.index; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.htrace.Span; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; +import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.hbase.index.MultiMutation; +import org.apache.phoenix.hbase.index.ValueGetter; +import org.apache.phoenix.hbase.index.covered.IndexUpdate; +import org.apache.phoenix.hbase.index.covered.TableState; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; +import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; +import 
org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.hbase.index.write.IndexWriter; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.types.PVarbinary; +import org.apache.phoenix.trace.TracingUtils; +import org.apache.phoenix.trace.util.NullSpan; +import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.ServerUtil; + +import co.cask.tephra.Transaction; +import co.cask.tephra.Transaction.VisibilityLevel; +import co.cask.tephra.TxConstants; +import co.cask.tephra.hbase11.TransactionAwareHTable; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.primitives.Longs; + +/** + * Do all the work of managing index updates for a transactional table from a single coprocessor. Since the transaction + * manager essentially time orders writes through conflict detection, the logic to maintain a secondary index is quite a + * bit simpler than the non transactional case. For example, there's no need to muck with the WAL, as failure scenarios + * are handled by aborting the transaction. 
+ */ +public class PhoenixTransactionalIndexer extends BaseRegionObserver { + + private static final Log LOG = LogFactory.getLog(PhoenixTransactionalIndexer.class); + + private PhoenixIndexCodec codec; + private IndexWriter writer; + private boolean stopped; + + @Override + public void start(CoprocessorEnvironment e) throws IOException { + final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e; + String serverName = env.getRegionServerServices().getServerName().getServerName(); + codec = new PhoenixIndexCodec(); + codec.initialize(env); + + // setup the actual index writer + this.writer = new IndexWriter(env, serverName + "-tx-index-writer"); + } + + @Override + public void stop(CoprocessorEnvironment e) throws IOException { + if (this.stopped) { return; } + this.stopped = true; + String msg = "TxIndexer is being stopped"; + this.writer.stop(msg); + } + + private static Iterator<Mutation> getMutationIterator(final MiniBatchOperationInProgress<Mutation> miniBatchOp) { + return new Iterator<Mutation>() { + private int i = 0; + + @Override + public boolean hasNext() { + return i < miniBatchOp.size(); + } + + @Override + public Mutation next() { + return miniBatchOp.getOperation(i++); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + }; + } + @Override + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + + Mutation m = miniBatchOp.getOperation(0); + if (!codec.isEnabled(m)) { + super.preBatchMutate(c, miniBatchOp); + return; + } + + Map<String,byte[]> updateAttributes = m.getAttributesMap(); + PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes); + byte[] txRollbackAttribute = m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY); + Collection<Pair<Mutation, byte[]>> indexUpdates = null; + // get the current span, or just use a null-span to avoid a bunch of if 
statements + try (TraceScope scope = Trace.startSpan("Starting to build index updates")) { + Span current = scope.getSpan(); + if (current == null) { + current = NullSpan.INSTANCE; + } + + // get the index updates for all elements in this batch + indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute); + + current.addTimelineAnnotation("Built index updates, doing preStep"); + TracingUtils.addAnnotation(current, "index update count", indexUpdates.size()); + + // no index updates, so we are done + if (!indexUpdates.isEmpty()) { + this.writer.write(indexUpdates); + } + } catch (Throwable t) { + String msg = "Failed to update index with entries:" + indexUpdates; + LOG.error(msg, t); + ServerUtil.throwIOException(msg, t); + } + } + + private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, byte[] txRollbackAttribute) throws IOException { + ResultScanner currentScanner = null; + TransactionAwareHTable txTable = null; + // Collect up all mutations in batch + Map<ImmutableBytesPtr, MultiMutation> mutations = + new HashMap<ImmutableBytesPtr, MultiMutation>(); + while(mutationIterator.hasNext()) { + Mutation m = mutationIterator.next(); + // add the mutation to the batch set + ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow()); + MultiMutation stored = mutations.get(row); + // we haven't seen this row before, so add it + if (stored == null) { + stored = new MultiMutation(row); + mutations.put(row, stored); + } + stored.addAll(m); + } + + // Collect the set of mutable ColumnReferences so that we can first + // run a scan to get the current state. We'll need this to delete + // the existing index rows. 
+ Transaction tx = indexMetaData.getTransaction(); + assert(tx != null); + List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers(); + Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10); + for (IndexMaintainer indexMaintainer : indexMaintainers) { + // For transactional tables, we use an index maintainer + // to aid in rollback if there's a KeyValue column in the index. The alternative would be + // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the + // client side. + mutableColumns.addAll(indexMaintainer.getAllColumns()); + } + + boolean isRollback = txRollbackAttribute!=null; + Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size()); + try { + if (!mutableColumns.isEmpty()) { + List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size()); + for (ImmutableBytesPtr ptr : mutations.keySet()) { + keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary())); + } + Scan scan = new Scan(); + // Project all mutable columns + for (ColumnReference ref : mutableColumns) { + scan.addColumn(ref.getFamily(), ref.getQualifier()); + } + // Project empty key value column + scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); + ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1); + scanRanges.initializeScan(scan); + scan.setFilter(scanRanges.getSkipScanFilter()); + TableName tableName = env.getRegion().getRegionInfo().getTable(); + HTableInterface htable = env.getTable(tableName); + txTable = new TransactionAwareHTable(htable); + txTable.startTx(tx); + // For rollback, we need to see all versions, including + // the last committed version as there may be multiple + // checkpointed versions. 
+ if (isRollback) { + tx.setVisibility(VisibilityLevel.SNAPSHOT_ALL); + } + currentScanner = txTable.getScanner(scan); + } + if (isRollback) { + processRollback(env, indexMetaData, txRollbackAttribute, currentScanner, mutations, tx, mutableColumns, indexUpdates); + } else { + processMutation(env, indexMetaData, txRollbackAttribute, currentScanner, mutations, tx, mutableColumns, indexUpdates); + } + } finally { + if (txTable != null) txTable.close(); + } + + return indexUpdates; + } + + private void processMutation(RegionCoprocessorEnvironment env, + PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute, + ResultScanner scanner, + Map<ImmutableBytesPtr, MultiMutation> mutations, Transaction tx, + Set<ColumnReference> mutableColumns, + Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException { + if (scanner != null) { + Result result; + ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); + // Process existing data table rows by removing the old index row and adding the new index row + while ((result = scanner.next()) != null) { + Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow())); + TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result); + generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state); + generatePuts(indexMetaData, indexUpdates, state); + } + } + // Process new data table by adding new index rows + for (Mutation m : mutations.values()) { + TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m); + generatePuts(indexMetaData, indexUpdates, state); + } + } + + private void processRollback(RegionCoprocessorEnvironment env, + PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute, + ResultScanner scanner, + Map<ImmutableBytesPtr, MultiMutation> mutations, 
Transaction tx, + Set<ColumnReference> mutableColumns, + Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException { + if (scanner != null) { + Result result; + // Loop through last committed row state plus all new rows associated with current transaction + // to generate point delete markers for all index rows that were added. We don't have Tephra + // manage index rows in change sets because we don't want to be hit with the additional + // memory hit and do not need to do conflict detection on index rows. + ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); + while ((result = scanner.next()) != null) { + Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow())); + // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest + // (as if we're "replaying" them in time order). + List<Cell> cells = result.listCells(); + Collections.sort(cells, new Comparator<Cell>() { + + @Override + public int compare(Cell o1, Cell o2) { + int c = Longs.compare(o1.getTimestamp(), o2.getTimestamp()); + if (c != 0) return c; + c = o1.getTypeByte() - o2.getTypeByte(); + if (c != 0) return c; + c = Bytes.compareTo(o1.getFamilyArray(), o1.getFamilyOffset(), o1.getFamilyLength(), o1.getFamilyArray(), o1.getFamilyOffset(), o1.getFamilyLength()); + if (c != 0) return c; + return Bytes.compareTo(o1.getQualifierArray(), o1.getQualifierOffset(), o1.getQualifierLength(), o1.getQualifierArray(), o1.getQualifierOffset(), o1.getQualifierLength()); + } + + }); + int i = 0; + int nCells = cells.size(); + Result oldResult = null, newResult; + long readPtr = tx.getReadPointer(); + do { + boolean hasPuts = false; + LinkedList<Cell> singleTimeCells = Lists.newLinkedList(); + long writePtr; + do { + Cell cell = cells.get(i); + hasPuts |= cell.getTypeByte() == KeyValue.Type.Put.getCode(); + writePtr = cell.getTimestamp(); + do { + // Add at the 
beginning of the list to match the expected HBase + // newest to oldest sort order (which TxTableState relies on + // with the Result.getLatestColumnValue() calls). + singleTimeCells.addFirst(cell); + } while (++i < nCells && cells.get(i).getTimestamp() == writePtr); + } while (i < nCells && cells.get(i).getTimestamp() <= readPtr); + + // Generate point delete markers for the prior row deletion of the old index value. + // The write timestamp is the next timestamp, not the current timestamp, + // as the earliest cells are the current values for the row (and we don't + // want to delete the current row). + if (oldResult != null) { + TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), writePtr, m, emptyColRef, oldResult); + generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state); + } + // Generate point delete markers for the new index value. + // If our time batch doesn't have Puts (i.e. we have only Deletes), then do not + // generate deletes. We would have generated the delete above based on the state + // of the previous row. The delete markers do not give us the state we need to + // delete. 
+ if (hasPuts) { + newResult = Result.create(singleTimeCells); + // First row may represent the current state which we don't want to delete + if (writePtr > readPtr) { + TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), writePtr, m, emptyColRef, newResult); + generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state); + } + oldResult = newResult; + } else { + oldResult = null; + } + } while (i < nCells); + } + } + } + + private void generateDeletes(PhoenixIndexMetaData indexMetaData, + Collection<Pair<Mutation, byte[]>> indexUpdates, + byte[] attribValue, TxTableState state) throws IOException { + Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData); + for (IndexUpdate delete : deletes) { + if (delete.isValid()) { + delete.getUpdate().setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue); + indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName())); + } + } + } + + boolean generatePuts( + PhoenixIndexMetaData indexMetaData, + Collection<Pair<Mutation, byte[]>> indexUpdates, + TxTableState state) + throws IOException { + state.applyMutation(); + Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData); + boolean validPut = false; + for (IndexUpdate put : puts) { + if (put.isValid()) { + indexUpdates.add(new Pair<Mutation, byte[]>(put.getUpdate(),put.getTableName())); + validPut = true; + } + } + return validPut; + } + + + private static class TxTableState implements TableState { + private final Mutation mutation; + private final long currentTimestamp; + private final RegionCoprocessorEnvironment env; + private final Map<String, byte[]> attributes; + private final List<KeyValue> pendingUpdates; + private final Set<ColumnReference> indexedColumns; + private final Map<ColumnReference, ImmutableBytesWritable> valueMap; + + private TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> 
attributes, long currentTimestamp, Mutation mutation) { + this.env = env; + this.currentTimestamp = currentTimestamp; + this.indexedColumns = indexedColumns; + this.attributes = attributes; + this.mutation = mutation; + int estimatedSize = indexedColumns.size(); + this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize); + this.pendingUpdates = Lists.newArrayListWithExpectedSize(estimatedSize); + try { + CellScanner scanner = mutation.cellScanner(); + while (scanner.advance()) { + Cell cell = scanner.current(); + pendingUpdates.add(KeyValueUtil.ensureKeyValue(cell)); + } + } catch (IOException e) { + throw new RuntimeException(e); // Impossible + } + } + + public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation m, ColumnReference emptyColRef, Result r) { + this(env, indexedColumns, attributes, currentTimestamp, m); + + for (ColumnReference ref : indexedColumns) { + Cell cell = r.getColumnLatestCell(ref.getFamily(), ref.getQualifier()); + if (cell != null) { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + valueMap.put(ref, ptr); + } + } + } + + @Override + public RegionCoprocessorEnvironment getEnvironment() { + return env; + } + + @Override + public long getCurrentTimestamp() { + return currentTimestamp; + } + + @Override + public Map<String, byte[]> getUpdateAttributes() { + return attributes; + } + + @Override + public byte[] getCurrentRowKey() { + return mutation.getRow(); + } + + @Override + public List<? extends IndexedColumnGroup> getIndexColumnHints() { + return Collections.emptyList(); + } + + public void applyMutation() { --- End diff -- Can this be private too?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---