PHOENIX-3789 Addendum to execute cross-region index maintenance calls in postBatchMutateIndispensably
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ee886bab Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ee886bab Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ee886bab Branch: refs/heads/omid Commit: ee886bab91f58f4b54efc619d2ec3aec92d40a92 Parents: 8309b22 Author: James Taylor <[email protected]> Authored: Mon Apr 17 14:53:41 2017 -0700 Committer: James Taylor <[email protected]> Committed: Mon Apr 17 19:12:02 2017 -0700 ---------------------------------------------------------------------- .../org/apache/phoenix/hbase/index/Indexer.java | 39 ++++++++++++++++---- 1 file changed, 31 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/ee886bab/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index f485bdf..de98051 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -372,7 +372,7 @@ public class Indexer extends BaseRegionObserver { super.postPut(e, put, edit, durability); return; } - doPost(edit, put, durability, true); + doPost(edit, put, durability, true, false); } @Override @@ -382,10 +382,29 @@ public class Indexer extends BaseRegionObserver { super.postDelete(e, delete, edit, durability); return; } - doPost(edit, delete, durability, true); + doPost(edit, delete, durability, true, false); } @Override + public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + if (this.disabled) { + super.postBatchMutate(c, 
miniBatchOp); + return; + } + WALEdit edit = miniBatchOp.getWalEdit(0); + if (edit != null) { + IndexedKeyValue ikv = getFirstIndexedKeyValue(edit); + if (ikv != null) { + // This will prevent the postPut and postDelete hooks from doing anything + // We need to do this now, as the postBatchMutateIndispensably (where the + // actual index writing gets done) is called after the postPut and postDelete. + ikv.markBatchFinished(); + } + } + } + + @Override public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException { if (this.disabled) { @@ -398,13 +417,17 @@ public class Indexer extends BaseRegionObserver { //each batch operation, only the first one will have anything useful, so we can just grab that Mutation mutation = miniBatchOp.getOperation(0); WALEdit edit = miniBatchOp.getWalEdit(0); - doPost(edit, mutation, mutation.getDurability(), false); + // We're forcing the index writes here because we've marked the index batch as "finished" + // to prevent postPut and postDelete from doing anything, but hold off on writing them + // until now so we're outside of the MVCC lock (see PHOENIX-3789). Without this hacky + // forceWrite flag, we'd ignore them again here too. 
+ doPost(edit, mutation, mutation.getDurability(), false, true); } } - private void doPost(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates) throws IOException { + private void doPost(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates, boolean forceWrite) throws IOException { try { - doPostWithExceptions(edit, m, durability, allowLocalUpdates); + doPostWithExceptions(edit, m, durability, allowLocalUpdates, forceWrite); return; } catch (Throwable e) { rethrowIndexingException(e); @@ -413,7 +436,7 @@ public class Indexer extends BaseRegionObserver { "Somehow didn't complete the index update, but didn't return succesfully either!"); } - private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates) + private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates, boolean forceWrite) throws Exception { //short circuit, if we don't need to do any work if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m) || edit == null) { @@ -447,14 +470,14 @@ public class Indexer extends BaseRegionObserver { * once (this hook gets called with the same WALEdit for each Put/Delete in a batch, which can * lead to writing all the index updates for each Put/Delete). 
*/ - if (!ikv.getBatchFinished() || allowLocalUpdates) { + if ((!ikv.getBatchFinished() || forceWrite) || allowLocalUpdates) { Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(edit); // the WAL edit is kept in memory and we already specified the factory when we created the // references originally - therefore, we just pass in a null factory here and use the ones // already specified on each reference try { - if (!ikv.getBatchFinished()) { + if (!ikv.getBatchFinished() || forceWrite) { current.addTimelineAnnotation("Actually doing index update for first time"); writer.writeAndKillYourselfOnFailure(indexUpdates, allowLocalUpdates); } else if (allowLocalUpdates) {
