Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.0 39cebc2ca -> 643436459
PHOENIX-3128 Remove extraneous operations during upsert with local immutable index Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5d3552fd Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5d3552fd Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5d3552fd Branch: refs/heads/4.x-HBase-1.0 Commit: 5d3552fd8bff8b7d0dcc4a265eda9ea955cad4c5 Parents: 39cebc2 Author: James Taylor <jamestay...@apache.org> Authored: Tue Aug 2 12:03:33 2016 -0700 Committer: James Taylor <jamestay...@apache.org> Committed: Tue Aug 2 16:59:24 2016 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/DistinctPrefixFilterIT.java | 11 ++- .../apache/phoenix/end2end/index/IndexIT.java | 39 +++++++++ .../compile/PostLocalIndexDDLCompiler.java | 15 ++-- .../apache/phoenix/execute/MutationState.java | 11 ++- .../hbase/index/covered/IndexMetaData.java | 9 +- .../hbase/index/covered/NonTxIndexBuilder.java | 6 +- .../phoenix/index/PhoenixIndexMetaData.java | 12 ++- .../index/PhoenixTransactionalIndexer.java | 88 +++++++++++++------- .../org/apache/phoenix/schema/PTableImpl.java | 13 ++- .../apache/phoenix/util/TransactionUtil.java | 7 ++ 10 files changed, 164 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/5d3552fd/phoenix-core/src/it/java/org/apache/phoenix/end2end/DistinctPrefixFilterIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DistinctPrefixFilterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DistinctPrefixFilterIT.java index 203d51e..9d31070 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DistinctPrefixFilterIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DistinctPrefixFilterIT.java @@ -88,6 +88,15 @@ public class DistinctPrefixFilterIT extends BaseHBaseManagedTimeTableReuseIT { insertPrefixV("3", "1"); insertPrefixV("3", "2"); insertPrefixV("3", "3"); + conn.commit(); + ResultSet rs; + rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ count(*) from " + testTableV); + assertTrue(rs.next()); + long count1 = rs.getLong(1); + rs = conn.createStatement().executeQuery("select count(*) from " + testTableV + "_idx"); + assertTrue(rs.next()); + long count2 = rs.getLong(1); + assertEquals(count1,count2); multiply(); multiply(); @@ -258,7 +267,7 @@ public class DistinctPrefixFilterIT extends BaseHBaseManagedTimeTableReuseIT { testCommonDistinct(testTableF); testCommonDistinct(testTableV); -} + } private void testCommonDistinct(String testTable) throws Exception { testSkipRange("SELECT %s DISTINCT prefix1 FROM " + testTable, 4); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5d3552fd/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java index 35a0aad..df45ecb 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.IOException; import java.math.BigDecimal; import java.sql.Connection; import java.sql.Date; @@ -39,7 +40,15 @@ import java.util.Collection; import java.util.Map; import java.util.Properties; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.coprocessor.generated.PTableProtos.PTableType; @@ -56,6 +65,7 @@ import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; @@ -63,6 +73,7 @@ import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.StringUtil; import org.apache.phoenix.util.TestUtil; +import org.apache.phoenix.util.TransactionUtil; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -871,6 +882,7 @@ public class IndexIT extends BaseHBaseManagedTimeTableReuseIT { conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, \"V1\" VARCHAR, \"v2\" VARCHAR)"+tableDDLOptions); query = "SELECT * FROM "+fullTableName; rs = conn.createStatement().executeQuery(query); + long ts = conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null,fullTableName)).getTimeStamp(); assertFalse(rs.next()); conn.createStatement().execute( "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + "(\"v2\") INCLUDE (\"V1\")"); @@ -941,9 +953,36 @@ public class IndexIT extends BaseHBaseManagedTimeTableReuseIT { assertEquals("2",rs.getString(5)); assertEquals("2",rs.getString("v2")); assertFalse(rs.next()); + + assertNoIndexDeletes(conn, ts, fullIndexName); } } + private void assertNoIndexDeletes(Connection conn, long minTimestamp, String fullIndexName) throws IOException, SQLException { + if (!this.mutable) { + PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); + PTable index = pconn.getTable(new PTableKey(null, fullIndexName)); + byte[] physicalIndexTable = index.getPhysicalName().getBytes(); + try (HTableInterface hIndex = pconn.getQueryServices().getTable(physicalIndexTable)) { + Scan scan = new Scan(); + scan.setRaw(true); + if (this.transactional) { + minTimestamp = TransactionUtil.convertToNanoseconds(minTimestamp); + } + scan.setTimeRange(minTimestamp, HConstants.LATEST_TIMESTAMP); + ResultScanner scanner = hIndex.getScanner(scan); + Result result; + while ((result = scanner.next()) != null) { + CellScanner cellScanner = result.cellScanner(); + while (cellScanner.advance()) { + Cell current = cellScanner.current(); + assertEquals (KeyValue.Type.Put.getCode(), current.getTypeByte()); + } + } + }; + } + } + @Test public void testInFilterOnIndexedTable() throws Exception { String tableName = "TBL_" + generateRandomString(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5d3552fd/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java index 079ff5c..81dbe0d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java @@ -17,7 +17,6 @@ */ package org.apache.phoenix.compile; -import java.io.IOException; import java.sql.SQLException; import java.util.List; @@ -34,9 +33,9 @@ import org.apache.phoenix.jdbc.PhoenixStatement.Operation; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.ByteUtil; -import org.apache.phoenix.util.ScanUtil; import com.google.common.collect.Lists; @@ -93,10 +92,14 @@ public class PostLocalIndexDDLCompiler { @Override public MutationState execute() throws SQLException { connection.getMutationState().commitDDLFence(dataTable); - Cell kv = plan.iterator().next().getValue(0); - ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()); - // A single Cell will be returned with the count(*) - we decode that here - long rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault()); + Tuple tuple = plan.iterator().next(); + long rowCount = 0; + if (tuple != null) { + Cell kv = tuple.getValue(0); + ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()); + // A single Cell will be returned with the count(*) - we decode that here + rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault()); + } // The contract is to return a MutationState that contains the number of rows modified. In this // case, it's the number of rows in the data table which corresponds to the number of index // rows that were added. http://git-wip-us.apache.org/repos/asf/phoenix/blob/5d3552fd/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index bf47ab6..d44b679 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -553,12 +553,15 @@ public class MutationState implements SQLCloseable { return ptr; } - private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, final long timestamp, boolean includeMutableIndexes, final boolean sendAll) { + private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, + final long timestamp, boolean includeAllIndexes, final boolean sendAll) { final PTable table = tableRef.getTable(); final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism - (table.isImmutableRows() || includeMutableIndexes) ? - IndexMaintainer.nonDisabledIndexIterator(table.getIndexes().iterator()) : - Iterators.<PTable>emptyIterator(); + includeAllIndexes || table.isWALDisabled() ? // TODO: remove check for isWALDisabled once PHOENIX-3137 is fixed. + IndexMaintainer.nonDisabledIndexIterator(table.getIndexes().iterator()) : + table.isImmutableRows() ? + IndexMaintainer.enabledGlobalIndexIterator(table.getIndexes().iterator()) : + Iterators.<PTable>emptyIterator(); final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size()); final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null; generateMutations(tableRef, timestamp, values, mutationList, mutationsPertainingToIndex); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5d3552fd/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java index ee25a40..5420013 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java @@ -18,5 +18,12 @@ package org.apache.phoenix.hbase.index.covered; public interface IndexMetaData { - public static final IndexMetaData NULL_INDEX_META_DATA = new IndexMetaData() {}; + public static final IndexMetaData NULL_INDEX_META_DATA = new IndexMetaData() { + + @Override + public boolean isImmutableRows() { + return false; + }}; + + public boolean isImmutableRows(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5d3552fd/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java index 27af40f..10d164b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java @@ -98,7 +98,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { Collection<Batch> batches = createTimestampBatchesFromMutation(m); // go through each batch of keyvalues and build separate index entries for each - boolean cleanupCurrentState = true; + boolean cleanupCurrentState = !indexMetaData.isImmutableRows(); for (Batch batch : batches) { /* * We have to split the work between the cleanup and the update for each group because when we update the @@ -215,7 +215,9 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { // determine if we need to make any cleanup given the pending update. long batchTs = batch.getTimestamp(); state.setPendingUpdates(batch.getKvs()); - addCleanupForCurrentBatch(updateMap, batchTs, state, indexMetaData); + if (!indexMetaData.isImmutableRows()) { + addCleanupForCurrentBatch(updateMap, batchTs, state, indexMetaData); + } // A.2 do a single pass first for the updates to the current state state.applyPendingUpdates(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5d3552fd/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java index 7a67b9c..2679f1c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java @@ -36,13 +36,13 @@ import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.ServerUtil; - import org.apache.tephra.Transaction; public class PhoenixIndexMetaData implements IndexMetaData { private final Map<String, byte[]> attributes; private final IndexMetaDataCache indexMetaDataCache; private final boolean ignoreNewerMutations; + private final boolean isImmutable; private static IndexMetaDataCache getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException { if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; } @@ -87,6 +87,11 @@ public class PhoenixIndexMetaData implements IndexMetaData { public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, Map<String,byte[]> attributes) throws IOException { this.indexMetaDataCache = getIndexMetaData(env, attributes); + boolean isImmutable = true; + for (IndexMaintainer maintainer : indexMetaDataCache.getIndexMaintainers()) { + isImmutable &= maintainer.isImmutableRows(); + } + this.isImmutable = isImmutable; this.attributes = attributes; this.ignoreNewerMutations = attributes.get(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null; } @@ -106,4 +111,9 @@ public class PhoenixIndexMetaData implements IndexMetaData { public boolean ignoreNewerMutations() { return ignoreNewerMutations; } + + @Override + public boolean isImmutableRows() { + return isImmutable; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5d3552fd/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index 331d00b..e499e29 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -67,7 +67,6 @@ import org.apache.phoenix.hbase.index.covered.TableState; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; -import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.query.KeyRange; @@ -75,10 +74,10 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; -import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.TransactionUtil; import org.apache.tephra.Transaction; import org.apache.tephra.Transaction.VisibilityLevel; import org.apache.tephra.TxConstants; @@ -236,48 +235,68 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { } } + private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) { + MultiMutation stored = mutations.get(row); + // we haven't seen this row before, so add it + if (stored == null) { + stored = new MultiMutation(row); + mutations.put(row, stored); + } + stored.addAll(m); + } + private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, byte[] txRollbackAttribute) throws IOException { + Transaction tx = indexMetaData.getTransaction(); + if (tx == null) { + throw new NullPointerException("Expected to find transaction in metadata for " + env.getRegionInfo().getTable().getNameAsString()); + } + boolean isRollback = txRollbackAttribute!=null; + boolean isImmutable = indexMetaData.isImmutableRows(); ResultScanner currentScanner = null; TransactionAwareHTable txTable = null; // Collect up all mutations in batch Map<ImmutableBytesPtr, MultiMutation> mutations = new HashMap<ImmutableBytesPtr, MultiMutation>(); + Map<ImmutableBytesPtr, MultiMutation> findPriorValueMutations; + if (isImmutable && !isRollback) { + findPriorValueMutations = new HashMap<ImmutableBytesPtr, MultiMutation>(); + } else { + findPriorValueMutations = mutations; + } while(mutationIterator.hasNext()) { Mutation m = mutationIterator.next(); // add the mutation to the batch set ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow()); - MultiMutation stored = mutations.get(row); - // we haven't seen this row before, so add it - if (stored == null) { - stored = new MultiMutation(row); - mutations.put(row, stored); + if (mutations != findPriorValueMutations && isDeleteMutation(m)) { + addMutation(findPriorValueMutations, row, m); } - stored.addAll(m); + addMutation(mutations, row, m); } // Collect the set of mutable ColumnReferences so that we can first // run a scan to get the current state. We'll need this to delete // the existing index rows. - Transaction tx = indexMetaData.getTransaction(); - if (tx == null) { - throw new NullPointerException("Expected to find transaction in metadata for " + env.getRegionInfo().getTable().getNameAsString()); - } List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers(); - Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10); + int estimatedSize = indexMaintainers.size() * 10; + Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(estimatedSize); for (IndexMaintainer indexMaintainer : indexMaintainers) { // For transactional tables, we use an index maintainer // to aid in rollback if there's a KeyValue column in the index. The alternative would be // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the // client side. - mutableColumns.addAll(indexMaintainer.getAllColumns()); + Set<ColumnReference> allColumns = indexMaintainer.getAllColumns(); + mutableColumns.addAll(allColumns); } - boolean isRollback = txRollbackAttribute!=null; Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size()); try { - if (!mutableColumns.isEmpty()) { + // Track if we have row keys with Delete mutations (or Puts that are + // Tephra's Delete marker). If there are none, we don't need to do the scan for + // prior versions, if there are, we do. Since rollbacks always have delete mutations, + // this logic will work there too. + if (!findPriorValueMutations.isEmpty()) { List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size()); - for (ImmutableBytesPtr ptr : mutations.keySet()) { + for (ImmutableBytesPtr ptr : findPriorValueMutations.keySet()) { keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary())); } Scan scan = new Scan(); @@ -305,9 +324,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { currentScanner = txTable.getScanner(scan); } if (isRollback) { - processRollback(env, indexMetaData, txRollbackAttribute, currentScanner, mutations, tx, mutableColumns, indexUpdates); + processRollback(env, indexMetaData, txRollbackAttribute, currentScanner, tx, mutableColumns, indexUpdates, mutations); } else { - processMutation(env, indexMetaData, txRollbackAttribute, currentScanner, mutations, tx, mutableColumns, indexUpdates); + processMutation(env, indexMetaData, txRollbackAttribute, currentScanner, tx, mutableColumns, indexUpdates, mutations, findPriorValueMutations); } } finally { if (txTable != null) txTable.close(); @@ -316,26 +335,39 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { return indexUpdates; } + private static boolean isDeleteMutation(Mutation m) { + for (Map.Entry<byte[],List<Cell>> cellMap : m.getFamilyCellMap().entrySet()) { + for (Cell cell : cellMap.getValue()) { + if (cell.getTypeByte() != KeyValue.Type.Put.getCode() || TransactionUtil.isDelete(cell)) { + return true; + } + } + } + return false; + } + private void processMutation(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute, ResultScanner scanner, - Map<ImmutableBytesPtr, MultiMutation> mutations, Transaction tx, - Set<ColumnReference> mutableColumns, - Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException { + Transaction tx, + Set<ColumnReference> upsertColumns, + Collection<Pair<Mutation, byte[]>> indexUpdates, + Map<ImmutableBytesPtr, MultiMutation> mutations, + Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue) throws IOException { if (scanner != null) { Result result; ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); // Process existing data table rows by removing the old index row and adding the new index row while ((result = scanner.next()) != null) { - Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow())); - TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result); + Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow())); + TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result); generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state); generatePuts(indexMetaData, indexUpdates, state); } } // Process new data table by adding new index rows for (Mutation m : mutations.values()) { - TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m); + TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m); generatePuts(indexMetaData, indexUpdates, state); } } @@ -343,9 +375,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { private void processRollback(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute, ResultScanner scanner, - Map<ImmutableBytesPtr, MultiMutation> mutations, Transaction tx, - Set<ColumnReference> mutableColumns, - Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException { + Transaction tx, Set<ColumnReference> mutableColumns, + Collection<Pair<Mutation, byte[]>> indexUpdates, + Map<ImmutableBytesPtr, MultiMutation> mutations) throws IOException { if (scanner != null) { Result result; // Loop through last committed row state plus all new rows associated with current transaction http://git-wip-us.apache.org/repos/asf/phoenix/blob/5d3552fd/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index ec09992..847979a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -834,12 +834,17 @@ public class PTableImpl implements PTable { // we're using the Tephra column family delete marker here to prevent the translation // of deletes to puts by the Tephra's TransactionProcessor if (PTableImpl.this.isTransactional()) { - Put delete = new Put(key); - for (PColumnFamily colFamily : families) { - delete.add(colFamily.getName().getBytes(), TxConstants.FAMILY_DELETE_QUALIFIER, ts, + Put put = new Put(key); + if (families.isEmpty()) { + put.add(SchemaUtil.getEmptyColumnFamily(PTableImpl.this), TxConstants.FAMILY_DELETE_QUALIFIER, ts, HConstants.EMPTY_BYTE_ARRAY); + } else { + for (PColumnFamily colFamily : families) { + put.add(colFamily.getName().getBytes(), TxConstants.FAMILY_DELETE_QUALIFIER, ts, + HConstants.EMPTY_BYTE_ARRAY); + } } - deleteRow = delete; + deleteRow = put; } else { Delete delete = new Delete(key); for (PColumnFamily colFamily : families) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/5d3552fd/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java index a477842..2425033 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java @@ -19,6 +19,9 @@ package org.apache.phoenix.util; import java.sql.SQLException; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.exception.SQLExceptionCode; @@ -36,6 +39,10 @@ public class TransactionUtil { private TransactionUtil() { } + public static boolean isDelete(Cell cell) { + return (CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY)); + } + public static long convertToNanoseconds(long serverTimeStamp) { return serverTimeStamp * TxConstants.MAX_TX_PER_MS; }