This is an automated email from the ASF dual-hosted git repository. skadam pushed a commit to branch PHOENIX-5748-4.x-HBase-1.5 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-5748-4.x-HBase-1.5 by this push: new d6f939b PHOENIX-5749: Add unit tests for verifySingleIndexRow() of IndexRebui… (#725) d6f939b is described below commit d6f939bc903480675825bea6c3b1b2be04bca599 Author: Swaroopa Kadam <swaroopa.kada...@gmail.com> AuthorDate: Tue Mar 17 12:24:38 2020 -0700 PHOENIX-5749: Add unit tests for verifySingleIndexRow() of IndexRebui… (#725) PHOENIX-5749: Add unit tests for verifySingleIndexRow() of IndexRebuildRegionScanner --- .../coprocessor/IndexRebuildRegionScanner.java | 304 ++-------- .../coprocessor/IndexToolVerificationResult.java | 304 ++++++++++ .../index/PhoenixIndexImportDirectReducer.java | 5 +- .../phoenix/index/VerifySingleIndexRowTest.java | 637 +++++++++++++++++++++ 4 files changed, 998 insertions(+), 252 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java index 6cb1145..ad549e5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java @@ -42,15 +42,16 @@ import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATT import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableSet; +import java.util.TreeMap; import java.util.concurrent.ExecutionException; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -109,236 +110,14 @@ import com.google.common.collect.Maps; public class IndexRebuildRegionScanner extends BaseRegionScanner { - public static class 
VerificationResult { - public static class PhaseResult { - private long validIndexRowCount = 0; - private long expiredIndexRowCount = 0; - private long missingIndexRowCount = 0; - private long invalidIndexRowCount = 0; - - public void add(PhaseResult phaseResult) { - validIndexRowCount += phaseResult.validIndexRowCount; - expiredIndexRowCount += phaseResult.expiredIndexRowCount; - missingIndexRowCount += phaseResult.missingIndexRowCount; - invalidIndexRowCount += phaseResult.invalidIndexRowCount; - } - - public long getTotalCount() { - return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount; - } - - @Override - public String toString() { - return "PhaseResult{" + - "validIndexRowCount=" + validIndexRowCount + - ", expiredIndexRowCount=" + expiredIndexRowCount + - ", missingIndexRowCount=" + missingIndexRowCount + - ", invalidIndexRowCount=" + invalidIndexRowCount + - '}'; - } - } - - private long scannedDataRowCount = 0; - private long rebuiltIndexRowCount = 0; - private PhaseResult before = new PhaseResult(); - private PhaseResult after = new PhaseResult(); - - @Override - public String toString() { - return "VerificationResult{" + - "scannedDataRowCount=" + scannedDataRowCount + - ", rebuiltIndexRowCount=" + rebuiltIndexRowCount + - ", before=" + before + - ", after=" + after + - '}'; - } - - public long getScannedDataRowCount() { - return scannedDataRowCount; - } - - public long getRebuiltIndexRowCount() { - return rebuiltIndexRowCount; - } - - public long getBeforeRebuildValidIndexRowCount() { - return before.validIndexRowCount; - } - - public long getBeforeRebuildExpiredIndexRowCount() { - return before.expiredIndexRowCount; - } - - public long getBeforeRebuildInvalidIndexRowCount() { - return before.invalidIndexRowCount; - } - - public long getBeforeRebuildMissingIndexRowCount() { - return before.missingIndexRowCount; - } - - public long getAfterRebuildValidIndexRowCount() { - return after.validIndexRowCount; - } - - 
public long getAfterRebuildExpiredIndexRowCount() { - return after.expiredIndexRowCount; - } - - public long getAfterRebuildInvalidIndexRowCount() { - return after.invalidIndexRowCount; - } - - public long getAfterRebuildMissingIndexRowCount() { - return after.missingIndexRowCount; - } - - private void addScannedDataRowCount(long count) { - this.scannedDataRowCount += count; - } - - private void addRebuiltIndexRowCount(long count) { - this.rebuiltIndexRowCount += count; - } - - private void addBeforeRebuildValidIndexRowCount(long count) { - before.validIndexRowCount += count; - } - - private void addBeforeRebuildExpiredIndexRowCount(long count) { - before.expiredIndexRowCount += count; - } - - private void addBeforeRebuildMissingIndexRowCount(long count) { - before.missingIndexRowCount += count; - } - - private void addBeforeRebuildInvalidIndexRowCount(long count) { - before.invalidIndexRowCount += count; - } - - private void addAfterRebuildValidIndexRowCount(long count) { - after.validIndexRowCount += count; - } - - private void addAfterRebuildExpiredIndexRowCount(long count) { - after.expiredIndexRowCount += count; - } - - private void addAfterRebuildMissingIndexRowCount(long count) { - after.missingIndexRowCount += count; - } - - private void addAfterRebuildInvalidIndexRowCount(long count) { - after.invalidIndexRowCount += count; - } - - private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) { - if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), - AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, 0, - AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES.length) == 0) { - return true; - } - return false; - } - - private long getValue(Cell cell) { - return Long.parseLong(Bytes.toString(cell.getValueArray(), - cell.getValueOffset(), cell.getValueLength())); - } - - private void update(Cell cell) { - if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES)) { - 
addScannedDataRowCount(getValue(cell)); - } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES)) { - addRebuiltIndexRowCount(getValue(cell)); - } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) { - addBeforeRebuildValidIndexRowCount(getValue(cell)); - } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) { - addBeforeRebuildExpiredIndexRowCount(getValue(cell)); - } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) { - addBeforeRebuildMissingIndexRowCount(getValue(cell)); - } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) { - addBeforeRebuildInvalidIndexRowCount(getValue(cell)); - } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) { - addAfterRebuildValidIndexRowCount(getValue(cell)); - } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) { - addAfterRebuildExpiredIndexRowCount(getValue(cell)); - } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) { - addAfterRebuildMissingIndexRowCount(getValue(cell)); - } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) { - addAfterRebuildInvalidIndexRowCount(getValue(cell)); - } - } - - public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) { - // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually. 
- // Search for the place where the trailing 0xFFs start - int offset = rowKeyPrefix.length; - while (offset > 0) { - if (rowKeyPrefix[offset - 1] != (byte) 0xFF) { - break; - } - offset--; - } - if (offset == 0) { - // We got an 0xFFFF... (only FFs) stopRow value which is - // the last possible prefix before the end of the table. - // So set it to stop at the 'end of the table' - return HConstants.EMPTY_END_ROW; - } - // Copy the right length of the original - byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset); - // And increment the last one - newStopRow[newStopRow.length - 1]++; - return newStopRow; - } - - public static VerificationResult getVerificationResult(Table hTable, long ts) - throws IOException { - VerificationResult verificationResult = new VerificationResult(); - byte[] startRowKey = Bytes.toBytes(Long.toString(ts)); - byte[] stopRowKey = calculateTheClosestNextRowKeyForPrefix(startRowKey); - Scan scan = new Scan(); - scan.setStartRow(startRowKey); - scan.setStopRow(stopRowKey); - ResultScanner scanner = hTable.getScanner(scan); - for (Result result = scanner.next(); result != null; result = scanner.next()) { - for (Cell cell : result.rawCells()) { - verificationResult.update(cell); - } - } - return verificationResult; - } - - public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) { - if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) { - return false; - } else if (verifyType == IndexTool.IndexVerifyType.ONLY) { - if (before.invalidIndexRowCount + before.missingIndexRowCount > 0) { - return true; - } - } else if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) { - if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) { - return true; - } - } - return false; - } - - public void add(VerificationResult verificationResult) { - scannedDataRowCount += verificationResult.scannedDataRowCount; - rebuiltIndexRowCount += 
verificationResult.rebuiltIndexRowCount; - before.add(verificationResult.before); - after.add(verificationResult.after); - } - } - private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class); public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max"; private static final int DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS = 17; public static final String INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY = "index.verify.threads.max"; private static final int DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK = 2048; + public static final String NO_EXPECTED_MUTATION = "No expected mutation"; + public static final String + ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty"; private long pageSizeInRows = Long.MAX_VALUE; private int rowCountPerTask; private boolean hasMore; @@ -367,15 +146,16 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver; private RegionCoprocessorEnvironment env; private HTableFactory hTableFactory; - private int indexTableTTL; - private VerificationResult verificationResult; + private int indexTableTTL = 0; + private IndexToolVerificationResult verificationResult; private boolean isBeforeRebuilt = true; private boolean partialRebuild = false; private int singleRowRebuildReturnCode; private Map<byte[], NavigableSet<byte[]>> familyMap; private byte[][] viewConstants; - IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan, + @VisibleForTesting + public IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan, final RegionCoprocessorEnvironment env, UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException { super(innerScanner); @@ -416,7 +196,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } byte[] valueBytes = 
scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE); if (valueBytes != null) { - verificationResult = new VerificationResult(); + verificationResult = new IndexToolVerificationResult(); verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes); if (verifyType != IndexTool.IndexVerifyType.NONE) { verify = true; @@ -552,6 +332,24 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return uuidValue; } + @VisibleForTesting + public int setIndexTableTTL(int ttl) { + indexTableTTL = ttl; + return 0; + } + + @VisibleForTesting + public int setIndexMaintainer(IndexMaintainer indexMaintainer) { + this.indexMaintainer = indexMaintainer; + return 0; + } + + @VisibleForTesting + public int setIndexKeyToMutationMap(Map<byte[], List<Mutation>> newTreeMap) { + this.indexKeyToMutationMap = newTreeMap; + return 0; + } + public static class SimpleValueGetter implements ValueGetter { final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable(); final Put put; @@ -578,7 +376,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } - private byte[] getIndexRowKey(final Put dataRow) throws IOException { + public byte[] getIndexRowKey(final Put dataRow) throws IOException { ValueGetter valueGetter = new SimpleValueGetter(dataRow); byte[] builtIndexRowKey = indexMaintainer.buildRowKey(valueGetter, new ImmutableBytesWritable(dataRow.getRow()), null, null, HConstants.LATEST_TIMESTAMP); @@ -594,14 +392,16 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return true; } - private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs, + @VisibleForTesting + public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs, String errorMsg) throws IOException { logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, errorMsg, null, null); } - private void logToIndexToolOutputTable(byte[] dataRowKey, 
byte[] indexRowKey, long dataRowTs, long indexRowTs, + @VisibleForTesting + public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs, String errorMsg, byte[] expectedValue, byte[] actualValue) throws IOException { final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:"); final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:"); @@ -794,7 +594,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return false; } - private static List<Mutation> prepareActualIndexMutations(Result indexRow) throws IOException { + @VisibleForTesting + public List<Mutation> prepareActualIndexMutations(Result indexRow) throws IOException { Put put = null; Delete del = null; for (Cell cell : indexRow.rawCells()) { @@ -845,15 +646,16 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { * the data table mutation for which the delete marker is added. Thus, the timestamp of these delete markers will be * higher than the timestamp of index row to be deleted. 
*/ - private boolean verifySingleIndexRow(Result indexRow, VerificationResult.PhaseResult verificationPhaseResult) + @VisibleForTesting + public boolean verifySingleIndexRow(Result indexRow, IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException { List<Mutation> expectedMutationList = indexKeyToMutationMap.get(indexRow.getRow()); if (expectedMutationList == null) { - throw new DoNotRetryIOException("No expected mutation"); + throw new DoNotRetryIOException(NO_EXPECTED_MUTATION); } List<Mutation> actualMutationList = prepareActualIndexMutations(indexRow); if (actualMutationList == null || actualMutationList.isEmpty()) { - throw new DoNotRetryIOException("actualMutationList is null or empty"); + throw new DoNotRetryIOException(ACTUAL_MUTATION_IS_NULL_OR_EMPTY); } Collections.sort(expectedMutationList, MUTATION_TS_DESC_COMPARATOR); Collections.sort(actualMutationList, MUTATION_TS_DESC_COMPARATOR); @@ -989,18 +791,18 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { Put put = pair.getFirst(); long ts1 = 0; if (put != null) { - ts1 = getMaxTimestamp((Mutation)put); + ts1 = getMaxTimestamp(put); } Delete del = pair.getSecond(); long ts2 = 0; if (del != null) { - ts1 = getMaxTimestamp((Mutation)del); + ts1 = getMaxTimestamp(del); } return (ts1 > ts2) ? 
ts1 : ts2; } private void verifyIndexRows(List<KeyRange> keys, - VerificationResult.PhaseResult verificationPhaseResult) throws IOException { + IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException { List<KeyRange> invalidKeys = new ArrayList<>(); ScanRanges scanRanges = ScanRanges.createPointLookup(keys); Scan indexScan = new Scan(); @@ -1063,7 +865,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } private void addVerifyTask(final List<KeyRange> keys, - final VerificationResult.PhaseResult verificationPhaseResult) { + final IndexToolVerificationResult.PhaseResult verificationPhaseResult) { tasks.add(new Task<Boolean>() { @Override public Boolean call() throws Exception { @@ -1081,14 +883,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { }); } - private void parallelizeIndexVerify(VerificationResult.PhaseResult verificationPhaseResult) throws IOException { + private void parallelizeIndexVerify(IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException { int taskCount = (indexKeyToMutationMap.size() + rowCountPerTask - 1) / rowCountPerTask; tasks = new TaskBatch<>(taskCount); List<List<KeyRange>> listOfKeyRangeList = new ArrayList<>(taskCount); - List<VerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount); + List<IndexToolVerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount); List<KeyRange> keys = new ArrayList<>(rowCountPerTask); listOfKeyRangeList.add(keys); - VerificationResult.PhaseResult perTaskVerificationPhaseResult = new VerificationResult.PhaseResult(); + IndexToolVerificationResult.PhaseResult perTaskVerificationPhaseResult = new IndexToolVerificationResult.PhaseResult(); verificationPhaseResultList.add(perTaskVerificationPhaseResult); for (byte[] indexKey: indexKeyToMutationMap.keySet()) { keys.add(PVarbinary.INSTANCE.getKeyRange(indexKey)); @@ -1096,7 +898,7 @@ public class 
IndexRebuildRegionScanner extends BaseRegionScanner { addVerifyTask(keys, perTaskVerificationPhaseResult); keys = new ArrayList<>(rowCountPerTask); listOfKeyRangeList.add(keys); - perTaskVerificationPhaseResult = new VerificationResult.PhaseResult(); + perTaskVerificationPhaseResult = new IndexToolVerificationResult.PhaseResult(); verificationPhaseResultList.add(perTaskVerificationPhaseResult); } } @@ -1118,7 +920,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { throw new IOException(exceptionMessage); } } - for (VerificationResult.PhaseResult result : verificationPhaseResultList) { + for (IndexToolVerificationResult.PhaseResult result : verificationPhaseResultList) { verificationPhaseResult.add(result); } if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) { @@ -1162,7 +964,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } private void verifyAndOrRebuildIndex() throws IOException { - VerificationResult nextVerificationResult = new VerificationResult(); + IndexToolVerificationResult nextVerificationResult = new IndexToolVerificationResult(); nextVerificationResult.scannedDataRowCount = dataKeyToMutationMap.size(); if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.NONE) { // For these options we start with rebuilding index rows @@ -1175,7 +977,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { } if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.ONLY) { - VerificationResult.PhaseResult verificationPhaseResult = new VerificationResult.PhaseResult(); + IndexToolVerificationResult.PhaseResult verificationPhaseResult = new IndexToolVerificationResult.PhaseResult(); // For these options we start with verifying index rows parallelizeIndexVerify(verificationPhaseResult); 
nextVerificationResult.before.add(verificationPhaseResult); @@ -1200,7 +1002,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) { // We have rebuilt index row and now we need to verify them - VerificationResult.PhaseResult verificationPhaseResult = new VerificationResult.PhaseResult(); + IndexToolVerificationResult.PhaseResult verificationPhaseResult = new IndexToolVerificationResult.PhaseResult(); indexKeyToMutationMap.clear(); for (Map.Entry<byte[], Pair<Put, Delete>> entry: dataKeyToMutationMap.entrySet()) { prepareIndexMutations(entry.getValue().getFirst(), entry.getValue().getSecond()); @@ -1450,7 +1252,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { return indexMutations; } - private void prepareIndexMutations(Put put, Delete del) throws IOException{ + @VisibleForTesting + public int prepareIndexMutations(Put put, Delete del) throws IOException { List<Mutation> indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del); for (Mutation mutation : indexMutations) { byte[] indexRowKey = mutation.getRow(); @@ -1463,6 +1266,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner { mutationList.add(mutation); } } + return 0; } @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java new file mode 100644 index 0000000..ed92fad --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.mapreduce.index.IndexTool; + +import java.io.IOException; +import java.util.Arrays; + +import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES; +import static 
org.apache.phoenix.mapreduce.index.IndexTool.REBUILT_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexTool.RESULT_TABLE_COLUMN_FAMILY; +import static org.apache.phoenix.mapreduce.index.IndexTool.SCANNED_DATA_ROW_COUNT_BYTES; + +public class IndexToolVerificationResult { + public static class PhaseResult { + long validIndexRowCount = 0; + long expiredIndexRowCount = 0; + long missingIndexRowCount = 0; + long invalidIndexRowCount = 0; + + public void add(PhaseResult phaseResult) { + validIndexRowCount += phaseResult.validIndexRowCount; + expiredIndexRowCount += phaseResult.expiredIndexRowCount; + missingIndexRowCount += phaseResult.missingIndexRowCount; + invalidIndexRowCount += phaseResult.invalidIndexRowCount; + } + + public PhaseResult(){} + + public PhaseResult(long validIndexRowCount, long expiredIndexRowCount, + long missingIndexRowCount, long invalidIndexRowCount) { + this.validIndexRowCount = validIndexRowCount; + this.expiredIndexRowCount = expiredIndexRowCount; + this.missingIndexRowCount = missingIndexRowCount; + this.invalidIndexRowCount = invalidIndexRowCount; + } + + public long getTotalCount() { + return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount; + } + + @Override + public String toString() { + return "PhaseResult{" + + "validIndexRowCount=" + validIndexRowCount + + ", expiredIndexRowCount=" + expiredIndexRowCount + + ", missingIndexRowCount=" + missingIndexRowCount + + ", invalidIndexRowCount=" + invalidIndexRowCount + + '}'; + } + + @Override + public boolean equals(Object o) { + if (o == null) { + return false; + } + if (!(o instanceof PhaseResult)) { + return false; + } + PhaseResult pr = (PhaseResult) o; + return this.expiredIndexRowCount == pr.expiredIndexRowCount + && this.validIndexRowCount == pr.validIndexRowCount + && this.invalidIndexRowCount == pr.invalidIndexRowCount + && this.missingIndexRowCount == pr.missingIndexRowCount; + } + + @Override + public int hashCode() 
{ + long result = 17; + result = 31 * result + expiredIndexRowCount; + result = 31 * result + validIndexRowCount; + result = 31 * result + missingIndexRowCount; + result = 31 * result + invalidIndexRowCount; + return (int)result; + } + } + + long scannedDataRowCount = 0; + long rebuiltIndexRowCount = 0; + PhaseResult before = new PhaseResult(); + PhaseResult after = new PhaseResult(); + + @Override + public String toString() { + return "VerificationResult{" + + "scannedDataRowCount=" + scannedDataRowCount + + ", rebuiltIndexRowCount=" + rebuiltIndexRowCount + + ", before=" + before + + ", after=" + after + + '}'; + } + + public long getScannedDataRowCount() { + return scannedDataRowCount; + } + + public long getRebuiltIndexRowCount() { + return rebuiltIndexRowCount; + } + + public long getBeforeRebuildValidIndexRowCount() { + return before.validIndexRowCount; + } + + public long getBeforeRebuildExpiredIndexRowCount() { + return before.expiredIndexRowCount; + } + + public long getBeforeRebuildInvalidIndexRowCount() { + return before.invalidIndexRowCount; + } + + public long getBeforeRebuildMissingIndexRowCount() { + return before.missingIndexRowCount; + } + + public long getAfterRebuildValidIndexRowCount() { + return after.validIndexRowCount; + } + + public long getAfterRebuildExpiredIndexRowCount() { + return after.expiredIndexRowCount; + } + + public long getAfterRebuildInvalidIndexRowCount() { + return after.invalidIndexRowCount; + } + + public long getAfterRebuildMissingIndexRowCount() { + return after.missingIndexRowCount; + } + + private void addScannedDataRowCount(long count) { + this.scannedDataRowCount += count; + } + + private void addRebuiltIndexRowCount(long count) { + this.rebuiltIndexRowCount += count; + } + + private void addBeforeRebuildValidIndexRowCount(long count) { + before.validIndexRowCount += count; + } + + private void addBeforeRebuildExpiredIndexRowCount(long count) { + before.expiredIndexRowCount += count; + } + + private void 
addBeforeRebuildMissingIndexRowCount(long count) { + before.missingIndexRowCount += count; + } + + private void addBeforeRebuildInvalidIndexRowCount(long count) { + before.invalidIndexRowCount += count; + } + + private void addAfterRebuildValidIndexRowCount(long count) { + after.validIndexRowCount += count; + } + + private void addAfterRebuildExpiredIndexRowCount(long count) { + after.expiredIndexRowCount += count; + } + + private void addAfterRebuildMissingIndexRowCount(long count) { + after.missingIndexRowCount += count; + } + + private void addAfterRebuildInvalidIndexRowCount(long count) { + after.invalidIndexRowCount += count; + } + + private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) { + if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), + AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, 0, + AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES.length) == 0) { + return true; + } + return false; + } + + private long getValue(Cell cell) { + return Long.parseLong(Bytes.toString(cell.getValueArray(), + cell.getValueOffset(), cell.getValueLength())); + } + + private void update(Cell cell) { + if (CellUtil + .matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES)) { + addScannedDataRowCount(getValue(cell)); + } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES)) { + addRebuiltIndexRowCount(getValue(cell)); + } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) { + addBeforeRebuildValidIndexRowCount(getValue(cell)); + } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) { + addBeforeRebuildExpiredIndexRowCount(getValue(cell)); + } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) { + addBeforeRebuildMissingIndexRowCount(getValue(cell)); + } else if 
(CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) { + addBeforeRebuildInvalidIndexRowCount(getValue(cell)); + } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) { + addAfterRebuildValidIndexRowCount(getValue(cell)); + } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) { + addAfterRebuildExpiredIndexRowCount(getValue(cell)); + } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) { + addAfterRebuildMissingIndexRowCount(getValue(cell)); + } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) { + addAfterRebuildInvalidIndexRowCount(getValue(cell)); + } + } + + public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) { + // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually. + // Search for the place where the trailing 0xFFs start + int offset = rowKeyPrefix.length; + while (offset > 0) { + if (rowKeyPrefix[offset - 1] != (byte) 0xFF) { + break; + } + offset--; + } + if (offset == 0) { + // We got an 0xFFFF... (only FFs) stopRow value which is + // the last possible prefix before the end of the table. 
+ // So set it to stop at the 'end of the table' + return HConstants.EMPTY_END_ROW; + } + // Copy the right length of the original + byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset); + // And increment the last one + newStopRow[newStopRow.length - 1]++; + return newStopRow; + } + + public static IndexToolVerificationResult getVerificationResult(Table hTable, long ts) + throws IOException { + IndexToolVerificationResult verificationResult = new IndexToolVerificationResult(); + byte[] startRowKey = Bytes.toBytes(Long.toString(ts)); + byte[] stopRowKey = calculateTheClosestNextRowKeyForPrefix(startRowKey); + Scan scan = new Scan(); + scan.setStartRow(startRowKey); + scan.setStopRow(stopRowKey); + ResultScanner scanner = hTable.getScanner(scan); + for (Result result = scanner.next(); result != null; result = scanner.next()) { + for (Cell cell : result.rawCells()) { + verificationResult.update(cell); + } + } + return verificationResult; + } + + public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) { + if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) { + return false; + } else if (verifyType == IndexTool.IndexVerifyType.ONLY) { + if (before.invalidIndexRowCount + before.missingIndexRowCount > 0) { + return true; + } + } else if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) { + if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) { + return true; + } + } + return false; + } + + public void add(IndexToolVerificationResult verificationResult) { + scannedDataRowCount += verificationResult.scannedDataRowCount; + rebuiltIndexRowCount += verificationResult.rebuiltIndexRowCount; + before.add(verificationResult.before); + after.add(verificationResult.after); + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java index 98000f7..8d1b4db 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java @@ -31,6 +31,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner; +import org.apache.phoenix.coprocessor.IndexToolVerificationResult; import org.apache.phoenix.coprocessor.TaskRegionObserver; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.util.ConnectionUtil; @@ -63,8 +64,8 @@ public class PhoenixIndexImportDirectReducer extends long ts = Long.valueOf(configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE)); Table hTable = connection.unwrap(PhoenixConnection.class).getQueryServices() .getTable(IndexTool.RESULT_TABLE_NAME_BYTES); - IndexRebuildRegionScanner.VerificationResult verificationResult = - IndexRebuildRegionScanner.VerificationResult.getVerificationResult(hTable, ts); + IndexToolVerificationResult verificationResult = + IndexToolVerificationResult.getVerificationResult(hTable, ts); context.getCounter(PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT). setValue(verificationResult.getScannedDataRowCount()); context.getCounter(PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT). diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java new file mode 100644 index 0000000..2506609 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java @@ -0,0 +1,637 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.index; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner; +import org.apache.phoenix.coprocessor.IndexToolVerificationResult; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.BaseConnectionlessQueryTest; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.util.EnvironmentEdge; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.SchemaUtil; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Matchers; 
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Properties;
+
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
+import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_BYTES;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@code IndexRebuildRegionScanner#verifySingleIndexRow}.
+ *
+ * The scanner is a Mockito mock on which only the methods under test call through to the
+ * real implementation. Expected index mutations are derived from a fixed series of upserts
+ * and deletes on one data row; each test then feeds the scanner a differently shaped list of
+ * "actual" index mutations and checks the resulting phase counters.
+ */
+public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
+
+    private static final int INDEX_TABLE_EXPIRY_SEC = 1;
+    private static final String UNEXPECTED_COLUMN = "0:UNEXPECTED_COLUMN";
+    public static final String FIRST_ID = "FIRST_ID";
+    public static final String SECOND_ID = "SECOND_ID";
+    public static final String FIRST_VALUE = "FIRST_VALUE";
+    public static final String SECOND_VALUE = "SECOND_VALUE";
+    public static final String
+            CREATE_TABLE_DDL = "CREATE TABLE IF NOT EXISTS %s (FIRST_ID BIGINT NOT NULL, "
+            + "SECOND_ID BIGINT NOT NULL, FIRST_VALUE VARCHAR(20), "
+            + "SECOND_VALUE INTEGER "
+            + "CONSTRAINT PK PRIMARY KEY(FIRST_ID, SECOND_ID)) COLUMN_ENCODED_BYTES=0";
+
+    public static final String
+            CREATE_INDEX_DDL = "CREATE INDEX %s ON %s (SECOND_VALUE) INCLUDE (FIRST_VALUE)";
+    public static final String COMPLETE_ROW_UPSERT = "UPSERT INTO %s VALUES (?,?,?,?)";
+    public static final String PARTIAL_ROW_UPSERT = "UPSERT INTO %s (%s, %s, %s) VALUES (?,?,?)";
+    public static final String DELETE_ROW_DML = "DELETE FROM %s WHERE %s = ? AND %s = ?";
+    public static final String INCLUDED_COLUMN = "0:FIRST_VALUE";
+
+    @Rule
+    public ExpectedException exceptionRule = ExpectedException.none();
+
+    /** Shapes of "actual" index mutations fed to the scanner. */
+    private enum TestType {
+        //set of mutations matching expected mutations
+        VALID_EXACT_MATCH,
+        //mix of delete and put mutations
+        VALID_MIX_MUTATIONS,
+        //only incoming unverified mutations
+        VALID_NEW_UNVERIFIED_MUTATIONS,
+        //extra mutations mimicking incoming mutations
+        VALID_MORE_MUTATIONS,
+        EXPIRED,
+        INVALID_EXTRA_CELL,
+        INVALID_EMPTY_CELL,
+        INVALID_CELL_VALUE,
+        INVALID_COLUMN
+    }
+
+    /** Clock that reports the system time shifted by a fixed number of milliseconds. */
+    public static class UnitTestClock extends EnvironmentEdge {
+        long initialTime;
+        long delta;
+
+        public UnitTestClock(long delta) {
+            initialTime = System.currentTimeMillis() + delta;
+            this.delta = delta;
+        }
+
+        @Override
+        public long currentTime() {
+            return System.currentTimeMillis() + delta;
+        }
+    }
+
+    @Mock
+    Result indexRow;
+    @Mock
+    IndexRebuildRegionScanner rebuildScanner;
+    List<Mutation> actualMutationList;
+    String schema, table, dataTableFullName, index, indexTableFullName;
+    PTable pIndexTable, pDataTable;
+    Put put = null;
+    Delete delete = null;
+    PhoenixConnection pconn;
+    IndexToolVerificationResult.PhaseResult actualPR;
+    public Map<byte[], List<Mutation>> indexKeyToMutationMapLocal;
+    private IndexMaintainer indexMaintainer;
+
+    @Before
+    public void setup() throws SQLException, IOException {
+        MockitoAnnotations.initMocks(this);
+        createDBObject();
+        createMutationsWithUpserts();
+        initializeRebuildScannerAttributes();
+        initializeGlobalMockitoSetup();
+    }
+
+    /** Creates a uniquely named data table and index and caches their PTable metadata. */
+    public void createDBObject() throws SQLException {
+        try (Connection conn = DriverManager.getConnection(getUrl(), new Properties());
+             Statement stmt = conn.createStatement()) {
+            schema = generateUniqueName();
+            table = generateUniqueName();
+            index = generateUniqueName();
+            dataTableFullName = SchemaUtil.getQualifiedTableName(schema, table);
+            indexTableFullName = SchemaUtil.getQualifiedTableName(schema, index);
+
+            stmt.execute(String.format(CREATE_TABLE_DDL, dataTableFullName));
+            stmt.execute(String.format(CREATE_INDEX_DDL, index, dataTableFullName));
+            conn.commit();
+
+            pconn = conn.unwrap(PhoenixConnection.class);
+            pIndexTable = pconn.getTable(new PTableKey(pconn.getTenantId(), indexTableFullName));
+            pDataTable = pconn.getTable(new PTableKey(pconn.getTenantId(), dataTableFullName));
+        }
+    }
+
+    /** Replays a fixed history of deletes and upserts on row (2, 3) into {@code put}/{@code delete}. */
+    private void createMutationsWithUpserts() throws SQLException, IOException {
+        deleteRow(2, 3);
+        upsertPartialRow(2, 3, "abc");
+        upsertCompleteRow(2, 3, "hik", 8);
+        upsertPartialRow(2, 3, 10);
+        upsertPartialRow(2, 3, 4);
+        deleteRow(2, 3);
+        upsertPartialRow(2, 3, "def");
+        upsertCompleteRow(2, 3, null, 20);
+        upsertPartialRow(2, 3, "wert");
+    }
+
+    private void deleteRow(int key1, int key2) throws SQLException, IOException {
+        try (Connection conn = DriverManager.getConnection(getUrl(), new Properties());
+             PreparedStatement ps = conn.prepareStatement(
+                     String.format(DELETE_ROW_DML, dataTableFullName, FIRST_ID, SECOND_ID))) {
+            ps.setInt(1, key1);
+            ps.setInt(2, key2);
+            ps.execute();
+            convertUpsertToMutations(conn);
+        }
+    }
+
+    private void upsertPartialRow(int key1, int key2, String val1)
+            throws SQLException, IOException {
+        try (Connection conn = DriverManager.getConnection(getUrl(), new Properties());
+             PreparedStatement ps = conn.prepareStatement(
+                     String.format(PARTIAL_ROW_UPSERT, dataTableFullName, FIRST_ID, SECOND_ID,
+                             FIRST_VALUE))) {
+            ps.setInt(1, key1);
+            ps.setInt(2, key2);
+            ps.setString(3, val1);
+            ps.execute();
+            convertUpsertToMutations(conn);
+        }
+    }
+
+    private void upsertPartialRow(int key1, int key2, int value1)
+            throws SQLException, IOException {
+        try (Connection conn = DriverManager.getConnection(getUrl(), new Properties());
+             PreparedStatement ps = conn.prepareStatement(
+                     String.format(PARTIAL_ROW_UPSERT, dataTableFullName, FIRST_ID, SECOND_ID,
+                             SECOND_VALUE))) {
+            ps.setInt(1, key1);
+            ps.setInt(2, key2);
+            ps.setInt(3, value1);
+            ps.execute();
+            convertUpsertToMutations(conn);
+        }
+    }
+
+    private void upsertCompleteRow(int key1, int key2, String val1
+            , int val2) throws SQLException, IOException {
+        try (Connection conn = DriverManager.getConnection(getUrl(), new Properties());
+             PreparedStatement ps = conn.prepareStatement(
+                     String.format(COMPLETE_ROW_UPSERT, dataTableFullName))) {
+            ps.setInt(1, key1);
+            ps.setInt(2, key2);
+            ps.setString(3, val1);
+            ps.setInt(4, val2);
+            ps.execute();
+            convertUpsertToMutations(conn);
+        }
+    }
+
+    /**
+     * Takes the first entry of the connection's uncommitted data, re-stamps each of its key
+     * values with the current time and appends it to the shared {@code put} (Put cells) or
+     * {@code delete} (all other cell types), creating those mutations on first use.
+     */
+    private void convertUpsertToMutations(Connection conn) throws SQLException, IOException {
+        Iterator<Pair<byte[],List<KeyValue>>>
+                dataTableNameAndMutationKeyValuesIter = PhoenixRuntime.getUncommittedDataIterator(conn);
+        Pair<byte[], List<KeyValue>> elem = dataTableNameAndMutationKeyValuesIter.next();
+        byte[] key = elem.getSecond().get(0).getRow();
+        long mutationTS = EnvironmentEdgeManager.currentTimeMillis();
+
+        for (KeyValue kv : elem.getSecond()) {
+            Cell cell =
+                    CellUtil.createCell(kv.getRow(), kv.getFamily(), kv.getQualifier(),
+                            mutationTS, kv.getType(), kv.getValue());
+            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                if (put == null ) {
+                    put = new Put(key);
+                }
+                put.add(cell);
+            } else {
+                if (delete == null) {
+                    delete = new Delete(key);
+                }
+                delete.addDeleteMarker(cell);
+            }
+        }
+    }
+
+    private void initializeRebuildScannerAttributes() {
+        when(rebuildScanner.setIndexTableTTL(Matchers.anyInt())).thenCallRealMethod();
+        when(rebuildScanner.setIndexMaintainer(Matchers.<IndexMaintainer>any())).thenCallRealMethod();
+        when(rebuildScanner.setIndexKeyToMutationMap(Matchers.<Map>any())).thenCallRealMethod();
+        rebuildScanner.setIndexTableTTL(HConstants.FOREVER);
+        indexMaintainer = pIndexTable.getIndexMaintainer(pDataTable, pconn);
+        rebuildScanner.setIndexMaintainer(indexMaintainer);
+    }
+
+    private void initializeGlobalMockitoSetup() throws IOException {
+        //setup
+        when(rebuildScanner.getIndexRowKey(put)).thenCallRealMethod();
+        when(rebuildScanner.prepareIndexMutations(put, delete)).thenCallRealMethod();
+        when(rebuildScanner.verifySingleIndexRow(Matchers.<Result>any(),
+                Matchers.<IndexToolVerificationResult.PhaseResult>any())).thenCallRealMethod();
+        doNothing().when(rebuildScanner)
+                .logToIndexToolOutputTable(Matchers.<byte[]>any(),Matchers.<byte[]>any(),
+                        Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString(),
+                        Matchers.<byte[]>any(), Matchers.<byte[]>any());
+        doNothing().when(rebuildScanner)
+                .logToIndexToolOutputTable(Matchers.<byte[]>any(),Matchers.<byte[]>any(),
+                        Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString());
+
+        //populate the local map to use to create actual mutations
+        indexKeyToMutationMapLocal = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+        rebuildScanner.setIndexKeyToMutationMap(indexKeyToMutationMapLocal);
+        rebuildScanner.prepareIndexMutations(put, delete);
+
+        //populate map to use in test code
+        Map<byte[], List<Mutation>> indexKeyToMutationMap = Maps.newTreeMap((Bytes.BYTES_COMPARATOR));
+        rebuildScanner.setIndexKeyToMutationMap(indexKeyToMutationMap);
+        rebuildScanner.prepareIndexMutations(put, delete);
+    }
+
+    private byte[] getValidRowKey() {
+        return indexKeyToMutationMapLocal.entrySet().iterator().next().getKey();
+    }
+
+    /**
+     * Runs verifySingleIndexRow once per expected index row key, with actual mutations shaped
+     * by {@code testType}, and checks the phase result produced for each row. For
+     * {@link TestType#EXPIRED} the row is aged past the index TTL before verification; the
+     * caller is responsible for resetting the injected clock afterwards.
+     */
+    private void verifyEachIndexRow(TestType testType,
+            IndexToolVerificationResult.PhaseResult expectedPR) throws IOException {
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, testType);
+            if (testType == TestType.EXPIRED) {
+                expireThisRow();
+            }
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue("Unexpected phase result for " + testType,
+                    actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_validIndexRowCount_nonZero() throws IOException {
+        verifyEachIndexRow(TestType.VALID_EXACT_MATCH, getValidPhaseResult());
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_validIndexRowCount_moreActual() throws IOException {
+        verifyEachIndexRow(TestType.VALID_MORE_MUTATIONS, getValidPhaseResult());
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_allMix() throws IOException {
+        verifyEachIndexRow(TestType.VALID_MIX_MUTATIONS, getValidPhaseResult());
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_allUnverified() throws IOException {
+        verifyEachIndexRow(TestType.VALID_NEW_UNVERIFIED_MUTATIONS, getValidPhaseResult());
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_expiredIndexRowCount_nonZero() throws IOException {
+        IndexToolVerificationResult.PhaseResult
+                expectedPR = new IndexToolVerificationResult.PhaseResult(0, 1, 0, 0);
+        try {
+            verifyEachIndexRow(TestType.EXPIRED, expectedPR);
+        } finally {
+            // expireThisRow() installs a shifted clock globally; remove it so that tests
+            // running later in the same JVM see the real time again.
+            EnvironmentEdgeManager.reset();
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_invalidIndexRowCount_cellValue() throws IOException {
+        verifyEachIndexRow(TestType.INVALID_CELL_VALUE, getInvalidPhaseResult());
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_invalidIndexRowCount_emptyCell() throws IOException {
+        verifyEachIndexRow(TestType.INVALID_EMPTY_CELL, getInvalidPhaseResult());
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_invalidIndexRowCount_diffColumn() throws IOException {
+        verifyEachIndexRow(TestType.INVALID_COLUMN, getInvalidPhaseResult());
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_invalidIndexRowCount_extraCell() throws IOException {
+        verifyEachIndexRow(TestType.INVALID_EXTRA_CELL, getInvalidPhaseResult());
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_expectedMutations_null() throws IOException {
+        when(indexRow.getRow()).thenReturn(Bytes.toBytes(1));
+        exceptionRule.expect(DoNotRetryIOException.class);
+        exceptionRule.expectMessage(IndexRebuildRegionScanner.NO_EXPECTED_MUTATION);
+        rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_actualMutations_null() throws IOException {
+        byte [] validRowKey = getValidRowKey();
+        when(indexRow.getRow()).thenReturn(validRowKey);
+        when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(null);
+        exceptionRule.expect(DoNotRetryIOException.class);
+        exceptionRule.expectMessage(IndexRebuildRegionScanner.ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
+        rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_actualMutations_empty() throws IOException {
+        byte [] validRowKey = getValidRowKey();
+        when(indexRow.getRow()).thenReturn(validRowKey);
+        actualMutationList = new ArrayList<>();
+        when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(actualMutationList);
+        exceptionRule.expect(DoNotRetryIOException.class);
+        exceptionRule.expectMessage(IndexRebuildRegionScanner.ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
+        rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+    }
+
+    // NOTE(review): the PhaseResult constructor arguments are presumably
+    // (valid, expired, missing, invalid) counts -- confirm against PhaseResult.
+    private IndexToolVerificationResult.PhaseResult getValidPhaseResult() {
+        return new IndexToolVerificationResult.PhaseResult(1,0,0,0);
+    }
+
+    private IndexToolVerificationResult.PhaseResult getInvalidPhaseResult() {
+        return new IndexToolVerificationResult.PhaseResult(0, 0, 0, 1);
+    }
+
+    /** Resets {@code actualPR} and stubs the mocks for one index row key and scenario. */
+    private void initializeLocalMockitoSetup(Map.Entry<byte[], List<Mutation>> entry,
+            TestType testType)
+            throws IOException {
+        actualPR = new IndexToolVerificationResult.PhaseResult();
+        byte[] indexKey = entry.getKey();
+        when(indexRow.getRow()).thenReturn(indexKey);
+        actualMutationList = buildActualIndexMutationsList(testType);
+        when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(actualMutationList);
+    }
+
+    private List<Mutation> buildActualIndexMutationsList(TestType testType) {
+        List<Mutation> actualMutations = new ArrayList<>();
+        actualMutations.addAll(indexKeyToMutationMapLocal.get(indexRow.getRow()));
+        if(testType.equals(TestType.EXPIRED)) {
+            return actualMutations;
+        }
+        if(testType.toString().startsWith("VALID")) {
+            return getValidActualMutations(testType, actualMutations);
+        }
+        if(testType.toString().startsWith("INVALID")) {
+            return getInvalidActualMutations(testType, actualMutations);
+        }
+        return null;
+    }
+
+    private List <Mutation> getValidActualMutations(TestType testType,
+            List<Mutation> actualMutations) {
+        List <Mutation> newActualMutations = new ArrayList<>();
+        if(testType.equals(TestType.VALID_EXACT_MATCH)) {
+            return actualMutations;
+        }
+        if (testType.equals(TestType.VALID_MIX_MUTATIONS)) {
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+            newActualMutations.add(getDeleteMutation(actualMutations.get(0), 1L));
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+        }
+        if (testType.equals(TestType.VALID_NEW_UNVERIFIED_MUTATIONS)) {
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), 1L));
+        }
+        newActualMutations.addAll(actualMutations);
+        if(testType.equals(TestType.VALID_MORE_MUTATIONS)) {
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+            newActualMutations.add(getDeleteMutation(actualMutations.get(0), null));
+            newActualMutations.add(getDeleteMutation(actualMutations.get(0), 1L));
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), 1L));
+        }
+        return newActualMutations;
+    }
+
+    /**
+     * Rewrites the first column family of every mutation in place according to
+     * {@code testType} and returns the mutations in their original order.
+     */
+    private List <Mutation> getInvalidActualMutations(TestType testType,
+            List<Mutation> actualMutations) {
+        List <Mutation> newActualMutations = new ArrayList<>();
+        newActualMutations.addAll(actualMutations);
+        for (Mutation m : actualMutations) {
+            newActualMutations.remove(m);
+            NavigableMap<byte[], List<Cell>> familyCellMap = m.getFamilyCellMap();
+            List<Cell> cellList = familyCellMap.firstEntry().getValue();
+            List<Cell> newCellList = new ArrayList<>();
+            byte[] fam = CellUtil.cloneFamily(cellList.get(0));
+            for (Cell c : cellList) {
+                infiltrateCell(c, newCellList, testType);
+            }
+            familyCellMap.put(fam, newCellList);
+            m.setFamilyCellMap(familyCellMap);
+            newActualMutations.add(m);
+        }
+        return newActualMutations;
+    }
+
+    /** Appends to {@code newCellList} the corrupted replacement(s) for cell {@code c}. */
+    private void infiltrateCell(Cell c, List<Cell> newCellList, TestType e) {
+        Cell newCell;
+        Cell emptyCell;
+        switch(e) {
+        case INVALID_COLUMN:
+            newCell =
+                    CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c),
+                            Bytes.toBytes(UNEXPECTED_COLUMN),
+                            EnvironmentEdgeManager.currentTimeMillis(),
+                            KeyValue.Type.Put.getCode(), Bytes.toBytes("zxcv"));
+            newCellList.add(newCell);
+            newCellList.add(c);
+            break;
+        case INVALID_CELL_VALUE:
+            if (CellUtil.matchingQualifier(c, EMPTY_COLUMN_BYTES)) {
+                newCell = getCellWithPut(c);
+                emptyCell = getUnverifiedEmptyCell(c);
+                newCellList.add(newCell);
+                newCellList.add(emptyCell);
+            } else {
+                newCellList.add(c);
+            }
+            break;
+        case INVALID_EMPTY_CELL:
+            if (CellUtil.matchingQualifier(c, EMPTY_COLUMN_BYTES)) {
+                newCell =
+                        CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c),
+                                CellUtil.cloneQualifier(c), c.getTimestamp(),
+                                KeyValue.Type.Delete.getCode(), VERIFIED_BYTES);
+                newCellList.add(newCell);
+            } else {
+                newCellList.add(c);
+            }
+            break;
+        case INVALID_EXTRA_CELL:
+            newCell = getCellWithPut(c);
+            emptyCell = getUnverifiedEmptyCell(c);
+            newCellList.add(newCell);
+            newCellList.add(emptyCell);
+            newCellList.add(c);
+            break;
+        }
+    }
+
+    private Cell getUnverifiedEmptyCell(Cell c) {
+        return CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c),
+                indexMaintainer.getEmptyKeyValueQualifier(),
+                EnvironmentEdgeManager.currentTimeMillis(),
+                KeyValue.Type.Put.getCode(), UNVERIFIED_BYTES);
+    }
+
+    private Cell getCellWithPut(Cell c) {
+        return CellUtil.createCell(CellUtil.cloneRow(c),
+                CellUtil.cloneFamily(c), Bytes.toBytes(INCLUDED_COLUMN),
+                EnvironmentEdgeManager.currentTimeMillis(), KeyValue.Type.Put.getCode(),
+                Bytes.toBytes("zxcv"));
+    }
+
+    /**
+     * Shortens the index TTL to {@link #INDEX_TABLE_EXPIRY_SEC} and moves the global clock
+     * five seconds ahead. The injected clock stays active until
+     * {@code EnvironmentEdgeManager.reset()} is called.
+     */
+    private void expireThisRow() {
+        rebuildScanner.setIndexTableTTL(INDEX_TABLE_EXPIRY_SEC);
+        UnitTestClock expiryClock = new UnitTestClock(5000);
+        EnvironmentEdgeManager.injectEdge(expiryClock);
+    }
+
+    /** Builds a Delete for {@code orig}'s row at {@code ts}, or at the current time when null. */
+    private Mutation getDeleteMutation(Mutation orig, Long ts) {
+        Mutation m = new Delete(orig.getRow());
+        List<Cell> origList = orig.getFamilyCellMap().firstEntry().getValue();
+        ts = ts == null ? EnvironmentEdgeManager.currentTimeMillis() : ts;
+        Cell c = getNewPutCell(orig, origList, ts, KeyValue.Type.DeleteFamilyVersion);
+        Cell empty = getEmptyCell(orig, origList, ts, KeyValue.Type.Put, true);
+        byte[] fam = CellUtil.cloneFamily(origList.get(0));
+        List<Cell> famCells = Lists.newArrayList();
+        m.getFamilyCellMap().put(fam, famCells);
+        famCells.add(c);
+        famCells.add(empty);
+        return m;
+    }
+
+    /**
+     * Builds a Put for {@code orig}'s row at {@code ts} (current time when null) whose empty
+     * column carries the unverified marker; attributes of {@code orig} are copied over.
+     */
+    private Mutation getUnverifiedPutMutation(Mutation orig, Long ts) {
+        Mutation m = new Put(orig.getRow());
+        if (orig.getAttributesMap() != null) {
+            for (Map.Entry<String,byte[]> entry : orig.getAttributesMap().entrySet()) {
+                m.setAttribute(entry.getKey(), entry.getValue());
+            }
+        }
+        List<Cell> origList = orig.getFamilyCellMap().firstEntry().getValue();
+        ts = ts == null ? EnvironmentEdgeManager.currentTimeMillis() : ts;
+        Cell c = getNewPutCell(orig, origList, ts, KeyValue.Type.Put);
+        Cell empty = getEmptyCell(orig, origList, ts, KeyValue.Type.Put, false);
+        byte[] fam = CellUtil.cloneFamily(origList.get(0));
+        List<Cell> famCells = Lists.newArrayList();
+        m.getFamilyCellMap().put(fam, famCells);
+        famCells.add(c);
+        famCells.add(empty);
+        return m;
+    }
+
+    private Cell getEmptyCell(Mutation orig, List<Cell> origList, Long ts, KeyValue.Type type,
+            boolean verified) {
+        return CellUtil.createCell(orig.getRow(), CellUtil.cloneFamily(origList.get(0)),
+                indexMaintainer.getEmptyKeyValueQualifier(),
+                ts, type.getCode(), verified ? VERIFIED_BYTES : UNVERIFIED_BYTES);
+    }
+
+    private Cell getNewPutCell(Mutation orig, List<Cell> origList, Long ts, KeyValue.Type type) {
+        return CellUtil.createCell(orig.getRow(),
+                CellUtil.cloneFamily(origList.get(0)), Bytes.toBytes(INCLUDED_COLUMN),
+                ts, type.getCode(), Bytes.toBytes("asdfg"));
+    }
+}