Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 8b6d2ef41 -> 2bfd48ac6
PHOENIX-4099 Do not write table data again when replaying mutations for partial index rebuild Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2bfd48ac Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2bfd48ac Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2bfd48ac Branch: refs/heads/4.x-HBase-0.98 Commit: 2bfd48ac6d86d06f0510a27de663509a5be56413 Parents: 8b6d2ef Author: James Taylor <jamestay...@apache.org> Authored: Thu Aug 17 18:06:50 2017 -0700 Committer: James Taylor <jamestay...@apache.org> Committed: Thu Aug 17 22:27:43 2017 -0700 ---------------------------------------------------------------------- .../EndToEndCoveredColumnsIndexBuilderIT.java | 6 ++--- .../coprocessor/BaseScannerRegionObserver.java | 24 +++++++++++++++++++- .../UngroupedAggregateRegionObserver.java | 16 ++++++------- .../apache/phoenix/execute/MutationState.java | 5 +--- .../org/apache/phoenix/hbase/index/Indexer.java | 10 +++++++- .../hbase/index/builder/BaseIndexBuilder.java | 5 ++-- .../hbase/index/builder/IndexBuildManager.java | 5 ++-- .../hbase/index/builder/IndexBuilder.java | 3 ++- .../hbase/index/covered/IndexMetaData.java | 8 ++++--- .../covered/update/IndexUpdateManager.java | 2 +- .../phoenix/index/PhoenixIndexBuilder.java | 5 ++-- .../apache/phoenix/index/PhoenixIndexCodec.java | 4 ++-- .../phoenix/index/PhoenixIndexMetaData.java | 15 ++++++++---- .../index/PhoenixIndexPartialBuildMapper.java | 5 ++-- .../index/covered/LocalTableStateTest.java | 13 ++++++----- .../index/covered/NonTxIndexBuilderTest.java | 5 ++-- .../covered/update/TestIndexUpdateManager.java | 5 ++-- 17 files changed, 86 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java index 2358cfb..4f951a8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.hbase.index.IndexTestingUtils; import org.apache.phoenix.hbase.index.Indexer; @@ -51,7 +52,6 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.scanner.Scanner; import org.apache.phoenix.util.EnvironmentEdge; import org.apache.phoenix.util.EnvironmentEdgeManager; -import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; @@ -150,8 +150,8 @@ public class EndToEndCoveredColumnsIndexBuilderIT { } @Override - public boolean ignoreNewerMutations() { - return false; + public ReplayWrite getReplayWrite() { + return null; } }; http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 65d73ea..8037251 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -42,6 +42,7 @@ import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory; import org.apache.phoenix.iterate.RegionScannerFactory; import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; +import org.apache.phoenix.schema.types.PUnsignedTinyint; import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; @@ -91,7 +92,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String SKIP_REGION_BOUNDARY_CHECK = "_SKIP_REGION_BOUNDARY_CHECK"; public static final String TX_SCN = "_TxScn"; public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow"; - public static final String IGNORE_NEWER_MUTATIONS = "_IGNORE_NEWER_MUTATIONS"; + public static final String REPLAY_WRITES = "_IGNORE_NEWER_MUTATIONS"; public final static String SCAN_OFFSET = "_RowOffset"; public static final String SCAN_START_ROW_SUFFIX = "_ScanStartRowSuffix"; public static final String SCAN_STOP_ROW_SUFFIX = "_ScanStopRowSuffix"; @@ -102,6 +103,27 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public final static String IMMUTABLE_STORAGE_ENCODING_SCHEME = "_ImmutableStorageEncodingScheme"; public final static String USE_ENCODED_COLUMN_QUALIFIER_LIST = "_UseEncodedColumnQualifierList"; + public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1); + public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2); + + public enum ReplayWrite { + TABLE_AND_INDEX, + INDEX_ONLY; + + public static ReplayWrite fromBytes(byte[] replayWriteBytes) { + if (replayWriteBytes == null) { + return null; + } + if (Bytes.compareTo(REPLAY_TABLE_AND_INDEX_WRITES, replayWriteBytes) == 0) { + return TABLE_AND_INDEX; + } + if (Bytes.compareTo(REPLAY_ONLY_INDEX_WRITES, replayWriteBytes) == 0) { + return INDEX_ONLY; + } + throw new IllegalArgumentException("Unknown ReplayWrite code of " + Bytes.toStringBinary(replayWriteBytes)); + } + }; + /** * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations * are used to augment log lines emitted by Phoenix. See https://issues.apache.org/jira/browse/PHOENIX-1198. http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 853f054..16945ed 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -372,7 +372,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver RegionScanner theScanner = s; - boolean replayMutations = scan.getAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null; + byte[] replayMutations = scan.getAttribute(BaseScannerRegionObserver.REPLAY_WRITES); byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID); byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE); List<Expression> selectExpressions = null; @@ -602,8 +602,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver Cell firstKV = results.get(0); Delete delete = new Delete(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength(),ts); - if (replayMutations) { - delete.setAttribute(IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES); + if (replayMutations != null) { + delete.setAttribute(REPLAY_WRITES, replayMutations); } mutations.add(delete); // force tephra to ignore this deletes @@ -656,8 +656,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } for (Mutation mutation : row.toRowMutations()) { - if (replayMutations) { - mutation.setAttribute(IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES); + if (replayMutations != null) { + mutation.setAttribute(REPLAY_WRITES, replayMutations); } mutations.add(mutation); } @@ -899,8 +899,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver put = new Put(CellUtil.cloneRow(cell)); put.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData); put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, - PDataType.TRUE_BYTES); + put.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES); mutations.add(put); // Since we're replaying existing mutations, it makes no sense to write them to the wal put.setDurability(Durability.SKIP_WAL); @@ -911,8 +910,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver del = new Delete(CellUtil.cloneRow(cell)); del.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData); del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, - PDataType.TRUE_BYTES); + del.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES); mutations.add(del); // Since we're replaying existing mutations, it makes no sense to write them to the wal del.setDurability(Durability.SKIP_WAL); http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 0ce163a..318e018 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -33,8 +33,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import javax.annotation.concurrent.Immutable; @@ -81,7 +79,6 @@ import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.ValueSchema.Field; -import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.transaction.PhoenixTransactionContext; @@ -593,7 +590,7 @@ public class MutationState implements SQLCloseable { // future dated data row mutations that will get in the way of generating the // correct index rows on replay. for (Mutation mutation : rowMutations) { - mutation.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES); + mutation.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_TABLE_AND_INDEX_WRITES); } } if (mutationsPertainingToIndex != null) mutationsPertainingToIndex http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index 9368980..35dbe08 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment; import org.apache.phoenix.hbase.index.builder.IndexBuildManager; import org.apache.phoenix.hbase.index.builder.IndexBuilder; @@ -367,6 +368,7 @@ public class Indexer extends BaseRegionObserver { } private static final OperationStatus IGNORE = new OperationStatus(OperationStatusCode.SUCCESS); + private static final OperationStatus NOWRITE = new OperationStatus(OperationStatusCode.SUCCESS); private static final OperationStatus FAILURE = new OperationStatus(OperationStatusCode.FAILURE, "Unable to acquire row lock"); // Assume time stamp of mutation a client defined time stamp if it's not within @@ -465,7 +467,8 @@ public class Indexer extends BaseRegionObserver { } Mutation firstMutation = miniBatchOp.getOperation(0); - boolean resetTimeStamp = !this.builder.isPartialRebuild(firstMutation) && !isProbablyClientControlledTimeStamp(firstMutation); + ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation); + boolean resetTimeStamp = replayWrite == null && !isProbablyClientControlledTimeStamp(firstMutation); long now = EnvironmentEdgeManager.currentTimeMillis(); byte[] byteNow = Bytes.toBytes(now); for (int i = 0; i < miniBatchOp.size(); i++) { @@ -486,6 +489,11 @@ public class Indexer extends BaseRegionObserver { } } } + // No need to write the table mutations when we're rebuilding + // the index as they're already written and just being replayed. + if (replayWrite == ReplayWrite.INDEX_ONLY) { + miniBatchOp.setOperationStatus(i, NOWRITE); + } // Only copy mutations if we found duplicate rows // which only occurs when we're partially rebuilding http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java index 21350d4..a2edd45 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; import org.apache.phoenix.hbase.index.covered.IndexCodec; import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder; @@ -131,7 +132,7 @@ public abstract class BaseIndexBuilder implements IndexBuilder { } @Override - public boolean isPartialRebuild(Mutation m) { - return false; + public ReplayWrite getReplayWrite(Mutation m) { + return null; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java index f8fb421..4c410ad 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; import org.apache.phoenix.hbase.index.Indexer; import org.apache.phoenix.hbase.index.covered.IndexMetaData; @@ -133,7 +134,7 @@ public class IndexBuildManager implements Stoppable { return this.delegate; } - public boolean isPartialRebuild(Mutation m) throws IOException { - return this.delegate.isPartialRebuild(m); + public ReplayWrite getReplayWrite(Mutation m) throws IOException { + return this.delegate.getReplayWrite(m); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java index e64a857..a00294c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; import org.apache.phoenix.hbase.index.Indexer; import org.apache.phoenix.hbase.index.covered.IndexMetaData; @@ -149,5 +150,5 @@ public interface IndexBuilder extends Stoppable { */ public List<Mutation> executeAtomicOp(Increment inc) throws IOException; - public boolean isPartialRebuild(Mutation m); + public ReplayWrite getReplayWrite(Mutation m); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java index 04e2523..5314631 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.hbase.index.covered; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; + public interface IndexMetaData { public static final IndexMetaData NULL_INDEX_META_DATA = new IndexMetaData() { @@ -26,11 +28,11 @@ public interface IndexMetaData { } @Override - public boolean ignoreNewerMutations() { - return false; + public ReplayWrite getReplayWrite() { + return null; }}; public boolean isImmutableRows(); - public boolean ignoreNewerMutations(); + public ReplayWrite getReplayWrite(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java index 2784f0b..99234f0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java @@ -120,7 +120,7 @@ public class IndexUpdateManager { updates = new TreeSet<Mutation>(COMPARATOR); map.put(key, updates); } - if (indexMetaData.ignoreNewerMutations()) { + if (indexMetaData.getReplayWrite() != null) { // if we're replaying mutations, we don't need to worry about out-of-order updates updates.add(m); } else { http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java index 2823268..a66cd2b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; import org.apache.phoenix.coprocessor.generated.PTableProtos; import org.apache.phoenix.exception.DataExceedsCapacityException; import org.apache.phoenix.expression.Expression; @@ -382,7 +383,7 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder { } @Override - public boolean isPartialRebuild(Mutation m) { - return PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap()); + public ReplayWrite getReplayWrite(Mutation m) { + return PhoenixIndexMetaData.getReplayWrite(m.getAttributesMap()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index 1726b1f..ffb199a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@ -72,7 +72,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec { ptr.set(state.getCurrentRowKey()); List<IndexUpdate> indexUpdates = Lists.newArrayList(); for (IndexMaintainer maintainer : indexMaintainers) { - Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.ignoreNewerMutations(), false, context); + Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.getReplayWrite() != null, false, context); ValueGetter valueGetter = statePair.getFirst(); IndexUpdate indexUpdate = statePair.getSecond(); indexUpdate.setTable(maintainer.isLocalIndex() ? state.getEnvironment().getRegion() @@ -99,7 +99,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec { // client side. Set<ColumnReference> cols = Sets.newHashSet(maintainer.getAllColumns()); cols.add(new ColumnReference(indexMaintainers.get(0).getDataEmptyKeyValueCF(), indexMaintainers.get(0).getEmptyKeyValueQualifier())); - Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(cols, metaData.ignoreNewerMutations(), true, context); + Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(cols, metaData.getReplayWrite() != null, true, context); ValueGetter valueGetter = statePair.getFirst(); if (valueGetter!=null) { IndexUpdate indexUpdate = statePair.getSecond(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java index 83201ba..7908103 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java @@ -28,6 +28,7 @@ import org.apache.phoenix.cache.IndexMetaDataCache; import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.cache.TenantCache; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.hbase.index.covered.IndexMetaData; @@ -40,7 +41,7 @@ import org.apache.phoenix.util.ServerUtil; public class PhoenixIndexMetaData implements IndexMetaData { private final Map<String, byte[]> attributes; private final IndexMetaDataCache indexMetaDataCache; - private final boolean ignoreNewerMutations; + private final ReplayWrite replayWrite; private final boolean isImmutable; private static IndexMetaDataCache getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException { @@ -91,7 +92,11 @@ public class PhoenixIndexMetaData implements IndexMetaData { } public static boolean isIndexRebuild(Map<String,byte[]> attributes) { - return attributes.get(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null; + return attributes.get(BaseScannerRegionObserver.REPLAY_WRITES) != null; + } + + public static ReplayWrite getReplayWrite(Map<String,byte[]> attributes) { + return ReplayWrite.fromBytes(attributes.get(BaseScannerRegionObserver.REPLAY_WRITES)); } public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, Map<String,byte[]> attributes) throws IOException { @@ -102,7 +107,7 @@ public class PhoenixIndexMetaData implements IndexMetaData { } this.isImmutable = isImmutable; this.attributes = attributes; - this.ignoreNewerMutations = isIndexRebuild(attributes); + this.replayWrite = getReplayWrite(attributes); } public PhoenixTransactionContext getTransactionContext() { @@ -117,8 +122,8 @@ public class PhoenixIndexMetaData implements IndexMetaData { return attributes; } - public boolean ignoreNewerMutations() { - return ignoreNewerMutations; + public ReplayWrite getReplayWrite() { + return replayWrite; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java index 54dc748..0ead358 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java @@ -45,7 +45,6 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.slf4j.Logger; @@ -114,7 +113,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr put = new Put(CellUtil.cloneRow(cell)); put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); put.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue); - put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES); + put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES); mutations.add(put); } put.add(cell); @@ -123,7 +122,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr del = new Delete(CellUtil.cloneRow(cell)); del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); del.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue); - del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES); + del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES); mutations.add(del); } del.addDeleteMarker(cell); http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java index 7f3e1c4..e9c3da6 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; import org.apache.phoenix.hbase.index.covered.data.LocalTable; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; @@ -61,8 +62,8 @@ public class LocalTableStateTest { } @Override - public boolean ignoreNewerMutations() { - return false; + public ReplayWrite getReplayWrite() { + return null; } }; @@ -124,8 +125,8 @@ public class LocalTableStateTest { } @Override - public boolean ignoreNewerMutations() { - return false; + public ReplayWrite getReplayWrite() { + return null; } }; @@ -161,8 +162,8 @@ public class LocalTableStateTest { } @Override - public boolean ignoreNewerMutations() { - return false; + public ReplayWrite getReplayWrite() { + return null; } }; http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java index 7c8575f..7dbed8b 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.coprocessor.BaseRegionScanner; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; import org.apache.phoenix.hbase.index.MultiMutation; import org.apache.phoenix.hbase.index.covered.data.LocalTable; import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; @@ -227,7 +228,7 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest { public void testRebuildMultipleVersionRow() throws IOException { // when doing a rebuild, we are replaying mutations so we want to ignore newer mutations // see LocalTable#getCurrentRowState() - Mockito.when(mockIndexMetaData.ignoreNewerMutations()).thenReturn(true); + Mockito.when(mockIndexMetaData.getReplayWrite()).thenReturn(ReplayWrite.INDEX_ONLY); // the current row state has 3 versions, but if we rebuild as of t=2, scanner in LocalTable // should only return first @@ -279,7 +280,7 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest { public void testManyVersions() throws IOException { // when doing a rebuild, we are replaying mutations so we want to ignore newer mutations // see LocalTable#getCurrentRowState() - Mockito.when(mockIndexMetaData.ignoreNewerMutations()).thenReturn(true); + Mockito.when(mockIndexMetaData.getReplayWrite()).thenReturn(ReplayWrite.INDEX_ONLY); MultiMutation mutation = getMultipleVersionMutation(200); currentRowCells = mutation.getFamilyCellMap().get(FAM); http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bfd48ac/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java index 9e50615..b542368 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/update/TestIndexUpdateManager.java @@ -30,11 +30,10 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import org.apache.phoenix.hbase.index.covered.IndexMetaData; -import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager; public class TestIndexUpdateManager { @@ -46,7 +45,7 @@ public class TestIndexUpdateManager { @Before public void setup() { mockIndexMetaData = Mockito.mock(IndexMetaData.class); - Mockito.when(mockIndexMetaData.ignoreNewerMutations()).thenReturn(false); + Mockito.when(mockIndexMetaData.getReplayWrite()).thenReturn(null); } @Test