This is an automated email from the ASF dual-hosted git repository. chenglei pushed a commit to branch 4.x-HBase-1.4 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push: new 2db7c92 Revert "PHOENIX-5494 Batched, mutable Index updates are unnecessarily run one-by-one" 2db7c92 is described below commit 2db7c92a07ca62938599cad03719e9271516ec5f Author: chenglei <cheng...@apache.org> AuthorDate: Sun Nov 24 16:00:07 2019 +0800 Revert "PHOENIX-5494 Batched, mutable Index updates are unnecessarily run one-by-one" This reverts commit 9fb2d38b1b5c847745b783c95f1337b97cb13368. --- .../hbase/index/builder/BaseIndexBuilder.java | 4 - .../hbase/index/builder/IndexBuildManager.java | 20 +-- .../phoenix/hbase/index/builder/IndexBuilder.java | 3 +- .../hbase/index/covered/LocalTableState.java | 2 +- .../hbase/index/covered/NonTxIndexBuilder.java | 8 +- .../hbase/index/covered/data/CachedLocalTable.java | 72 --------- .../hbase/index/covered/data/LocalHBaseState.java | 4 +- .../hbase/index/covered/data/LocalTable.java | 98 ++++++++++++ .../hbase/index/util/IndexManagementUtil.java | 165 ++++----------------- .../hbase/index/covered/LocalTableStateTest.java | 115 ++++++++++---- .../hbase/index/covered/NonTxIndexBuilderTest.java | 32 +--- .../index/covered/TestCoveredColumnIndexCodec.java | 4 +- .../hbase/index/covered/data/TestLocalTable.java | 63 ++++++++ 13 files changed, 300 insertions(+), 290 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java index e3fdc1b..f96bf9d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java @@ -133,8 +133,4 @@ public abstract class BaseIndexBuilder implements IndexBuilder { public ReplayWrite getReplayWrite(Mutation m) { return null; } - - public RegionCoprocessorEnvironment getEnv() { - return this.env; - } } \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java index 52255d1..90d28b8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; + import com.google.common.collect.ListMultimap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; @@ -36,8 +37,6 @@ import org.apache.phoenix.hbase.index.Indexer; import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable; -import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.index.PhoenixIndexMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +49,6 @@ public class IndexBuildManager implements Stoppable { private static final Logger LOGGER = LoggerFactory.getLogger(IndexBuildManager.class); private final IndexBuilder delegate; private boolean stopped; - private RegionCoprocessorEnvironment regionCoprocessorEnvironment; /** * @param env environment in which <tt>this</tt> is running. Used to setup the @@ -61,7 +59,6 @@ public class IndexBuildManager implements Stoppable { // Prevent deadlock by using single thread for all reads so that we know // we can get the ReentrantRWLock. See PHOENIX-2671 for more details. this.delegate = getIndexBuilder(env); - this.regionCoprocessorEnvironment = env; } private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException { @@ -91,14 +88,10 @@ public class IndexBuildManager implements Stoppable { IndexMetaData indexMetaData) throws Throwable { // notify the delegate that we have started processing a batch this.delegate.batchStarted(miniBatchOp, indexMetaData); - CachedLocalTable cachedLocalTable = - IndexManagementUtil.preScanAllRequiredRows( - mutations, - (PhoenixIndexMetaData)indexMetaData, - this.regionCoprocessorEnvironment.getRegion()); + // Avoid the Object overhead of the executor when it's not actually parallelizing anything. for (Mutation m : mutations) { - Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData, cachedLocalTable); + Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData); for (Pair<Mutation, byte[]> update : updates) { indexUpdates.put(new HTableInterfaceReference(new ImmutableBytesPtr(update.getSecond())), new Pair<>(update.getFirst(), m.getRow())); } @@ -112,15 +105,10 @@ public class IndexBuildManager implements Stoppable { final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp); this.delegate.batchStarted(miniBatchOp, indexMetaData); - CachedLocalTable cachedLocalTable = - IndexManagementUtil.preScanAllRequiredRows( - mutations, - (PhoenixIndexMetaData)indexMetaData, - this.regionCoprocessorEnvironment.getRegion()); // Avoid the Object overhead of the executor when it's not actually parallelizing anything. ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size()); for (Mutation m : mutations) { - Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData, cachedLocalTable); + Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData); if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) { for (Pair<Mutation, byte[]> update : updates) { update.getFirst().setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java index 18e361a..d3b9d6b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; import org.apache.phoenix.hbase.index.Indexer; import org.apache.phoenix.hbase.index.covered.IndexMetaData; -import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; /** * Interface to build updates ({@link Mutation}s) to the index tables, based on the primary table @@ -73,7 +72,7 @@ public interface IndexBuilder extends Stoppable { * @return a Map of the mutations to make -> target index table name * @throws IOException on failure */ - public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context, LocalHBaseState localHBaseState) throws IOException; + public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context) throws IOException; /** * Build an index update to cleanup the index when we remove {@link KeyValue}s via the normal flush or compaction diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java index be18cbe..8a069f8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java @@ -172,7 +172,7 @@ public class LocalTableState implements TableState { // needing to lookup the prior row values. if (requiresPriorRowState) { // add the current state of the row. Uses listCells() to avoid a new array creation. - this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations), false); + this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).listCells(), false); } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java index a294ad7..19eda4d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder; import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; +import org.apache.phoenix.hbase.index.covered.data.LocalTable; import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager; import org.slf4j.Logger; @@ -39,15 +40,18 @@ import org.slf4j.LoggerFactory; public class NonTxIndexBuilder extends BaseIndexBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(NonTxIndexBuilder.class); + protected LocalHBaseState localTable; + @Override public void setup(RegionCoprocessorEnvironment env) throws IOException { super.setup(env); + this.localTable = new LocalTable(env); } @Override - public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData, LocalHBaseState localHBaseState) throws IOException { + public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData) throws IOException { // create a state manager, so we can manage each batch - LocalTableState state = new LocalTableState(localHBaseState, mutation); + LocalTableState state = new LocalTableState(localTable, mutation); // build the index updates for each group IndexUpdateManager manager = new IndexUpdateManager(indexMetaData); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java deleted file mode 100644 index f0fbfe6..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.hbase.index.covered.data; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.phoenix.hbase.index.covered.update.ColumnReference; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.hbase.index.util.IndexManagementUtil; - -import java.util.HashMap; - -public class CachedLocalTable implements LocalHBaseState { - - private final HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells; - - public CachedLocalTable(HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells) { - this.rowKeyPtrToCells = rowKeyPtrToCells; - } - - @Override - public List<Cell> getCurrentRowState( - Mutation mutation, - Collection<? extends ColumnReference> columnReferences, - boolean ignoreNewerMutations) throws IOException { - byte[] rowKey = mutation.getRow(); - List<Cell> cells = this.rowKeyPtrToCells.get(new ImmutableBytesPtr(rowKey)); - - if(cells == null || cells.isEmpty()) { - return cells; - } - - if(!ignoreNewerMutations) { - return cells; - } - /** - * because of previous {@link IndexManagementUtil#flattenMutationsByTimestamp}(which is called - * in {@link IndexRegionObserver#groupMutations} or {@link Indexer#preBatchMutateWithExceptions}), - * all cells in the mutation have the same rowKey and timestamp. - */ - long timestamp = - IndexManagementUtil.getMutationTimestampWhenAllCellTimestampIsSame(mutation); - List<Cell> newCells = new ArrayList<Cell>(); - for(Cell cell : cells) { - if(cell.getTimestamp() < timestamp ) { - newCells.add(cell); - } - } - return newCells; - } - -} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java index 583e7f4..9968627 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java @@ -19,9 +19,7 @@ package org.apache.phoenix.hbase.index.covered.data; import java.io.IOException; import java.util.Collection; -import java.util.List; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; @@ -45,7 +43,7 @@ public interface LocalHBaseState { * {@link Result} with no stored {@link KeyValue}s. * @throws IOException if there is an issue reading the row */ - public List<Cell> getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean ignoreNewerMutations) + public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean ignoreNewerMutations) throws IOException; } \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java new file mode 100644 index 0000000..402620f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.covered.data; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.hbase.index.util.IndexManagementUtil; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Ordering; +import com.google.common.primitives.Longs; + +/** + * Wrapper around a lazily instantiated, local HTable. + * <p> + * Previously, we had used various row and batch caches. However, this ends up being very + * complicated when attempting manage updating and invalidating the cache with no real gain as any + * row accessed multiple times will likely be in HBase's block cache, invalidating any extra caching + * we are doing here. In the end, its simpler and about as efficient to just get the current state + * of the row from HBase and let HBase manage caching the row from disk on its own. + */ +public class LocalTable implements LocalHBaseState { + + private RegionCoprocessorEnvironment env; + + public LocalTable(RegionCoprocessorEnvironment env) { + this.env = env; + } + + @Override + public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> columns, boolean ignoreNewerMutations) + throws IOException { + byte[] row = m.getRow(); + // need to use a scan here so we can get raw state, which Get doesn't provide. + Scan s = IndexManagementUtil.newLocalStateScan(Collections.singletonList(columns)); + s.setStartRow(row); + s.setStopRow(row); + if (ignoreNewerMutations) { + // Provides a means of client indicating that newer cells should not be considered, + // enabling mutations to be replayed to partially rebuild the index when a write fails. + // When replaying mutations we want the oldest timestamp (as anything newer we be replayed) + long ts = getOldestTimestamp(m.getFamilyCellMap().values()); + s.setTimeRange(0,ts); + } + Region region = this.env.getRegion(); + try (RegionScanner scanner = region.getScanner(s)) { + List<Cell> kvs = new ArrayList<Cell>(1); + boolean more = scanner.next(kvs); + assert !more : "Got more than one result when scanning" + + " a single row in the primary table!"; + + Result r = Result.create(kvs); + return r; + } + } + + // Returns the smallest timestamp in the given cell lists. + // It is assumed that the lists have cells ordered from largest to smallest timestamp + protected long getOldestTimestamp(Collection<List<Cell>> cellLists) { + Ordering<List<Cell>> cellListOrdering = new Ordering<List<Cell>>() { + @Override + public int compare(List<Cell> left, List<Cell> right) { + // compare the last element of each list, since that is the smallest in that list + return Longs.compare(Iterables.getLast(left).getTimestamp(), + Iterables.getLast(right).getTimestamp()); + } + }; + List<Cell> minList = cellListOrdering.min(cellLists); + return Iterables.getLast(minList).getTimestamp(); + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java index 081a185..84ccdb1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java @@ -23,10 +23,8 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -38,29 +36,17 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; -import org.apache.phoenix.compile.ScanRanges; -import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.builder.IndexBuildingFailureException; import org.apache.phoenix.hbase.index.covered.Batch; -import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable; import org.apache.phoenix.hbase.index.covered.data.LazyValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner; -import org.apache.phoenix.index.IndexMaintainer; -import org.apache.phoenix.index.PhoenixIndexMetaData; -import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.schema.types.PVarbinary; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.phoenix.hbase.index.IndexRegionObserver; -import org.apache.phoenix.hbase.index.Indexer; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import com.google.common.primitives.Longs; /** @@ -265,124 +251,35 @@ public class IndexManagementUtil { } public static Collection<? extends Mutation> flattenMutationsByTimestamp(Collection<? extends Mutation> mutations) { - List<Mutation> flattenedMutations = Lists.newArrayListWithExpectedSize(mutations.size() * 10); - for (Mutation m : mutations) { - byte[] row = m.getRow(); - Collection<Batch> batches = createTimestampBatchesFromMutation(m); - for (Batch batch : batches) { - Mutation mWithSameTS; - Cell firstCell = batch.getKvs().get(0); - if (KeyValue.Type.codeToType(firstCell.getTypeByte()) == KeyValue.Type.Put) { - mWithSameTS = new Put(row); - } else { - mWithSameTS = new Delete(row); - } - if (m.getAttributesMap() != null) { - for (Map.Entry<String,byte[]> entry : m.getAttributesMap().entrySet()) { - mWithSameTS.setAttribute(entry.getKey(), entry.getValue()); - } - } - for (Cell cell : batch.getKvs()) { - byte[] fam = CellUtil.cloneFamily(cell); - List<Cell> famCells = mWithSameTS.getFamilyCellMap().get(fam); - if (famCells == null) { - famCells = Lists.newArrayList(); - mWithSameTS.getFamilyCellMap().put(fam, famCells); - } - famCells.add(cell); - } - flattenedMutations.add(mWithSameTS); - } - } - return flattenedMutations; - } - - /** - * Pre-scan all the required rows before we building the indexes for the dataTableMutationsWithSameRowKeyAndTimestamp - * parameter. - * Note: When we calling this method, for single mutation in the dataTableMutationsWithSameRowKeyAndTimestamp - * parameter, all cells in the mutation have the same rowKey and timestamp. - * @param dataTableMutationsWithSameRowKeyAndTimestamp - * @param indexMetaData - * @param region - * @throws IOException - */ - public static CachedLocalTable preScanAllRequiredRows( - Collection<? extends Mutation> dataTableMutationsWithSameRowKeyAndTimestamp, - final PhoenixIndexMetaData indexMetaData, - Region region) throws IOException { - List<IndexMaintainer> indexTableMaintainers = indexMetaData.getIndexMaintainers(); - Set<KeyRange> keys = new HashSet<KeyRange>(dataTableMutationsWithSameRowKeyAndTimestamp.size()); - for (Mutation mutation : dataTableMutationsWithSameRowKeyAndTimestamp) { - keys.add(PVarbinary.INSTANCE.getKeyRange(mutation.getRow())); - } - - Set<ColumnReference> getterColumnReferences = Sets.newHashSet(); - for (IndexMaintainer indexTableMaintainer : indexTableMaintainers) { - getterColumnReferences.addAll( - indexTableMaintainer.getAllColumns()); - } - - getterColumnReferences.add(new ColumnReference( - indexTableMaintainers.get(0).getDataEmptyKeyValueCF(), - indexTableMaintainers.get(0).getEmptyKeyValueQualifier())); - - Scan scan = IndexManagementUtil.newLocalStateScan( - Collections.singletonList(getterColumnReferences)); - ScanRanges scanRanges = ScanRanges.createPointLookup(new ArrayList<KeyRange>(keys)); - scanRanges.initializeScan(scan); - SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter(); - - if(indexMetaData.getReplayWrite() != null) { - /** - * Because of previous {@link IndexManagementUtil#flattenMutationsByTimestamp}(which is called - * in {@link IndexRegionObserver#groupMutations} or {@link Indexer#preBatchMutateWithExceptions}), - * for single mutation in the dataTableMutationsWithSameRowKeyAndTimestamp, all cells in the mutation - * have the same rowKey and timestamp. - */ - long timestamp = getMaxTimestamp(dataTableMutationsWithSameRowKeyAndTimestamp); - scan.setTimeRange(0, timestamp); - scan.setFilter(new SkipScanFilter(skipScanFilter, true)); - } else { - assert scan.isRaw(); - scan.setMaxVersions(1); - scan.setFilter(skipScanFilter); - } - - HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells = - new HashMap<ImmutableBytesPtr, List<Cell>>(); - try (RegionScanner scanner = region.getScanner(scan)) { - boolean more = true; - while(more) { - List<Cell> cells = new ArrayList<Cell>(); - more = scanner.next(cells); - if (cells.isEmpty()) { - continue; - } - Cell cell = cells.get(0); - byte[] rowKey = CellUtil.cloneRow(cell); - rowKeyPtrToCells.put(new ImmutableBytesPtr(rowKey), cells); - } - } - - return new CachedLocalTable(rowKeyPtrToCells); - } - - private static long getMaxTimestamp(Collection<? extends Mutation> dataTableMutationsWithSameRowKeyAndTimestamp) { - long maxTimestamp = Long.MIN_VALUE; - for(Mutation mutation : dataTableMutationsWithSameRowKeyAndTimestamp) { - /** - * all the cells in this mutation have the same timestamp. - */ - long timestamp = getMutationTimestampWhenAllCellTimestampIsSame(mutation); - if(timestamp > maxTimestamp) { - maxTimestamp = timestamp; - } - } - return maxTimestamp; - } - - public static long getMutationTimestampWhenAllCellTimestampIsSame(Mutation mutation) { - return mutation.getFamilyCellMap().values().iterator().next().get(0).getTimestamp(); - } + List<Mutation> flattenedMutations = Lists.newArrayListWithExpectedSize(mutations.size() * 10); + for (Mutation m : mutations) { + byte[] row = m.getRow(); + Collection<Batch> batches = createTimestampBatchesFromMutation(m); + for (Batch batch : batches) { + Mutation mWithSameTS; + Cell firstCell = batch.getKvs().get(0); + if (KeyValue.Type.codeToType(firstCell.getTypeByte()) == KeyValue.Type.Put) { + mWithSameTS = new Put(row); + } else { + mWithSameTS = new Delete(row); + } + if (m.getAttributesMap() != null) { + for (Map.Entry<String,byte[]> entry : m.getAttributesMap().entrySet()) { + mWithSameTS.setAttribute(entry.getKey(), entry.getValue()); + } + } + for (Cell cell : batch.getKvs()) { + byte[] fam = CellUtil.cloneFamily(cell); + List<Cell> famCells = mWithSameTS.getFamilyCellMap().get(fam); + if (famCells == null) { + famCells = Lists.newArrayList(); + mWithSameTS.getFamilyCellMap().put(fam, famCells); + } + famCells.add(cell); + } + flattenedMutations.add(mWithSameTS); + } + } + return flattenedMutations; + } } \ No newline at end of file diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java index 272bb26..56ba1d6 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java @@ -20,12 +20,9 @@ package org.apache.phoenix.hbase.index.covered; import static org.junit.Assert.assertEquals; import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.KeyValueUtil; @@ -38,12 +35,11 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; -import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable; import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; +import org.apache.phoenix.hbase.index.covered.data.LocalTable; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.scanner.Scanner; import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.util.ScanUtil; import org.junit.Test; import org.mockito.Mockito; @@ -91,16 +87,23 @@ public class LocalTableStateTest { Region region = Mockito.mock(Region.class); Mockito.when(env.getRegion()).thenReturn(region); + RegionScanner scanner = Mockito.mock(RegionScanner.class); + Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner); final byte[] stored = Bytes.toBytes("stored-value"); + Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0]; + KeyValue kv = new KeyValue(row, fam, qual, ts, Type.Put, stored); + kv.setSequenceId(0); + list.add(kv); + return false; + } + }); - KeyValue kv = new KeyValue(row, fam, qual, ts, Type.Put, stored); - kv.setSequenceId(0); - HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells = - new HashMap<ImmutableBytesPtr, List<Cell>>(); - rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)kv)); - CachedLocalTable cachedLocalTable = new CachedLocalTable(rowKeyPtrToCells); - LocalTableState table = new LocalTableState(cachedLocalTable, m); + LocalHBaseState state = new LocalTable(env); + LocalTableState table = new LocalTableState(state, m); //add the kvs from the mutation table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual))); @@ -118,6 +121,48 @@ public class LocalTableStateTest { super(msg); } } + + @Test(expected = ScannerCreatedException.class) + public void testScannerForMutableRows() throws Exception { + IndexMetaData indexMetaData = new IndexMetaData() { + + @Override + public ReplayWrite getReplayWrite() { + return null; + } + + @Override + public boolean requiresPriorRowState(Mutation m) { + return true; + } + + @Override + public int getClientVersion() { + return ScanUtil.UNKNOWN_CLIENT_VERSION; + } + + }; + Put m = new Put(row); + m.addColumn(fam, qual, ts, val); + // setup mocks + Configuration conf = new Configuration(false); + RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class); + Mockito.when(env.getConfiguration()).thenReturn(conf); + + Region region = Mockito.mock(Region.class); + Mockito.when(env.getRegion()).thenReturn(region); + Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Should not open scanner when data is immutable")); + + LocalHBaseState state = new LocalTable(env); + LocalTableState table = new LocalTableState(state, m); + //add the kvs from the mutation + table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual))); + + // setup the lookup + ColumnReference col = new ColumnReference(fam, qual); + table.setCurrentTimestamp(ts); + table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData); + } @Test public void testNoScannerForImmutableRows() throws Exception { @@ -148,9 +193,10 @@ public class LocalTableStateTest { Region region = Mockito.mock(Region.class); Mockito.when(env.getRegion()).thenReturn(region); + Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Should not open scanner when data is immutable")); - CachedLocalTable cachedLocalTable = new CachedLocalTable(null); - LocalTableState table = new LocalTableState(cachedLocalTable, m); + LocalHBaseState state = new LocalTable(env); + LocalTableState table = new LocalTableState(state, m); //add the kvs from the mutation table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual))); @@ -177,16 +223,22 @@ public class LocalTableStateTest { Region region = Mockito.mock(Region.class); Mockito.when(env.getRegion()).thenReturn(region); + RegionScanner scanner = Mockito.mock(RegionScanner.class); + Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner); final byte[] stored = Bytes.toBytes("stored-value"); final KeyValue storedKv = new KeyValue(row, fam, qual, ts, Type.Put, stored); storedKv.setSequenceId(2); + Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0]; - HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells = - new HashMap<ImmutableBytesPtr, List<Cell>>(); - rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)storedKv)); - CachedLocalTable cachedLocalTable = new CachedLocalTable(rowKeyPtrToCells); - LocalTableState table = new LocalTableState(cachedLocalTable, m); - + list.add(storedKv); + return false; + } + }); + LocalHBaseState state = new LocalTable(env); + LocalTableState table = new LocalTableState(state, m); // add the kvs from the mutation KeyValue kv = KeyValueUtil.ensureKeyValue(m.get(fam, qual).get(0)); kv.setSequenceId(0); @@ -205,6 +257,8 @@ public class LocalTableStateTest { p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData); s = p.getFirst(); assertEquals("Didn't correctly rollback the row - still found it!", null, s.next()); + Mockito.verify(env, Mockito.times(1)).getRegion(); + Mockito.verify(region, Mockito.times(1)).getScanner(Mockito.any(Scan.class)); } @SuppressWarnings("unchecked") @@ -215,19 +269,24 @@ public class LocalTableStateTest { Region region = Mockito.mock(Region.class); Mockito.when(env.getRegion()).thenReturn(region); + RegionScanner scanner = Mockito.mock(RegionScanner.class); + Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner); final KeyValue storedKv = new KeyValue(row, fam, qual, ts, Type.Put, Bytes.toBytes("stored-value")); storedKv.setSequenceId(2); + Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0]; - + list.add(storedKv); + return false; + } + }); + LocalHBaseState state = new LocalTable(env); Put pendingUpdate = new Put(row); pendingUpdate.addColumn(fam, qual, ts, val); - HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells = - new HashMap<ImmutableBytesPtr, List<Cell>>(); - rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)storedKv)); - CachedLocalTable cachedLocalTable = new CachedLocalTable(rowKeyPtrToCells); - LocalTableState table = new LocalTableState(cachedLocalTable, pendingUpdate); - + LocalTableState table = new LocalTableState(state, pendingUpdate); // do the lookup for the given column ColumnReference col = new ColumnReference(fam, qual); @@ -243,6 +302,8 @@ public class LocalTableStateTest { p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData); s = p.getFirst(); assertEquals("Lost already loaded update!", storedKv, s.next()); + Mockito.verify(env, Mockito.times(1)).getRegion(); + Mockito.verify(region, Mockito.times(1)).getScanner(Mockito.any(Scan.class)); } // TODO add test here for making sure multiple column references with the same column family don't diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java index 24a3a55..f587e98 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java @@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.coprocessor.BaseRegionScanner; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite; import org.apache.phoenix.hbase.index.MultiMutation; -import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable; +import org.apache.phoenix.hbase.index.covered.data.LocalTable; import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -149,7 +149,6 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest { mockIndexMetaData = Mockito.mock(PhoenixIndexMetaData.class); Mockito.when(mockIndexMetaData.requiresPriorRowState((Mutation)Mockito.any())).thenReturn(true); - Mockito.when(mockIndexMetaData.getReplayWrite()).thenReturn(null); Mockito.when(mockIndexMetaData.getIndexMaintainers()) .thenReturn(Collections.singletonList(getTestIndexMaintainer())); @@ -213,13 +212,8 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest { MultiMutation mutation = new MultiMutation(new ImmutableBytesPtr(ROW)); mutation.addAll(put); - CachedLocalTable cachedLocalTable = IndexManagementUtil.preScanAllRequiredRows( - Collections.singletonList(mutation), - this.mockIndexMetaData, - this.indexBuilder.getEnv().getRegion()); - Collection<Pair<Mutation, byte[]>> indexUpdates = - indexBuilder.getIndexUpdate(mutation, mockIndexMetaData, cachedLocalTable); + indexBuilder.getIndexUpdate(mutation, mockIndexMetaData); assertEquals(2, indexUpdates.size()); assertContains(indexUpdates, 2, ROW, KeyValue.Type.DeleteFamily, FAM, new byte[0] /* qual not needed */, 2); @@ -260,16 +254,8 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest { mutation.addAll(put); Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayList(); - Collection<? extends Mutation> mutations = - IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation)); - - CachedLocalTable cachedLocalTable = IndexManagementUtil.preScanAllRequiredRows( - mutations, - this.mockIndexMetaData, - this.indexBuilder.getEnv().getRegion()); - - for (Mutation m : mutations) { - indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData, cachedLocalTable)); + for (Mutation m : IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation))) { + indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData)); } // 3 puts and 3 deletes (one to hide existing index row for VALUE_1, and two to hide index // rows for VALUE_2, VALUE_3) @@ -301,17 +287,9 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest { MultiMutation mutation = getMultipleVersionMutation(200); currentRowCells = mutation.getFamilyCellMap().get(FAM); - Collection<? extends Mutation> mutations = - IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation)); - - CachedLocalTable cachedLocalTable = IndexManagementUtil.preScanAllRequiredRows( - mutations, - this.mockIndexMetaData, - this.indexBuilder.getEnv().getRegion()); - Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayList(); for (Mutation m : IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation))) { - indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData, cachedLocalTable)); + indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData)); } assertNotEquals(0, indexUpdates.size()); } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java index ae63f83..e0a8ebe 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java @@ -134,9 +134,9 @@ public class TestCoveredColumnIndexCodec { } @Override - public List<Cell> getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly) + public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly) throws IOException { - return r.listCells(); + return r; } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java new file mode 100644 index 0000000..b11ac8d --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.covered.data; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; + +public class TestLocalTable { + private static final byte[] ROW = Bytes.toBytes("test_row"); + + @Test + public void testGetOldestTimestamp() { + LocalTable localTable = new LocalTable(null); + + List<Cell> cellList1 = getCellList(new KeyValue(ROW, 5L), new KeyValue(ROW, 4L)); + assertEquals(4L, localTable.getOldestTimestamp(Collections.singletonList(cellList1))); + + List<Cell> cellList2 = getCellList(new KeyValue(ROW, 5L), new KeyValue(ROW, 2L)); + List<List<Cell>> set1 = new ArrayList<>(Arrays.asList(cellList1, cellList2)); + assertEquals(2L, localTable.getOldestTimestamp(set1)); + + List<Cell> cellList3 = getCellList(new KeyValue(ROW, 1L)); + set1.add(cellList3); + assertEquals(1L, localTable.getOldestTimestamp(set1)); + + List<Cell> cellList4 = + getCellList(new KeyValue(ROW, 3L), new KeyValue(ROW, 1L), new KeyValue(ROW, 0L)); + set1.add(cellList4); + assertEquals(0L, localTable.getOldestTimestamp(set1)); + } + + private List<Cell> getCellList(KeyValue... kvs) { + List<Cell> cellList = new ArrayList<>(); + for (KeyValue kv : kvs) { + cellList.add(kv); + } + return cellList; + } +}