[ 
https://issues.apache.org/jira/browse/PHOENIX-1674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15023376#comment-15023376
 ] 

ASF GitHub Bot commented on PHOENIX-1674:
-----------------------------------------

Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/133#discussion_r45678756
  
    --- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
 ---
    @@ -0,0 +1,497 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
    + * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the "License"); you may 
not use this file except in compliance with the
    + * License. You may obtain a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0 Unless required by
    + * applicable law or agreed to in writing, software distributed under the 
License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied. See the License for the specific language
    + * governing permissions and limitations under the License.
    + */
    +package org.apache.phoenix.index;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.CellScanner;
    +import org.apache.hadoop.hbase.CoprocessorEnvironment;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.KeyValueUtil;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.HTableInterface;
    +import org.apache.hadoop.hbase.client.Mutation;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.ResultScanner;
    +import org.apache.hadoop.hbase.client.Scan;
    +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    +import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.hadoop.hbase.util.Pair;
    +import org.apache.htrace.Span;
    +import org.apache.htrace.Trace;
    +import org.apache.htrace.TraceScope;
    +import org.apache.phoenix.compile.ScanRanges;
    +import org.apache.phoenix.hbase.index.MultiMutation;
    +import org.apache.phoenix.hbase.index.ValueGetter;
    +import org.apache.phoenix.hbase.index.covered.IndexUpdate;
    +import org.apache.phoenix.hbase.index.covered.TableState;
    +import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
    +import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
    +import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
    +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
    +import org.apache.phoenix.hbase.index.write.IndexWriter;
    +import org.apache.phoenix.query.KeyRange;
    +import org.apache.phoenix.query.QueryConstants;
    +import org.apache.phoenix.schema.types.PVarbinary;
    +import org.apache.phoenix.trace.TracingUtils;
    +import org.apache.phoenix.trace.util.NullSpan;
    +import org.apache.phoenix.util.ScanUtil;
    +import org.apache.phoenix.util.SchemaUtil;
    +import org.apache.phoenix.util.ServerUtil;
    +
    +import co.cask.tephra.Transaction;
    +import co.cask.tephra.Transaction.VisibilityLevel;
    +import co.cask.tephra.TxConstants;
    +import co.cask.tephra.hbase11.TransactionAwareHTable;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import com.google.common.primitives.Longs;
    +
    +/**
    + * Do all the work of managing index updates for a transactional table 
from a single coprocessor. Since the transaction
    + * manager essentially time orders writes through conflict detection, the 
logic to maintain a secondary index is quite a
    + * bit simpler than the non-transactional case. For example, there's no 
need to muck with the WAL, as failure scenarios
    + * are handled by aborting the transaction.
    + */
    +public class PhoenixTransactionalIndexer extends BaseRegionObserver {
    +
    +    private static final Log LOG = 
LogFactory.getLog(PhoenixTransactionalIndexer.class);
    +
    +    private PhoenixIndexCodec codec;
    +    private IndexWriter writer;
    +    private boolean stopped;
    +
    +    /**
    +     * Creates and initializes the index codec and the index writer for this coprocessor.
    +     *
    +     * @param e the coprocessor environment; it is cast to {@link RegionCoprocessorEnvironment}
    +     * @throws IOException declared by the overridden method
    +     */
    +    @Override
    +    public void start(CoprocessorEnvironment e) throws IOException {
    +        final RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment) e;
    +        // The hosting server's name becomes the prefix of the writer's name.
    +        final String writerName =
    +                regionEnv.getRegionServerServices().getServerName().getServerName() + "-tx-index-writer";
    +
    +        this.codec = new PhoenixIndexCodec();
    +        this.codec.initialize(regionEnv);
    +
    +        // The writer that sends the generated index updates.
    +        this.writer = new IndexWriter(regionEnv, writerName);
    +    }
    +
    +    /**
    +     * Stops the index writer the first time it is called; later calls do nothing.
    +     *
    +     * @param e the coprocessor environment (not used here)
    +     * @throws IOException declared by the overridden method
    +     */
    +    @Override
    +    public void stop(CoprocessorEnvironment e) throws IOException {
    +        if (!this.stopped) {
    +            // Set the flag before stopping the writer so a repeated call is a no-op.
    +            this.stopped = true;
    +            this.writer.stop("TxIndexer is being stopped");
    +        }
    +    }
    +
    +    /**
    +     * Exposes the operations of a mini batch as a read-only {@link Iterator}.
    +     *
    +     * @param miniBatchOp the batch whose operations are visited in index order, starting at 0
    +     * @return an iterator over the batch; {@code remove()} is not supported
    +     */
    +    private static Iterator<Mutation> getMutationIterator(final MiniBatchOperationInProgress<Mutation> miniBatchOp) {
    +        return new Iterator<Mutation>() {
    +            // Index of the next operation to hand out.
    +            private int i = 0;
    +
    +            @Override
    +            public boolean hasNext() {
    +                return i < miniBatchOp.size();
    +            }
    +
    +            @Override
    +            public Mutation next() {
    +                // Honor the Iterator contract instead of indexing past the end of the batch.
    +                if (!hasNext()) {
    +                    throw new java.util.NoSuchElementException();
    +                }
    +                return miniBatchOp.getOperation(i++);
    +            }
    +
    +            @Override
    +            public void remove() {
    +                throw new UnsupportedOperationException();
    +            }
    +
    +        };
    +    }
    +    /**
    +     * Builds the index updates for the mutations of the batch and passes them to the index writer.
    +     * <p>
    +     * The first operation of the batch decides whether indexing is enabled (otherwise the call is
    +     * delegated to the superclass) and supplies the attributes used to build the index metadata.
    +     *
    +     * @param c observer context giving access to the region coprocessor environment
    +     * @param miniBatchOp the batch of mutations about to be applied
    +     * @throws IOException declared by the overridden method; failures while building or writing
    +     *         the updates are logged and passed to {@code ServerUtil.throwIOException}
    +     */
    +    @Override
    +    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
    +            MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
    +
    +        final Mutation firstMutation = miniBatchOp.getOperation(0);
    +        if (!codec.isEnabled(firstMutation)) {
    +            super.preBatchMutate(c, miniBatchOp);
    +            return;
    +        }
    +
    +        final Map<String,byte[]> attributes = firstMutation.getAttributesMap();
    +        final PhoenixIndexMetaData metaData = new PhoenixIndexMetaData(c.getEnvironment(), attributes);
    +        final byte[] rollbackAttribute = firstMutation.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY);
    +        // Declared outside the try block so the failure message can include what was built so far.
    +        Collection<Pair<Mutation, byte[]>> pendingUpdates = null;
    +        try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
    +            // Fall back to the null span so the annotation calls below need no null checks.
    +            final Span scopeSpan = scope.getSpan();
    +            final Span span = (scopeSpan != null) ? scopeSpan : NullSpan.INSTANCE;
    +
    +            // Build the index updates for every element of the batch.
    +            pendingUpdates = getIndexUpdates(c.getEnvironment(), metaData, getMutationIterator(miniBatchOp), rollbackAttribute);
    +
    +            span.addTimelineAnnotation("Built index updates, doing preStep");
    +            TracingUtils.addAnnotation(span, "index update count", pendingUpdates.size());
    +
    +            // Only call the writer when there is at least one index update to send.
    +            if (!pendingUpdates.isEmpty()) {
    +                this.writer.write(pendingUpdates);
    +            }
    +        } catch (Throwable t) {
    +            final String msg = "Failed to update index with entries:" + pendingUpdates;
    +            LOG.error(msg, t);
    +            ServerUtil.throwIOException(msg, t);
    +        }
    +    }
    +
    +    /**
    +     * Builds the index updates for a batch of data table mutations.
    +     * <p>
    +     * The mutations are grouped by row. When the index maintainers report at least one column, the
    +     * affected rows are scanned through a {@link TransactionAwareHTable} started with the transaction
    +     * taken from {@code indexMetaData}. The scanner and the grouped mutations are then handed to
    +     * {@code processRollback} or {@code processMutation}.
    +     * <p>
    +     * Side effect: for a rollback that performs a scan, the visibility of the transaction is set to
    +     * {@code SNAPSHOT_ALL}.
    +     *
    +     * @param env region coprocessor environment used to look up the data table
    +     * @param indexMetaData supplies the transaction and the index maintainers
    +     * @param mutationIterator the mutations of the batch
    +     * @param txRollbackAttribute rollback attribute value; a non-null value selects rollback processing
    +     * @return the generated index mutations, each paired with a {@code byte[]} table name
    +     * @throws IOException if scanning or generating the updates fails
    +     */
    +    private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, byte[] txRollbackAttribute) throws IOException {
    +        ResultScanner currentScanner = null;
    +        TransactionAwareHTable txTable = null;
    +        // Collect up all mutations in batch
    +        Map<ImmutableBytesPtr, MultiMutation> mutations =
    +                new HashMap<ImmutableBytesPtr, MultiMutation>();
    +        while(mutationIterator.hasNext()) {
    +            Mutation m = mutationIterator.next();
    +            // add the mutation to the batch set
    +            ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
    +            MultiMutation stored = mutations.get(row);
    +            // we haven't seen this row before, so add it
    +            if (stored == null) {
    +                stored = new MultiMutation(row);
    +                mutations.put(row, stored);
    +            }
    +            stored.addAll(m);
    +        }
    +
    +        // Collect the set of mutable ColumnReferences so that we can first
    +        // run a scan to get the current state. We'll need this to delete
    +        // the existing index rows.
    +        Transaction tx = indexMetaData.getTransaction();
    +        assert(tx != null);
    +        List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers();
    +        Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10);
    +        for (IndexMaintainer indexMaintainer : indexMaintainers) {
    +            // For transactional tables, we use an index maintainer
    +            // to aid in rollback if there's a KeyValue column in the index. The alternative would be
    +            // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
    +            // client side.
    +            mutableColumns.addAll(indexMaintainer.getAllColumns());
    +        }
    +
    +        boolean isRollback = txRollbackAttribute!=null;
    +        Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size());
    +        try {
    +            if (!mutableColumns.isEmpty()) {
    +                // One point key range per mutated row, used below to build the skip scan filter.
    +                List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size());
    +                for (ImmutableBytesPtr ptr : mutations.keySet()) {
    +                    keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
    +                }
    +                Scan scan = new Scan();
    +                // Project all mutable columns
    +                for (ColumnReference ref : mutableColumns) {
    +                    scan.addColumn(ref.getFamily(), ref.getQualifier());
    +                }
    +                // Project empty key value column
    +                scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
    +                ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
    +                scanRanges.initializeScan(scan);
    +                scan.setFilter(scanRanges.getSkipScanFilter());
    +                TableName tableName = env.getRegion().getRegionInfo().getTable();
    +                HTableInterface htable = env.getTable(tableName);
    +                txTable = new TransactionAwareHTable(htable);
    +                txTable.startTx(tx);
    +                // For rollback, we need to see all versions, including
    +                // the last committed version as there may be multiple
    +                // checkpointed versions.
    +                if (isRollback) {
    +                    tx.setVisibility(VisibilityLevel.SNAPSHOT_ALL);
    +                }
    +                currentScanner = txTable.getScanner(scan);
    +            }
    +            if (isRollback) {
    +                processRollback(env, indexMetaData, txRollbackAttribute, currentScanner, mutations, tx, mutableColumns, indexUpdates);
    +            } else {
    +                processMutation(env, indexMetaData, txRollbackAttribute, currentScanner, mutations, tx, mutableColumns, indexUpdates);
    +            }
    +        } finally {
    +            // Close the scanner explicitly: if processing throws before the scan is exhausted,
    +            // nothing else releases it. The nested finally still closes the table if this fails.
    +            try {
    +                if (currentScanner != null) currentScanner.close();
    +            } finally {
    +                if (txTable != null) txTable.close();
    +            }
    +        }
    +
    +        return indexUpdates;
    +    }
    +
    +    private void processMutation(RegionCoprocessorEnvironment env,
    +            PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
    +            ResultScanner scanner,
    +            Map<ImmutableBytesPtr, MultiMutation> mutations, Transaction 
tx,
    +            Set<ColumnReference> mutableColumns,
    +            Collection<Pair<Mutation, byte[]>> indexUpdates) throws 
IOException {
    +        if (scanner != null) {
    +            Result result;
    +            ColumnReference emptyColRef = new 
ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(),
 QueryConstants.EMPTY_COLUMN_BYTES);
    +            // Process existing data table rows by removing the old index 
row and adding the new index row
    +            while ((result = scanner.next()) != null) {
    +                Mutation m = mutations.remove(new 
ImmutableBytesPtr(result.getRow()));
    +                TxTableState state = new TxTableState(env, mutableColumns, 
indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result);
    +                generateDeletes(indexMetaData, indexUpdates, 
txRollbackAttribute, state);
    +                generatePuts(indexMetaData, indexUpdates, state);
    +            }
    +        }
    +        // Process new data table by adding new index rows
    +        for (Mutation m : mutations.values()) {
    +            TxTableState state = new TxTableState(env, mutableColumns, 
indexMetaData.getAttributes(), tx.getWritePointer(), m);
    +            generatePuts(indexMetaData, indexUpdates, state);
    +        }
    +    }
    +
    +    /**
    +     * Generates the index deletes needed to roll back the index rows written by the transaction.
    +     * <p>
    +     * For each scanned row, the cells are sorted by timestamp, type, family and qualifier and then
    +     * walked in "time batches" from oldest to newest. Every cell with a timestamp at or below the
    +     * transaction's read pointer falls into the first batch. For each batch, deletes are generated
    +     * against the previous batch's state (if it had Puts) and, when the batch has Puts and was written
    +     * after the read pointer, against the batch's own state.
    +     *
    +     * @param env region coprocessor environment passed on to the table state
    +     * @param indexMetaData supplies the index maintainers and attributes
    +     * @param txRollbackAttribute attribute value forwarded to {@code generateDeletes}
    +     * @param scanner scanner over the row versions, or null when no scan was done (nothing is generated)
    +     * @param mutations batch mutations keyed by row; scanned rows are removed from this map
    +     * @param tx the transaction whose read pointer separates existing state from new writes
    +     * @param mutableColumns the columns passed on to the table state
    +     * @param indexUpdates collection the generated index deletes are added to
    +     * @throws IOException if reading the scanner or generating deletes fails
    +     */
    +    private void processRollback(RegionCoprocessorEnvironment env,
    +            PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
    +            ResultScanner scanner,
    +            Map<ImmutableBytesPtr, MultiMutation> mutations, Transaction tx,
    +            Set<ColumnReference> mutableColumns,
    +            Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException {
    +        if (scanner != null) {
    +            Result result;
    +            // Loop through last committed row state plus all new rows associated with current transaction
    +            // to generate point delete markers for all index rows that were added. We don't have Tephra
    +            // manage index rows in change sets because we don't want to pay the additional
    +            // memory cost and do not need to do conflict detection on index rows.
    +            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
    +            while ((result = scanner.next()) != null) {
    +                Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
    +                // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest
    +                // (as if we're "replaying" them in time order).
    +                List<Cell> cells = result.listCells();
    +                Collections.sort(cells, new Comparator<Cell>() {
    +
    +                    @Override
    +                    public int compare(Cell o1, Cell o2) {
    +                        int c = Longs.compare(o1.getTimestamp(), o2.getTimestamp());
    +                        if (c != 0) return c;
    +                        c = o1.getTypeByte() - o2.getTypeByte();
    +                        if (c != 0) return c;
    +                        // Compare o1 against o2 (not against itself) so family and qualifier
    +                        // actually take part in the ordering.
    +                        c = Bytes.compareTo(o1.getFamilyArray(), o1.getFamilyOffset(), o1.getFamilyLength(), o2.getFamilyArray(), o2.getFamilyOffset(), o2.getFamilyLength());
    +                        if (c != 0) return c;
    +                        return Bytes.compareTo(o1.getQualifierArray(), o1.getQualifierOffset(), o1.getQualifierLength(), o2.getQualifierArray(), o2.getQualifierOffset(), o2.getQualifierLength());
    +                    }
    +
    +                });
    +                int i = 0;
    +                int nCells = cells.size();
    +                Result oldResult = null, newResult;
    +                long readPtr = tx.getReadPointer();
    +                // Each pass of this loop handles one time batch.
    +                do {
    +                    boolean hasPuts = false;
    +                    LinkedList<Cell> singleTimeCells = Lists.newLinkedList();
    +                    long writePtr;
    +                    // Keep absorbing timestamp groups while the next cell is at or below the read pointer.
    +                    do {
    +                        // First cell of a timestamp group; the sort puts the lowest type byte first.
    +                        // NOTE(review): this relies on Put having a lower type code than the
    +                        // delete types - confirm against KeyValue.Type.
    +                        Cell cell = cells.get(i);
    +                        hasPuts |= cell.getTypeByte() == KeyValue.Type.Put.getCode();
    +                        writePtr = cell.getTimestamp();
    +                        do {
    +                            // Add at the beginning of the list to match the expected HBase
    +                            // newest to oldest sort order (which TxTableState relies on
    +                            // with the Result.getLatestColumnValue() calls).
    +                            // Add the cell at the current index, so every cell of the group is
    +                            // collected rather than the group's first cell repeatedly.
    +                            // NOTE(review): addFirst also reverses the cf/cq order within one
    +                            // timestamp - confirm that Result lookups tolerate this.
    +                            singleTimeCells.addFirst(cells.get(i));
    +                        } while (++i < nCells && cells.get(i).getTimestamp() == writePtr);
    +                    } while (i < nCells && cells.get(i).getTimestamp() <= readPtr);
    +
    +                    // Generate point delete markers for the prior row deletion of the old index value.
    +                    // The write timestamp is the next timestamp, not the current timestamp,
    +                    // as the earliest cells are the current values for the row (and we don't
    +                    // want to delete the current row).
    +                    if (oldResult != null) {
    +                        TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), writePtr, m, emptyColRef, oldResult);
    +                        generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
    +                    }
    +                    // Generate point delete markers for the new index value.
    +                    // If our time batch doesn't have Puts (i.e. we have only Deletes), then do not
    +                    // generate deletes. We would have generated the delete above based on the state
    +                    // of the previous row. The delete markers do not give us the state we need to
    +                    // delete.
    +                    if (hasPuts) {
    +                        newResult = Result.create(singleTimeCells);
    +                        // First row may represent the current state which we don't want to delete
    +                        if (writePtr > readPtr) {
    +                            TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), writePtr, m, emptyColRef, newResult);
    +                            generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
    +                        }
    +                        oldResult = newResult;
    +                    } else {
    +                        oldResult = null;
    +                    }
    +                } while (i < nCells);
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Asks the codec for the index deletes of the given table state and queues every valid one.
    +     * <p>
    +     * Each queued delete is tagged with the rollback attribute and paired with the table name
    +     * reported by the index update.
    +     *
    +     * @param indexMetaData index metadata handed to the codec
    +     * @param indexUpdates collection the valid deletes are added to
    +     * @param attribValue value stored under {@code TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY} on each delete
    +     * @param state the table state the deletes are computed from
    +     * @throws IOException if the codec fails to produce the deletes
    +     */
    +    private void generateDeletes(PhoenixIndexMetaData indexMetaData,
    +            Collection<Pair<Mutation, byte[]>> indexUpdates,
    +            byte[] attribValue, TxTableState state) throws IOException {
    +        for (IndexUpdate candidate : codec.getIndexDeletes(state, indexMetaData)) {
    +            // Invalid updates are skipped.
    +            if (!candidate.isValid()) {
    +                continue;
    +            }
    +            candidate.getUpdate().setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue);
    +            indexUpdates.add(new Pair<Mutation, byte[]>(candidate.getUpdate(), candidate.getTableName()));
    +        }
    +    }
    +
    +    boolean generatePuts(
    --- End diff --
    
    Make private if possible


> Snapshot isolation transaction support through Tephra
> -----------------------------------------------------
>
>                 Key: PHOENIX-1674
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-1674
>             Project: Phoenix
>          Issue Type: Improvement
>            Reporter: James Taylor
>              Labels: SFDC
>
> Tephra (http://tephra.io/ and https://github.com/caskdata/tephra) is one 
> option for getting transaction support in Phoenix. Let's use this JIRA to 
> discuss the way in which this could be integrated along with the pros and 
> cons.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to