This is an automated email from the ASF dual-hosted git repository. kadir pushed a commit to branch 4.14-HBase-1.4 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.14-HBase-1.4 by this push: new 2f6e03f PHOENIX-5494 Batched, mutable Index updates are unnecessarily run one-by-one 2f6e03f is described below commit 2f6e03f53e59bf13b0fbf9def772561a7cde7073 Author: chenglei <cheng...@apache.org> AuthorDate: Sun Dec 1 18:14:50 2019 +0800 PHOENIX-5494 Batched, mutable Index updates are unnecessarily run one-by-one --- .../hbase/index/builder/BaseIndexBuilder.java | 4 + .../hbase/index/builder/IndexBuildManager.java | 19 ++- .../phoenix/hbase/index/builder/IndexBuilder.java | 3 +- .../hbase/index/covered/LocalTableState.java | 2 +- .../hbase/index/covered/NonTxIndexBuilder.java | 8 +- .../hbase/index/covered/data/CachedLocalTable.java | 188 +++++++++++++++++++++ .../hbase/index/covered/data/LocalHBaseState.java | 4 +- .../hbase/index/covered/data/LocalTable.java | 98 ----------- .../hbase/index/covered/LocalTableStateTest.java | 124 +++----------- .../hbase/index/covered/NonTxIndexBuilderTest.java | 32 +++- .../index/covered/TestCoveredColumnIndexCodec.java | 4 +- .../hbase/index/covered/data/TestLocalTable.java | 63 ------- 12 files changed, 272 insertions(+), 277 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java index 62e5f4d..ccde6a4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java @@ -133,4 +133,8 @@ public abstract class BaseIndexBuilder implements IndexBuilder { public ReplayWrite getReplayWrite(Mutation m) { return null; } + + public RegionCoprocessorEnvironment getEnv() { + return this.env; + } } \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java index 90d28b8..9683507 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; - import com.google.common.collect.ListMultimap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; @@ -37,6 +36,7 @@ import org.apache.phoenix.hbase.index.Indexer; import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable; import org.apache.phoenix.index.PhoenixIndexMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +49,7 @@ public class IndexBuildManager implements Stoppable { private static final Logger LOGGER = LoggerFactory.getLogger(IndexBuildManager.class); private final IndexBuilder delegate; private boolean stopped; + private RegionCoprocessorEnvironment regionCoprocessorEnvironment; /** * @param env environment in which <tt>this</tt> is running. Used to setup the @@ -59,6 +60,7 @@ public class IndexBuildManager implements Stoppable { // Prevent deadlock by using single thread for all reads so that we know // we can get the ReentrantRWLock. See PHOENIX-2671 for more details. 
this.delegate = getIndexBuilder(env); + this.regionCoprocessorEnvironment = env; } private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException { @@ -88,10 +90,14 @@ public class IndexBuildManager implements Stoppable { IndexMetaData indexMetaData) throws Throwable { // notify the delegate that we have started processing a batch this.delegate.batchStarted(miniBatchOp, indexMetaData); - + CachedLocalTable cachedLocalTable = + CachedLocalTable.build( + mutations, + (PhoenixIndexMetaData)indexMetaData, + this.regionCoprocessorEnvironment.getRegion()); // Avoid the Object overhead of the executor when it's not actually parallelizing anything. for (Mutation m : mutations) { - Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData); + Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData, cachedLocalTable); for (Pair<Mutation, byte[]> update : updates) { indexUpdates.put(new HTableInterfaceReference(new ImmutableBytesPtr(update.getSecond())), new Pair<>(update.getFirst(), m.getRow())); } @@ -105,10 +111,15 @@ public class IndexBuildManager implements Stoppable { final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp); this.delegate.batchStarted(miniBatchOp, indexMetaData); + CachedLocalTable cachedLocalTable = + CachedLocalTable.build( + mutations, + (PhoenixIndexMetaData)indexMetaData, + this.regionCoprocessorEnvironment.getRegion()); // Avoid the Object overhead of the executor when it's not actually parallelizing anything. 
ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size()); for (Mutation m : mutations) { - Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData); + Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData, cachedLocalTable); if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) { for (Pair<Mutation, byte[]> update : updates) { update.getFirst().setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java index d3b9d6b..18e361a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; import org.apache.phoenix.hbase.index.Indexer; import org.apache.phoenix.hbase.index.covered.IndexMetaData; +import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; /** * Interface to build updates ({@link Mutation}s) to the index tables, based on the primary table @@ -72,7 +73,7 @@ public interface IndexBuilder extends Stoppable { * @return a Map of the mutations to make -> target index table name * @throws IOException on failure */ - public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context) throws IOException; + public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context, LocalHBaseState localHBaseState) throws IOException; /** * Build an index update to cleanup the index when we remove {@link KeyValue}s via the normal flush or compaction diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java index 4f65416..f3a46fc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java @@ -172,7 +172,7 @@ public class LocalTableState implements TableState { // needing to lookup the prior row values. if (requiresPriorRowState) { // add the current state of the row. Uses listCells() to avoid a new array creation. - this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).listCells(), false); + this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations), false); } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java index 6945ea7..0a86010 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java @@ -22,7 +22,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder; import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; -import org.apache.phoenix.hbase.index.covered.data.LocalTable; import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager; import org.slf4j.Logger; @@ -40,18 +39,15 @@ import org.slf4j.LoggerFactory; public class NonTxIndexBuilder extends BaseIndexBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(NonTxIndexBuilder.class); - protected LocalHBaseState localTable; - @Override public void setup(RegionCoprocessorEnvironment env) throws IOException { super.setup(env); 
- this.localTable = new LocalTable(env); } @Override - public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData) throws IOException { + public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData, LocalHBaseState localHBaseState) throws IOException { // create a state manager, so we can manage each batch - LocalTableState state = new LocalTableState(localTable, mutation); + LocalTableState state = new LocalTableState(localHBaseState, mutation); // build the index updates for each group IndexUpdateManager manager = new IndexUpdateManager(indexMetaData); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java new file mode 100644 index 0000000..7091178 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.hbase.index.covered.data; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.filter.SkipScanFilter; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.hbase.index.util.IndexManagementUtil; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.index.PhoenixIndexMetaData; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.schema.types.PVarbinary; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; + +import java.util.HashMap; + +public class CachedLocalTable implements LocalHBaseState { + + private final HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells; + + private CachedLocalTable(HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells) { + this.rowKeyPtrToCells = rowKeyPtrToCells; + } + + @Override + public List<Cell> getCurrentRowState( + Mutation mutation, + Collection<? 
+ extends ColumnReference> columnReferences, + boolean ignoreNewerMutations) throws IOException { + byte[] rowKey = mutation.getRow(); + List<Cell> cells = this.rowKeyPtrToCells.get(new ImmutableBytesPtr(rowKey)); + + if(cells == null || cells.isEmpty()) { + return cells; + } + + if(!ignoreNewerMutations) { + return cells; + } + /** + * Because of the previous {@link IndexManagementUtil#flattenMutationsByTimestamp}(which is called + * in {@link IndexRegionObserver#groupMutations} or {@link Indexer#preBatchMutateWithExceptions}), + * all cells in the mutation have the same rowKey and timestamp. + */ + long timestamp = + getMutationTimestampWhenAllCellTimestampIsSame(mutation); + List<Cell> newCells = new ArrayList<Cell>(); + for(Cell cell : cells) { + if(cell.getTimestamp() < timestamp ) { + newCells.add(cell); + } + } + return newCells; + } + + @VisibleForTesting + public static CachedLocalTable build(HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells) { + return new CachedLocalTable(rowKeyPtrToCells); + } + + public static CachedLocalTable build( + Collection<? extends Mutation> dataTableMutationsWithSameRowKeyAndTimestamp, + PhoenixIndexMetaData indexMetaData, + Region region) throws IOException { + return preScanAllRequiredRows(dataTableMutationsWithSameRowKeyAndTimestamp, indexMetaData, region); + } + + /** + * Pre-scan all the required rows before building the indexes for the dataTableMutationsWithSameRowKeyAndTimestamp + * parameter. + * Note: when this method is called, for each single mutation in the dataTableMutationsWithSameRowKeyAndTimestamp + * parameter, all cells in the mutation have the same rowKey and timestamp. 
+ * @param dataTableMutationsWithSameRowKeyAndTimestamp the data table mutations whose rows are pre-scanned + * @param indexMetaData supplies the index maintainers (whose columns are scanned) and the replay-write mode + * @param region the region to scan + * @return a CachedLocalTable holding the scanned cells keyed by row key + * @throws IOException if scanning the region fails + */ + public static CachedLocalTable preScanAllRequiredRows( + Collection<?
extends Mutation> dataTableMutationsWithSameRowKeyAndTimestamp, + PhoenixIndexMetaData indexMetaData, + Region region) throws IOException { + List<IndexMaintainer> indexTableMaintainers = indexMetaData.getIndexMaintainers(); + Set<KeyRange> keys = new HashSet<KeyRange>(dataTableMutationsWithSameRowKeyAndTimestamp.size()); + for (Mutation mutation : dataTableMutationsWithSameRowKeyAndTimestamp) { + keys.add(PVarbinary.INSTANCE.getKeyRange(mutation.getRow())); + } + + Set<ColumnReference> getterColumnReferences = Sets.newHashSet(); + for (IndexMaintainer indexTableMaintainer : indexTableMaintainers) { + getterColumnReferences.addAll( + indexTableMaintainer.getAllColumns()); + } + + getterColumnReferences.add(new ColumnReference( + indexTableMaintainers.get(0).getDataEmptyKeyValueCF(), + indexTableMaintainers.get(0).getEmptyKeyValueQualifier())); + + Scan scan = IndexManagementUtil.newLocalStateScan( + Collections.singletonList(getterColumnReferences)); + ScanRanges scanRanges = ScanRanges.createPointLookup(new ArrayList<KeyRange>(keys)); + scanRanges.initializeScan(scan); + SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter(); + + if(indexMetaData.getReplayWrite() != null) { + /** + * Because of previous {@link IndexManagementUtil#flattenMutationsByTimestamp}(which is called + * in {@link IndexRegionObserver#groupMutations} or {@link Indexer#preBatchMutateWithExceptions}), + * for single mutation in the dataTableMutationsWithSameRowKeyAndTimestamp, all cells in the mutation + * have the same rowKey and timestamp. 
+ */ + long timestamp = getMaxTimestamp(dataTableMutationsWithSameRowKeyAndTimestamp); + scan.setTimeRange(0, timestamp); + scan.setFilter(new SkipScanFilter(skipScanFilter, true)); + } else { + assert scan.isRaw(); + scan.setMaxVersions(1); + scan.setFilter(skipScanFilter); + } + + HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells = + new HashMap<ImmutableBytesPtr, List<Cell>>(); + try (RegionScanner scanner = region.getScanner(scan)) { + boolean more = true; + while(more) { + List<Cell> cells = new ArrayList<Cell>(); + more = scanner.next(cells); + if (cells.isEmpty()) { + continue; + } + Cell cell = cells.get(0); + byte[] rowKey = CellUtil.cloneRow(cell); + rowKeyPtrToCells.put(new ImmutableBytesPtr(rowKey), cells); + } + } + + return new CachedLocalTable(rowKeyPtrToCells); + } + + private static long getMaxTimestamp(Collection<? extends Mutation> dataTableMutationsWithSameRowKeyAndTimestamp) { + long maxTimestamp = Long.MIN_VALUE; + for(Mutation mutation : dataTableMutationsWithSameRowKeyAndTimestamp) { + /** + * all the cells in this mutation have the same timestamp. 
+ */ + long timestamp = getMutationTimestampWhenAllCellTimestampIsSame(mutation); + if(timestamp > maxTimestamp) { + maxTimestamp = timestamp; + } + } + return maxTimestamp; + } + + private static long getMutationTimestampWhenAllCellTimestampIsSame(Mutation mutation) { + return mutation.getFamilyCellMap().values().iterator().next().get(0).getTimestamp(); + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java index 9968627..583e7f4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java @@ -19,7 +19,9 @@ package org.apache.phoenix.hbase.index.covered.data; import java.io.IOException; import java.util.Collection; +import java.util.List; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; @@ -43,7 +45,7 @@ public interface LocalHBaseState { * {@link Result} with no stored {@link KeyValue}s. * @throws IOException if there is an issue reading the row */ - public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean ignoreNewerMutations) + public List<Cell> getCurrentRowState(Mutation m, Collection<? 
extends ColumnReference> toCover, boolean ignoreNewerMutations) throws IOException; } \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java deleted file mode 100644 index 402620f..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.phoenix.hbase.index.covered.data; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.phoenix.hbase.index.covered.update.ColumnReference; -import org.apache.phoenix.hbase.index.util.IndexManagementUtil; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Ordering; -import com.google.common.primitives.Longs; - -/** - * Wrapper around a lazily instantiated, local HTable. - * <p> - * Previously, we had used various row and batch caches. However, this ends up being very - * complicated when attempting manage updating and invalidating the cache with no real gain as any - * row accessed multiple times will likely be in HBase's block cache, invalidating any extra caching - * we are doing here. In the end, its simpler and about as efficient to just get the current state - * of the row from HBase and let HBase manage caching the row from disk on its own. - */ -public class LocalTable implements LocalHBaseState { - - private RegionCoprocessorEnvironment env; - - public LocalTable(RegionCoprocessorEnvironment env) { - this.env = env; - } - - @Override - public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> columns, boolean ignoreNewerMutations) - throws IOException { - byte[] row = m.getRow(); - // need to use a scan here so we can get raw state, which Get doesn't provide. 
- Scan s = IndexManagementUtil.newLocalStateScan(Collections.singletonList(columns)); - s.setStartRow(row); - s.setStopRow(row); - if (ignoreNewerMutations) { - // Provides a means of client indicating that newer cells should not be considered, - // enabling mutations to be replayed to partially rebuild the index when a write fails. - // When replaying mutations we want the oldest timestamp (as anything newer we be replayed) - long ts = getOldestTimestamp(m.getFamilyCellMap().values()); - s.setTimeRange(0,ts); - } - Region region = this.env.getRegion(); - try (RegionScanner scanner = region.getScanner(s)) { - List<Cell> kvs = new ArrayList<Cell>(1); - boolean more = scanner.next(kvs); - assert !more : "Got more than one result when scanning" - + " a single row in the primary table!"; - - Result r = Result.create(kvs); - return r; - } - } - - // Returns the smallest timestamp in the given cell lists. - // It is assumed that the lists have cells ordered from largest to smallest timestamp - protected long getOldestTimestamp(Collection<List<Cell>> cellLists) { - Ordering<List<Cell>> cellListOrdering = new Ordering<List<Cell>>() { - @Override - public int compare(List<Cell> left, List<Cell> right) { - // compare the last element of each list, since that is the smallest in that list - return Longs.compare(Iterables.getLast(left).getTimestamp(), - Iterables.getLast(right).getTimestamp()); - } - }; - List<Cell> minList = cellListOrdering.min(cellLists); - return Iterables.getLast(minList).getTimestamp(); - } -} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java index 56ba1d6..416c16a 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java @@ -20,35 +20,32 @@ package 
org.apache.phoenix.hbase.index.covered; import static org.junit.Assert.assertEquals; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; -import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; -import org.apache.phoenix.hbase.index.covered.data.LocalTable; +import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.scanner.Scanner; import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.util.ScanUtil; import org.junit.Test; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -/** - * - */ + public class LocalTableStateTest { private static final byte[] row = Bytes.toBytes("row"); @@ -87,23 +84,16 @@ public class LocalTableStateTest { Region region = Mockito.mock(Region.class); Mockito.when(env.getRegion()).thenReturn(region); - RegionScanner scanner = Mockito.mock(RegionScanner.class); - Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner); final byte[] stored = Bytes.toBytes("stored-value"); - 
Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() { - @Override - public Boolean answer(InvocationOnMock invocation) throws Throwable { - List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0]; - KeyValue kv = new KeyValue(row, fam, qual, ts, Type.Put, stored); - kv.setSequenceId(0); - list.add(kv); - return false; - } - }); - LocalHBaseState state = new LocalTable(env); - LocalTableState table = new LocalTableState(state, m); + KeyValue kv = new KeyValue(row, fam, qual, ts, Type.Put, stored); + kv.setSequenceId(0); + HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells = + new HashMap<ImmutableBytesPtr, List<Cell>>(); + rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)kv)); + CachedLocalTable cachedLocalTable = CachedLocalTable.build(rowKeyPtrToCells); + LocalTableState table = new LocalTableState(cachedLocalTable, m); //add the kvs from the mutation table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual))); @@ -121,48 +111,6 @@ public class LocalTableStateTest { super(msg); } } - - @Test(expected = ScannerCreatedException.class) - public void testScannerForMutableRows() throws Exception { - IndexMetaData indexMetaData = new IndexMetaData() { - - @Override - public ReplayWrite getReplayWrite() { - return null; - } - - @Override - public boolean requiresPriorRowState(Mutation m) { - return true; - } - - @Override - public int getClientVersion() { - return ScanUtil.UNKNOWN_CLIENT_VERSION; - } - - }; - Put m = new Put(row); - m.addColumn(fam, qual, ts, val); - // setup mocks - Configuration conf = new Configuration(false); - RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class); - Mockito.when(env.getConfiguration()).thenReturn(conf); - - Region region = Mockito.mock(Region.class); - Mockito.when(env.getRegion()).thenReturn(region); - Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Should not 
open scanner when data is immutable")); - - LocalHBaseState state = new LocalTable(env); - LocalTableState table = new LocalTableState(state, m); - //add the kvs from the mutation - table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual))); - - // setup the lookup - ColumnReference col = new ColumnReference(fam, qual); - table.setCurrentTimestamp(ts); - table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData); - } @Test public void testNoScannerForImmutableRows() throws Exception { @@ -193,10 +141,9 @@ public class LocalTableStateTest { Region region = Mockito.mock(Region.class); Mockito.when(env.getRegion()).thenReturn(region); - Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Should not open scanner when data is immutable")); - LocalHBaseState state = new LocalTable(env); - LocalTableState table = new LocalTableState(state, m); + CachedLocalTable cachedLocalTable = CachedLocalTable.build(null); + LocalTableState table = new LocalTableState(cachedLocalTable, m); //add the kvs from the mutation table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual))); @@ -223,22 +170,16 @@ public class LocalTableStateTest { Region region = Mockito.mock(Region.class); Mockito.when(env.getRegion()).thenReturn(region); - RegionScanner scanner = Mockito.mock(RegionScanner.class); - Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner); final byte[] stored = Bytes.toBytes("stored-value"); final KeyValue storedKv = new KeyValue(row, fam, qual, ts, Type.Put, stored); storedKv.setSequenceId(2); - Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() { - @Override - public Boolean answer(InvocationOnMock invocation) throws Throwable { - List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0]; - list.add(storedKv); - return false; - } - }); - LocalHBaseState state = new LocalTable(env); - LocalTableState table = new 
LocalTableState(state, m); + HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells = + new HashMap<ImmutableBytesPtr, List<Cell>>(); + rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)storedKv)); + CachedLocalTable cachedLocalTable = CachedLocalTable.build(rowKeyPtrToCells); + LocalTableState table = new LocalTableState(cachedLocalTable, m); + // add the kvs from the mutation KeyValue kv = KeyValueUtil.ensureKeyValue(m.get(fam, qual).get(0)); kv.setSequenceId(0); @@ -257,8 +198,6 @@ public class LocalTableStateTest { p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData); s = p.getFirst(); assertEquals("Didn't correctly rollback the row - still found it!", null, s.next()); - Mockito.verify(env, Mockito.times(1)).getRegion(); - Mockito.verify(region, Mockito.times(1)).getScanner(Mockito.any(Scan.class)); } @SuppressWarnings("unchecked") @@ -269,24 +208,19 @@ public class LocalTableStateTest { Region region = Mockito.mock(Region.class); Mockito.when(env.getRegion()).thenReturn(region); - RegionScanner scanner = Mockito.mock(RegionScanner.class); - Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner); final KeyValue storedKv = new KeyValue(row, fam, qual, ts, Type.Put, Bytes.toBytes("stored-value")); storedKv.setSequenceId(2); - Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() { - @Override - public Boolean answer(InvocationOnMock invocation) throws Throwable { - List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0]; - list.add(storedKv); - return false; - } - }); - LocalHBaseState state = new LocalTable(env); + Put pendingUpdate = new Put(row); pendingUpdate.addColumn(fam, qual, ts, val); - LocalTableState table = new LocalTableState(state, pendingUpdate); + HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells = + new HashMap<ImmutableBytesPtr, List<Cell>>(); + rowKeyPtrToCells.put(new ImmutableBytesPtr(row), 
Collections.singletonList((Cell)storedKv)); + CachedLocalTable cachedLocalTable = CachedLocalTable.build(rowKeyPtrToCells); + LocalTableState table = new LocalTableState(cachedLocalTable, pendingUpdate); + // do the lookup for the given column ColumnReference col = new ColumnReference(fam, qual); @@ -302,8 +236,6 @@ public class LocalTableStateTest { p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData); s = p.getFirst(); assertEquals("Lost already loaded update!", storedKv, s.next()); - Mockito.verify(env, Mockito.times(1)).getRegion(); - Mockito.verify(region, Mockito.times(1)).getScanner(Mockito.any(Scan.class)); } // TODO add test here for making sure multiple column references with the same column family don't diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java index f587e98..ddd7d64 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java @@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.coprocessor.BaseRegionScanner; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; import org.apache.phoenix.hbase.index.MultiMutation; -import org.apache.phoenix.hbase.index.covered.data.LocalTable; +import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable; import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -149,6 +149,7 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest { mockIndexMetaData = Mockito.mock(PhoenixIndexMetaData.class); 
Mockito.when(mockIndexMetaData.requiresPriorRowState((Mutation)Mockito.any())).thenReturn(true); + Mockito.when(mockIndexMetaData.getReplayWrite()).thenReturn(null); Mockito.when(mockIndexMetaData.getIndexMaintainers()) .thenReturn(Collections.singletonList(getTestIndexMaintainer())); @@ -212,8 +213,13 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest { MultiMutation mutation = new MultiMutation(new ImmutableBytesPtr(ROW)); mutation.addAll(put); + CachedLocalTable cachedLocalTable = CachedLocalTable.build( + Collections.singletonList(mutation), + this.mockIndexMetaData, + this.indexBuilder.getEnv().getRegion()); + Collection<Pair<Mutation, byte[]>> indexUpdates = - indexBuilder.getIndexUpdate(mutation, mockIndexMetaData); + indexBuilder.getIndexUpdate(mutation, mockIndexMetaData, cachedLocalTable); assertEquals(2, indexUpdates.size()); assertContains(indexUpdates, 2, ROW, KeyValue.Type.DeleteFamily, FAM, new byte[0] /* qual not needed */, 2); @@ -254,8 +260,16 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest { mutation.addAll(put); Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayList(); - for (Mutation m : IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation))) { - indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData)); + Collection<? 
extends Mutation> mutations = + IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation)); + + CachedLocalTable cachedLocalTable = CachedLocalTable.build( + mutations, + this.mockIndexMetaData, + this.indexBuilder.getEnv().getRegion()); + + for (Mutation m : mutations) { + indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData, cachedLocalTable)); } // 3 puts and 3 deletes (one to hide existing index row for VALUE_1, and two to hide index // rows for VALUE_2, VALUE_3) @@ -287,9 +301,17 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest { MultiMutation mutation = getMultipleVersionMutation(200); currentRowCells = mutation.getFamilyCellMap().get(FAM); + Collection<? extends Mutation> mutations = + IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation)); + + CachedLocalTable cachedLocalTable = CachedLocalTable.build( + mutations, + this.mockIndexMetaData, + this.indexBuilder.getEnv().getRegion()); + Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayList(); for (Mutation m : IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation))) { - indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData)); + indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData, cachedLocalTable)); } assertNotEquals(0, indexUpdates.size()); } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java index d63dd6b..2fa2c6c 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java @@ -134,9 +134,9 @@ public class TestCoveredColumnIndexCodec { } @Override - public Result getCurrentRowState(Mutation m, Collection<? 
extends ColumnReference> toCover, boolean preMutationStateOnly) + public List<Cell> getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly) throws IOException { - return r; + return r.listCells(); } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java deleted file mode 100644 index b11ac8d..0000000 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.phoenix.hbase.index.covered.data; - -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Test; - -public class TestLocalTable { - private static final byte[] ROW = Bytes.toBytes("test_row"); - - @Test - public void testGetOldestTimestamp() { - LocalTable localTable = new LocalTable(null); - - List<Cell> cellList1 = getCellList(new KeyValue(ROW, 5L), new KeyValue(ROW, 4L)); - assertEquals(4L, localTable.getOldestTimestamp(Collections.singletonList(cellList1))); - - List<Cell> cellList2 = getCellList(new KeyValue(ROW, 5L), new KeyValue(ROW, 2L)); - List<List<Cell>> set1 = new ArrayList<>(Arrays.asList(cellList1, cellList2)); - assertEquals(2L, localTable.getOldestTimestamp(set1)); - - List<Cell> cellList3 = getCellList(new KeyValue(ROW, 1L)); - set1.add(cellList3); - assertEquals(1L, localTable.getOldestTimestamp(set1)); - - List<Cell> cellList4 = - getCellList(new KeyValue(ROW, 3L), new KeyValue(ROW, 1L), new KeyValue(ROW, 0L)); - set1.add(cellList4); - assertEquals(0L, localTable.getOldestTimestamp(set1)); - } - - private List<Cell> getCellList(KeyValue... kvs) { - List<Cell> cellList = new ArrayList<>(); - for (KeyValue kv : kvs) { - cellList.add(kv); - } - return cellList; - } -}