Repository: phoenix Updated Branches: refs/heads/txn 16dd8ca15 -> d905a6662
Fix index maintenance for transactional tables Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d905a666 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d905a666 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d905a666 Branch: refs/heads/txn Commit: d905a6662801a104b33228f184de4dfa9158bad0 Parents: 16dd8ca Author: James Taylor <[email protected]> Authored: Wed May 13 11:07:35 2015 -0700 Committer: James Taylor <[email protected]> Committed: Wed May 13 11:07:35 2015 -0700 ---------------------------------------------------------------------- .../end2end/index/BaseMutableIndexIT.java | 6 ++- .../apache/phoenix/index/IndexMaintainer.java | 10 +++-- .../index/PhoenixTransactionalIndexer.java | 39 +++++++++++++++++++- 3 files changed, 49 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d905a666/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java index b2f8630..c6aadca 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java @@ -1094,6 +1094,7 @@ public abstract class BaseMutableIndexIT extends BaseHBaseManagedTimeIT { } } + @Test public void testUpsertingNullForIndexedColumns() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -1107,11 +1108,12 @@ public abstract class BaseMutableIndexIT extends BaseHBaseManagedTimeIT { //create a row with value null for indexed column v2 stmt.executeUpdate("upsert into DEMO values('cc1', null, 'abc')"); conn.commit(); - + //assert values in index table ResultSet rs = stmt.executeQuery("select * from DEMO_IDX"); assertTrue(rs.next()); assertEquals(0, Doubles.compare(0, rs.getDouble(1))); + assertTrue(rs.wasNull()); assertEquals("cc1", rs.getString(2)); assertEquals("abc", rs.getString(3)); assertFalse(rs.next()); @@ -1121,6 +1123,7 @@ public abstract class BaseMutableIndexIT extends BaseHBaseManagedTimeIT { assertTrue(rs.next()); assertEquals("cc1", rs.getString(1)); assertEquals(0, Doubles.compare(0, rs.getDouble(2))); + assertTrue(rs.wasNull()); assertEquals("abc", rs.getString(3)); assertFalse(rs.next()); @@ -1152,6 +1155,7 @@ public abstract class BaseMutableIndexIT extends BaseHBaseManagedTimeIT { rs = stmt.executeQuery("select * from DEMO_IDX"); assertTrue(rs.next()); assertEquals(0, Doubles.compare(0, rs.getDouble(1))); + assertTrue(rs.wasNull()); assertEquals("cc1", rs.getString(2)); assertEquals("abc", rs.getString(3)); assertFalse(rs.next()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d905a666/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 3e1cb9c..1be0aa3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -845,15 +845,19 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // Delete the entire row if any of the indexed columns changed DeleteType deleteType = null; if (oldState == null || (deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row - Delete delete; + byte[] emptyCF = emptyKeyValueCFPtr.copyBytesIfNecessary(); + Delete delete = new Delete(indexRowKey); // If table delete was single version, then index delete should be as well if (deleteType == DeleteType.SINGLE_VERSION) { - delete = new Delete(indexRowKey); for (ColumnReference ref : getAllColumns()) { // FIXME: Keep Set<byte[]> for index CFs? delete.deleteFamilyVersion(ref.getFamily(), ts); } + delete.deleteFamilyVersion(emptyCF, ts); } else { - delete = new Delete(indexRowKey, ts); + for (ColumnReference ref : getAllColumns()) { // FIXME: Keep Set<byte[]> for index CFs? + delete.deleteFamily(ref.getFamily(), ts); + } + delete.deleteFamily(emptyCF, ts); } delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); return delete; http://git-wip-us.apache.org/repos/asf/phoenix/blob/d905a666/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index 38d6fd1..6f1e28c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -37,7 +37,9 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.hbase.index.MultiMutation; @@ -50,6 +52,8 @@ import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; @@ -132,6 +136,29 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { } } + private static final String TX_NO_READ_OWN_WRITES = "TX_NO_READ_OWN_WRITES"; + @Override + public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s) { + /* + * TODO: remove once Tephra gives us a way to not read our own writes. + * Hack to force scan not to read their own writes. Since the mutations have already been + * applied by the time the preBatchMutate hook is called, we need to adjust the max time + * range down by one to prevent us from seeing the current state. Instead, we need to + * see the state right before our Puts have been applied. + */ + byte[] encoded = scan.getAttribute(TX_NO_READ_OWN_WRITES); + if (encoded != null) { + TimeRange range = scan.getTimeRange(); + long maxTime = range.getMax(); + try { + scan.setTimeRange(range.getMin(), maxTime == Long.MAX_VALUE ? maxTime : maxTime-1); + } catch (IOException e1) { + throw new RuntimeException(e1); + } + } + return s; + } + private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { // Collect the set of mutable ColumnReferences so that we can first // run a scan to get the current state. We'll need this to delete @@ -174,6 +201,13 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary())); } Scan scan = new Scan(); + scan.setAttribute(TX_NO_READ_OWN_WRITES, PDataType.TRUE_BYTES); // TODO: remove when Tephra allows this + // Project all mutable columns + for (ColumnReference ref : mutableColumns) { + scan.addColumn(ref.getFamily(), ref.getQualifier()); + } + // Project empty key value column + scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN); scanRanges.initializeScan(scan); scan.setFilter(scanRanges.getSkipScanFilter()); @@ -190,8 +224,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m, result); Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData); for (IndexUpdate delete : deletes) { - if (delete.isValid()) - indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName())); + if (delete.isValid()) { + indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName())); + } } state.applyMutation(); Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData);
